Add a method to send "End of Track" to the queue.
PiperOrigin-RevId: 742352459
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 72b7ca4..3c8f001 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -372,6 +372,19 @@
success = test_harness_.RunUntilWithDefaultTimeout(
[&]() { return received >= 7; });
EXPECT_TRUE(success);
+
+ EXPECT_CALL(client_visitor,
+ OnObjectFragment(_, FullSequence{2, 2}, _,
+ MoqtObjectStatus::kEndOfGroup, "", true))
+ .WillOnce([&] { ++received; });
+ EXPECT_CALL(client_visitor,
+ OnObjectFragment(_, FullSequence{3, 0}, _,
+ MoqtObjectStatus::kEndOfTrack, "", true))
+ .WillOnce([&] { ++received; });
+ queue->Close();
+ success = test_harness_.RunUntilWithDefaultTimeout(
+ [&]() { return received >= 9; });
+ EXPECT_TRUE(success);
}
}
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc
index 2c05a70..d6ba015 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -33,25 +33,33 @@
"flag.";
return;
}
-
- if (key) {
- if (!queue_.empty()) {
- AddRawObject(MoqtObjectStatus::kEndOfGroup, quiche::QuicheMemSlice());
- }
-
- if (queue_.size() == kMaxQueuedGroups) {
- queue_.erase(queue_.begin());
- for (MoqtObjectListener* listener : listeners_) {
- listener->OnGroupAbandoned(current_group_id_ - kMaxQueuedGroups + 1);
- }
- }
- queue_.emplace_back();
- ++current_group_id_;
+ if (closed_) {
+ QUICHE_BUG(MoqtOutgoingQueue_AddObject_closed)
+ << "Trying to send objects on a closed queue.";
+ return;
}
+ if (key) {
+ OpenNewGroup();
+ }
AddRawObject(MoqtObjectStatus::kNormal, std::move(payload));
}
+void MoqtOutgoingQueue::OpenNewGroup() {
+ if (!queue_.empty()) {
+ AddRawObject(MoqtObjectStatus::kEndOfGroup, quiche::QuicheMemSlice());
+ }
+
+ if (queue_.size() == kMaxQueuedGroups) {
+ queue_.erase(queue_.begin());
+ for (MoqtObjectListener* listener : listeners_) {
+ listener->OnGroupAbandoned(current_group_id_ - kMaxQueuedGroups + 1);
+ }
+ }
+ queue_.emplace_back();
+ ++current_group_id_;
+}
+
void MoqtOutgoingQueue::AddRawObject(MoqtObjectStatus status,
quiche::QuicheMemSlice payload) {
FullSequence sequence{current_group_id_, queue_.back().size()};
@@ -101,6 +109,9 @@
}
absl::StatusOr<MoqtTrackStatusCode> MoqtOutgoingQueue::GetTrackStatus() const {
+ if (closed_) {
+ return MoqtTrackStatusCode::kFinished;
+ }
if (queue_.empty()) {
return MoqtTrackStatusCode::kNotYetBegun;
}
@@ -193,4 +204,16 @@
return kSuccess;
}
+void MoqtOutgoingQueue::Close() {
+ if (closed_) {
+ QUICHE_BUG(MoqtOutgoingQueue_Close_twice)
+ << "Trying to close an outgoing queue that is already closed.";
+ return;
+ }
+ closed_ = true;
+
+ OpenNewGroup();
+ AddRawObject(MoqtObjectStatus::kEndOfTrack, {});
+}
+
} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h
index 9715563..4687ac8 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.h
+++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -95,6 +95,9 @@
}
}
+ // Sends an "End of Track" object.
+ void Close();
+
private:
// The number of recent groups to keep around for newly joined subscribers.
static constexpr size_t kMaxQueuedGroups = 3;
@@ -125,7 +128,10 @@
using Group = std::vector<CachedObject>;
+ // Appends an object to the end of the current group.
void AddRawObject(MoqtObjectStatus status, quiche::QuicheMemSlice payload);
+ // Closes the current group, if there is any, and opens a new one.
+ void OpenNewGroup();
// The number of the oldest group available.
uint64_t first_group_in_queue() const {
@@ -137,6 +143,7 @@
MoqtForwardingPreference forwarding_preference_;
MoqtPriority publisher_priority_ = 128;
MoqtDeliveryOrder delivery_order_ = MoqtDeliveryOrder::kAscending;
+ bool closed_ = false;
absl::InlinedVector<Group, kMaxQueuedGroups> queue_;
uint64_t current_group_id_ = -1;
absl::flat_hash_set<MoqtObjectListener*> listeners_;