Add a method to send "End of Track" to the queue.

PiperOrigin-RevId: 742352459
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 72b7ca4..3c8f001 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -372,6 +372,19 @@
     success = test_harness_.RunUntilWithDefaultTimeout(
         [&]() { return received >= 7; });
     EXPECT_TRUE(success);
+
+    EXPECT_CALL(client_visitor,
+                OnObjectFragment(_, FullSequence{2, 2}, _,
+                                 MoqtObjectStatus::kEndOfGroup, "", true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor,
+                OnObjectFragment(_, FullSequence{3, 0}, _,
+                                 MoqtObjectStatus::kEndOfTrack, "", true))
+        .WillOnce([&] { ++received; });
+    queue->Close();
+    success = test_harness_.RunUntilWithDefaultTimeout(
+        [&]() { return received >= 9; });
+    EXPECT_TRUE(success);
   }
 }
 
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc
index 2c05a70..d6ba015 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -33,25 +33,33 @@
            "flag.";
     return;
   }
-
-  if (key) {
-    if (!queue_.empty()) {
-      AddRawObject(MoqtObjectStatus::kEndOfGroup, quiche::QuicheMemSlice());
-    }
-
-    if (queue_.size() == kMaxQueuedGroups) {
-      queue_.erase(queue_.begin());
-      for (MoqtObjectListener* listener : listeners_) {
-        listener->OnGroupAbandoned(current_group_id_ - kMaxQueuedGroups + 1);
-      }
-    }
-    queue_.emplace_back();
-    ++current_group_id_;
+  if (closed_) {
+    QUICHE_BUG(MoqtOutgoingQueue_AddObject_closed)
+        << "Trying to send objects on a closed queue.";
+    return;
   }
 
+  if (key) {
+    OpenNewGroup();
+  }
   AddRawObject(MoqtObjectStatus::kNormal, std::move(payload));
 }
 
+void MoqtOutgoingQueue::OpenNewGroup() {
+  if (!queue_.empty()) {
+    AddRawObject(MoqtObjectStatus::kEndOfGroup, quiche::QuicheMemSlice());
+  }
+
+  if (queue_.size() == kMaxQueuedGroups) {
+    queue_.erase(queue_.begin());
+    for (MoqtObjectListener* listener : listeners_) {
+      listener->OnGroupAbandoned(current_group_id_ - kMaxQueuedGroups + 1);
+    }
+  }
+  queue_.emplace_back();
+  ++current_group_id_;
+}
+
 void MoqtOutgoingQueue::AddRawObject(MoqtObjectStatus status,
                                      quiche::QuicheMemSlice payload) {
   FullSequence sequence{current_group_id_, queue_.back().size()};
@@ -101,6 +109,9 @@
 }
 
 absl::StatusOr<MoqtTrackStatusCode> MoqtOutgoingQueue::GetTrackStatus() const {
+  if (closed_) {
+    return MoqtTrackStatusCode::kFinished;
+  }
   if (queue_.empty()) {
     return MoqtTrackStatusCode::kNotYetBegun;
   }
@@ -193,4 +204,16 @@
   return kSuccess;
 }
 
+void MoqtOutgoingQueue::Close() {
+  if (closed_) {
+    QUICHE_BUG(MoqtOutgoingQueue_Close_twice)
+        << "Trying to close an outgoing queue that is already closed.";
+    return;
+  }
+  closed_ = true;
+
+  OpenNewGroup();
+  AddRawObject(MoqtObjectStatus::kEndOfTrack, {});
+}
+
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h
index 9715563..4687ac8 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.h
+++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -95,6 +95,9 @@
     }
   }
 
+  // Sends an "End of Track" object.
+  void Close();
+
  private:
   // The number of recent groups to keep around for newly joined subscribers.
   static constexpr size_t kMaxQueuedGroups = 3;
@@ -125,7 +128,10 @@
 
   using Group = std::vector<CachedObject>;
 
+  // Appends an object to the end of the current group.
   void AddRawObject(MoqtObjectStatus status, quiche::QuicheMemSlice payload);
+  // Closes the current group, if there is any, and opens a new one.
+  void OpenNewGroup();
 
   // The number of the oldest group available.
   uint64_t first_group_in_queue() const {
@@ -137,6 +143,7 @@
   MoqtForwardingPreference forwarding_preference_;
   MoqtPriority publisher_priority_ = 128;
   MoqtDeliveryOrder delivery_order_ = MoqtDeliveryOrder::kAscending;
+  bool closed_ = false;
   absl::InlinedVector<Group, kMaxQueuedGroups> queue_;
   uint64_t current_group_id_ = -1;
   absl::flat_hash_set<MoqtObjectListener*> listeners_;