Add a method to send "End of Track" to the queue. PiperOrigin-RevId: 742352459
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc index 72b7ca4..3c8f001 100644 --- a/quiche/quic/moqt/moqt_integration_test.cc +++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -372,6 +372,19 @@ success = test_harness_.RunUntilWithDefaultTimeout( [&]() { return received >= 7; }); EXPECT_TRUE(success); + + EXPECT_CALL(client_visitor, + OnObjectFragment(_, FullSequence{2, 2}, _, + MoqtObjectStatus::kEndOfGroup, "", true)) + .WillOnce([&] { ++received; }); + EXPECT_CALL(client_visitor, + OnObjectFragment(_, FullSequence{3, 0}, _, + MoqtObjectStatus::kEndOfTrack, "", true)) + .WillOnce([&] { ++received; }); + queue->Close(); + success = test_harness_.RunUntilWithDefaultTimeout( + [&]() { return received >= 9; }); + EXPECT_TRUE(success); } }
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc index 2c05a70..d6ba015 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -33,25 +33,33 @@ "flag."; return; } - - if (key) { - if (!queue_.empty()) { - AddRawObject(MoqtObjectStatus::kEndOfGroup, quiche::QuicheMemSlice()); - } - - if (queue_.size() == kMaxQueuedGroups) { - queue_.erase(queue_.begin()); - for (MoqtObjectListener* listener : listeners_) { - listener->OnGroupAbandoned(current_group_id_ - kMaxQueuedGroups + 1); - } - } - queue_.emplace_back(); - ++current_group_id_; + if (closed_) { + QUICHE_BUG(MoqtOutgoingQueue_AddObject_closed) + << "Trying to send objects on a closed queue."; + return; } + if (key) { + OpenNewGroup(); + } AddRawObject(MoqtObjectStatus::kNormal, std::move(payload)); } +void MoqtOutgoingQueue::OpenNewGroup() { + if (!queue_.empty()) { + AddRawObject(MoqtObjectStatus::kEndOfGroup, quiche::QuicheMemSlice()); + } + + if (queue_.size() == kMaxQueuedGroups) { + queue_.erase(queue_.begin()); + for (MoqtObjectListener* listener : listeners_) { + listener->OnGroupAbandoned(current_group_id_ - kMaxQueuedGroups + 1); + } + } + queue_.emplace_back(); + ++current_group_id_; +} + void MoqtOutgoingQueue::AddRawObject(MoqtObjectStatus status, quiche::QuicheMemSlice payload) { FullSequence sequence{current_group_id_, queue_.back().size()}; @@ -101,6 +109,9 @@ } absl::StatusOr<MoqtTrackStatusCode> MoqtOutgoingQueue::GetTrackStatus() const { + if (closed_) { + return MoqtTrackStatusCode::kFinished; + } if (queue_.empty()) { return MoqtTrackStatusCode::kNotYetBegun; } @@ -193,4 +204,16 @@ return kSuccess; } +void MoqtOutgoingQueue::Close() { + if (closed_) { + QUICHE_BUG(MoqtOutgoingQueue_Close_twice) + << "Trying to close an outgoing queue that is already closed."; + return; + } + closed_ = true; + + OpenNewGroup(); + AddRawObject(MoqtObjectStatus::kEndOfTrack, {}); +} + } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h index 9715563..4687ac8 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.h +++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -95,6 +95,9 @@ } } + // Sends an "End of Track" object. + void Close(); + private: // The number of recent groups to keep around for newly joined subscribers. static constexpr size_t kMaxQueuedGroups = 3; @@ -125,7 +128,10 @@ using Group = std::vector<CachedObject>; + // Appends an object to the end of the current group. void AddRawObject(MoqtObjectStatus status, quiche::QuicheMemSlice payload); + // Closes the current group, if there is any, and opens a new one. + void OpenNewGroup(); // The number of the oldest group available. uint64_t first_group_in_queue() const { @@ -137,6 +143,7 @@ MoqtForwardingPreference forwarding_preference_; MoqtPriority publisher_priority_ = 128; MoqtDeliveryOrder delivery_order_ = MoqtDeliveryOrder::kAscending; + bool closed_ = false; absl::InlinedVector<Group, kMaxQueuedGroups> queue_; uint64_t current_group_id_ = -1; absl::flat_hash_set<MoqtObjectListener*> listeners_;