Implement MoqtOutgoingQueue.
This lets us have some "backlog buffer" in-memory, primarily for use in ingestion client.
PiperOrigin-RevId: 627368811
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 2e6cd92..465be42 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1494,6 +1494,7 @@
moqt_hdrs = [
"quic/moqt/moqt_framer.h",
"quic/moqt/moqt_messages.h",
+ "quic/moqt/moqt_outgoing_queue.h",
"quic/moqt/moqt_parser.h",
"quic/moqt/moqt_session.h",
"quic/moqt/moqt_subscribe_windows.h",
@@ -1508,6 +1509,8 @@
"quic/moqt/moqt_framer_test.cc",
"quic/moqt/moqt_integration_test.cc",
"quic/moqt/moqt_messages.cc",
+ "quic/moqt/moqt_outgoing_queue.cc",
+ "quic/moqt/moqt_outgoing_queue_test.cc",
"quic/moqt/moqt_parser.cc",
"quic/moqt/moqt_parser_test.cc",
"quic/moqt/moqt_session.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index 5105182..1bb50dd 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1498,6 +1498,7 @@
moqt_hdrs = [
"src/quiche/quic/moqt/moqt_framer.h",
"src/quiche/quic/moqt/moqt_messages.h",
+ "src/quiche/quic/moqt/moqt_outgoing_queue.h",
"src/quiche/quic/moqt/moqt_parser.h",
"src/quiche/quic/moqt/moqt_session.h",
"src/quiche/quic/moqt/moqt_subscribe_windows.h",
@@ -1512,6 +1513,8 @@
"src/quiche/quic/moqt/moqt_framer_test.cc",
"src/quiche/quic/moqt/moqt_integration_test.cc",
"src/quiche/quic/moqt/moqt_messages.cc",
+ "src/quiche/quic/moqt/moqt_outgoing_queue.cc",
+ "src/quiche/quic/moqt/moqt_outgoing_queue_test.cc",
"src/quiche/quic/moqt/moqt_parser.cc",
"src/quiche/quic/moqt/moqt_parser_test.cc",
"src/quiche/quic/moqt/moqt_session.cc",
diff --git a/build/source_list.json b/build/source_list.json
index 7b749af..fb45e82 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1497,6 +1497,7 @@
"moqt_hdrs": [
"quiche/quic/moqt/moqt_framer.h",
"quiche/quic/moqt/moqt_messages.h",
+ "quiche/quic/moqt/moqt_outgoing_queue.h",
"quiche/quic/moqt/moqt_parser.h",
"quiche/quic/moqt/moqt_session.h",
"quiche/quic/moqt/moqt_subscribe_windows.h",
@@ -1511,6 +1512,8 @@
"quiche/quic/moqt/moqt_framer_test.cc",
"quiche/quic/moqt/moqt_integration_test.cc",
"quiche/quic/moqt/moqt_messages.cc",
+ "quiche/quic/moqt/moqt_outgoing_queue.cc",
+ "quiche/quic/moqt/moqt_outgoing_queue_test.cc",
"quiche/quic/moqt/moqt_parser.cc",
"quiche/quic/moqt/moqt_parser_test.cc",
"quiche/quic/moqt/moqt_session.cc",
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 8691be6..56286fc 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -16,10 +16,12 @@
#include "quiche/quic/core/quic_generic_session.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_outgoing_queue.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
#include "quiche/quic/test_tools/crypto_test_utils.h"
+#include "quiche/quic/test_tools/quic_test_utils.h"
#include "quiche/quic/test_tools/simulator/simulator.h"
#include "quiche/quic/test_tools/simulator/test_harness.h"
#include "quiche/common/platform/api/quiche_test.h"
@@ -29,6 +31,7 @@
namespace {
using ::quic::simulator::Simulator;
+using ::quic::test::MemSliceFromString;
using ::testing::_;
using ::testing::Assign;
using ::testing::Return;
@@ -241,6 +244,48 @@
EXPECT_TRUE(success);
}
+TEST_F(MoqtIntegrationTest, AnnounceSuccessSendDatainResponse) {
+ EstablishSession();
+
+ // Set up the server to subscribe to "data" track for the namespace announce
+ // it receives.
+ MockRemoteTrackVisitor server_visitor;
+ EXPECT_CALL(server_->callbacks().incoming_announce_callback, Call(_))
+ .WillOnce([&](absl::string_view track_namespace) {
+ server_->session()->SubscribeAbsolute(
+ track_namespace, "data", /*start_group=*/0,
+ /*start_object=*/0, &server_visitor);
+ return std::optional<MoqtAnnounceErrorReason>();
+ });
+
+ MoqtOutgoingQueue queue(client_->session(), FullTrackName{"test", "data"});
+ client_->session()->AddLocalTrack(FullTrackName{"test", "data"},
+ MoqtForwardingPreference::kGroup, &queue);
+ queue.AddObject(MemSliceFromString("object data"), /*key=*/true);
+ client_->session()->Announce(
+ "test", [](absl::string_view, std::optional<MoqtAnnounceErrorReason>) {});
+
+ bool received_object = false;
+ EXPECT_CALL(server_visitor, OnObjectFragment(_, _, _, _, _, _, _))
+ .WillOnce([&](const FullTrackName& full_track_name,
+ uint64_t group_sequence, uint64_t object_sequence,
+ uint64_t /*object_send_order*/,
+ MoqtForwardingPreference forwarding_preference,
+ absl::string_view object, bool end_of_message) {
+ EXPECT_EQ(full_track_name.track_namespace, "test");
+ EXPECT_EQ(full_track_name.track_name, "data");
+ EXPECT_EQ(group_sequence, 0u);
+ EXPECT_EQ(object_sequence, 0u);
+ EXPECT_EQ(forwarding_preference, MoqtForwardingPreference::kGroup);
+ EXPECT_EQ(object, "object data");
+ EXPECT_TRUE(end_of_message);
+ received_object = true;
+ });
+ bool success = test_harness_.RunUntilWithDefaultTimeout(
+ [&]() { return received_object; });
+ EXPECT_TRUE(success);
+}
+
TEST_F(MoqtIntegrationTest, AnnounceFailure) {
EstablishSession();
testing::MockFunction<void(
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc
new file mode 100644
index 0000000..84c2a85
--- /dev/null
+++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -0,0 +1,91 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_outgoing_queue.h"
+
+#include <cstddef>
+#include <cstdint>
+#include <optional>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/common/platform/api/quiche_bug_tracker.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+
+namespace moqt {
+
+void MoqtOutgoingQueue::AddObject(quiche::QuicheMemSlice payload, bool key) {
+ if (queue_.empty() && !key) {
+ QUICHE_BUG(MoqtOutgoingQueue_AddObject_first_object_not_key)
+ << "The first object ever added to the queue must have the \"key\" "
+ "flag.";
+ return;
+ }
+
+ if (key) {
+ if (!queue_.empty()) {
+ CloseStreamForGroup(current_group_id_);
+ }
+
+ if (queue_.size() == kMaxQueuedGroups) {
+ queue_.erase(queue_.begin());
+ }
+ queue_.emplace_back();
+ ++current_group_id_;
+ }
+
+ absl::string_view payload_view = payload.AsStringView();
+ uint64_t object_id = queue_.back().size();
+ queue_.back().push_back(std::move(payload));
+ PublishObject(current_group_id_, object_id, payload_view,
+ /*close_stream=*/false);
+}
+
+absl::StatusOr<MoqtOutgoingQueue::PublishPastObjectsCallback>
+MoqtOutgoingQueue::OnSubscribeForPast(const SubscribeWindow& window) {
+ QUICHE_BUG_IF(
+ MoqtOutgoingQueue_requires_kGroup,
+ window.forwarding_preference() != MoqtForwardingPreference::kGroup)
+ << "MoqtOutgoingQueue currently only supports kGroup.";
+ if (window.HasEnd()) {
+ // TODO: support this (this would require changing the logic for closing the
+ // stream below).
+ return absl::UnimplementedError("SUBSCRIBEs with an end are not supported");
+ }
+ return [this, &window]() {
+ for (size_t i = 0; i < queue_.size(); ++i) {
+ const uint64_t group_id = first_group_in_queue() + i;
+ const Group& group = queue_[i];
+ const bool is_last_group = (i == queue_.size() - 1);
+ for (size_t j = 0; j < group.size(); ++j) {
+ const FullSequence sequence{group_id, j};
+ if (!window.InWindow(sequence)) {
+ continue;
+ }
+ const bool is_last_object = (j == group.size() - 1);
+ const bool should_close_stream = !is_last_group && is_last_object;
+ PublishObject(group_id, j, group[j].AsStringView(),
+ should_close_stream);
+ }
+ }
+ };
+}
+
+void MoqtOutgoingQueue::CloseStreamForGroup(uint64_t group_id) {
+ session_->CloseObjectStream(track_, group_id);
+}
+
+void MoqtOutgoingQueue::PublishObject(uint64_t group_id, uint64_t object_id,
+ absl::string_view payload,
+ bool close_stream) {
+ session_->PublishObject(track_, group_id, object_id,
+ /*object_send_order=*/group_id, payload,
+ close_stream);
+}
+
+} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h
new file mode 100644
index 0000000..0966b57
--- /dev/null
+++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -0,0 +1,79 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_TOOLS_MOQT_OUTGOING_QUEUE_H_
+#define QUICHE_QUIC_MOQT_TOOLS_MOQT_OUTGOING_QUEUE_H_
+
+#include <cstddef>
+#include <cstdint>
+#include <optional>
+#include <utility>
+#include <vector>
+
+#include "absl/container/inlined_vector.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+
+namespace moqt {
+
+// MoqtOutgoingQueue lets the user send objects by providing the contents of the
+// object and a keyframe flag. The queue will automatically number objects and
+// groups, and maintain a buffer of three most recent groups that will be
+// provided to subscribers automatically.
+//
+// This class is primarily meant to be used by publishers to buffer the frames
+// that they produce. Limitations of this class:
+// - It currently only works with the forwarding preference of kGroup.
+// - It only supports a single session.
+// - Everything is sent in order that it is queued.
+class MoqtOutgoingQueue : public LocalTrack::Visitor {
+ public:
+ explicit MoqtOutgoingQueue(MoqtSession* session, FullTrackName track)
+ : session_(session), track_(std::move(track)) {}
+
+ MoqtOutgoingQueue(const MoqtOutgoingQueue&) = delete;
+ MoqtOutgoingQueue(MoqtOutgoingQueue&&) = default;
+ MoqtOutgoingQueue& operator=(const MoqtOutgoingQueue&) = delete;
+ MoqtOutgoingQueue& operator=(MoqtOutgoingQueue&&) = default;
+
+ // If `key` is true, the object is placed into a new group, and the previous
+ // group is closed. The first object ever sent MUST have `key` set to true.
+ void AddObject(quiche::QuicheMemSlice payload, bool key);
+
+ // LocalTrack::Visitor implementation.
+ absl::StatusOr<PublishPastObjectsCallback> OnSubscribeForPast(
+ const SubscribeWindow& window) override;
+
+ protected:
+ // Interface to MoqtSession; can be mocked out for tests.
+ virtual void CloseStreamForGroup(uint64_t group_id);
+ virtual void PublishObject(uint64_t group_id, uint64_t object_id,
+ absl::string_view payload, bool close_stream);
+
+ private:
+ // The number of recent groups to keep around for newly joined subscribers.
+ static constexpr size_t kMaxQueuedGroups = 3;
+
+ using Object = quiche::QuicheMemSlice;
+ using Group = std::vector<Object>;
+
+ // The number of the oldest group available.
+ uint64_t first_group_in_queue() {
+ return current_group_id_ - queue_.size() + 1;
+ }
+
+ MoqtSession* session_; // Not owned.
+ FullTrackName track_;
+ absl::InlinedVector<Group, kMaxQueuedGroups> queue_;
+ uint64_t current_group_id_ = -1;
+};
+
+} // namespace moqt
+
+#endif // QUICHE_QUIC_MOQT_TOOLS_MOQT_OUTGOING_QUEUE_H_
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
new file mode 100644
index 0000000..7e6662e
--- /dev/null
+++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -0,0 +1,220 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_outgoing_queue.h"
+
+#include <cstdint>
+#include <utility>
+
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/quic/test_tools/quic_test_utils.h"
+#include "quiche/common/platform/api/quiche_expect_bug.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/platform/api/quiche_test.h"
+
+namespace moqt {
+namespace {
+
+using ::quic::test::MemSliceFromString;
+
+class TestMoqtOutgoingQueue : public MoqtOutgoingQueue {
+ public:
+ TestMoqtOutgoingQueue()
+ : MoqtOutgoingQueue(nullptr, FullTrackName{"test", "track"}) {}
+
+ void CallSubscribeForPast(const SubscribeWindow& window) {
+ absl::StatusOr<PublishPastObjectsCallback> callback =
+ OnSubscribeForPast(window);
+ QUICHE_CHECK_OK(callback.status());
+ (*std::move(callback))();
+ }
+
+ MOCK_METHOD(void, CloseStreamForGroup, (uint64_t group_id), (override));
+ MOCK_METHOD(void, PublishObject,
+ (uint64_t group_id, uint64_t object_id, absl::string_view payload,
+ bool close_stream),
+ (override));
+};
+
+TEST(MoqtOutgoingQueue, FirstObjectNotKeyframe) {
+ TestMoqtOutgoingQueue queue;
+ EXPECT_QUICHE_BUG(queue.AddObject(MemSliceFromString("a"), false),
+ "The first object");
+}
+
+TEST(MoqtOutgoingQueue, SingleGroup) {
+ TestMoqtOutgoingQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a", false));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c", false));
+ }
+ queue.AddObject(MemSliceFromString("a"), true);
+ queue.AddObject(MemSliceFromString("b"), false);
+ queue.AddObject(MemSliceFromString("c"), false);
+}
+
+TEST(MoqtOutgoingQueue, SingleGroupPastSubscribeFromZero) {
+ TestMoqtOutgoingQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a", false));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c", false));
+
+ EXPECT_CALL(queue, PublishObject(0, 0, "a", false));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c", false));
+ }
+ queue.AddObject(MemSliceFromString("a"), true);
+ queue.AddObject(MemSliceFromString("b"), false);
+ queue.AddObject(MemSliceFromString("c"), false);
+ queue.CallSubscribeForPast(
+ SubscribeWindow(0, MoqtForwardingPreference::kGroup, 0, 0));
+}
+
+TEST(MoqtOutgoingQueue, SingleGroupPastSubscribeFromMidGroup) {
+ TestMoqtOutgoingQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a", false));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c", false));
+
+ EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c", false));
+ }
+ queue.AddObject(MemSliceFromString("a"), true);
+ queue.AddObject(MemSliceFromString("b"), false);
+ queue.AddObject(MemSliceFromString("c"), false);
+ queue.CallSubscribeForPast(
+ SubscribeWindow(0, MoqtForwardingPreference::kGroup, 0, 1));
+}
+
+TEST(MoqtOutgoingQueue, TwoGroups) {
+ TestMoqtOutgoingQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a", false));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c", false));
+ EXPECT_CALL(queue, CloseStreamForGroup(0));
+ EXPECT_CALL(queue, PublishObject(1, 0, "d", false));
+ EXPECT_CALL(queue, PublishObject(1, 1, "e", false));
+ EXPECT_CALL(queue, PublishObject(1, 2, "f", false));
+ }
+ queue.AddObject(MemSliceFromString("a"), true);
+ queue.AddObject(MemSliceFromString("b"), false);
+ queue.AddObject(MemSliceFromString("c"), false);
+ queue.AddObject(MemSliceFromString("d"), true);
+ queue.AddObject(MemSliceFromString("e"), false);
+ queue.AddObject(MemSliceFromString("f"), false);
+}
+
+TEST(MoqtOutgoingQueue, TwoGroupsPastSubscribe) {
+ TestMoqtOutgoingQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a", false));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c", false));
+ EXPECT_CALL(queue, CloseStreamForGroup(0));
+ EXPECT_CALL(queue, PublishObject(1, 0, "d", false));
+ EXPECT_CALL(queue, PublishObject(1, 1, "e", false));
+ EXPECT_CALL(queue, PublishObject(1, 2, "f", false));
+
+ EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c", true));
+ EXPECT_CALL(queue, PublishObject(1, 0, "d", false));
+ EXPECT_CALL(queue, PublishObject(1, 1, "e", false));
+ EXPECT_CALL(queue, PublishObject(1, 2, "f", false));
+ }
+ queue.AddObject(MemSliceFromString("a"), true);
+ queue.AddObject(MemSliceFromString("b"), false);
+ queue.AddObject(MemSliceFromString("c"), false);
+ queue.AddObject(MemSliceFromString("d"), true);
+ queue.AddObject(MemSliceFromString("e"), false);
+ queue.AddObject(MemSliceFromString("f"), false);
+ queue.CallSubscribeForPast(
+ SubscribeWindow(0, MoqtForwardingPreference::kGroup, 0, 1));
+}
+
+TEST(MoqtOutgoingQueue, FiveGroups) {
+ TestMoqtOutgoingQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a", false));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+ EXPECT_CALL(queue, CloseStreamForGroup(0));
+ EXPECT_CALL(queue, PublishObject(1, 0, "c", false));
+ EXPECT_CALL(queue, PublishObject(1, 1, "d", false));
+ EXPECT_CALL(queue, CloseStreamForGroup(1));
+ EXPECT_CALL(queue, PublishObject(2, 0, "e", false));
+ EXPECT_CALL(queue, PublishObject(2, 1, "f", false));
+ EXPECT_CALL(queue, CloseStreamForGroup(2));
+ EXPECT_CALL(queue, PublishObject(3, 0, "g", false));
+ EXPECT_CALL(queue, PublishObject(3, 1, "h", false));
+ EXPECT_CALL(queue, CloseStreamForGroup(3));
+ EXPECT_CALL(queue, PublishObject(4, 0, "i", false));
+ EXPECT_CALL(queue, PublishObject(4, 1, "j", false));
+ }
+ queue.AddObject(MemSliceFromString("a"), true);
+ queue.AddObject(MemSliceFromString("b"), false);
+ queue.AddObject(MemSliceFromString("c"), true);
+ queue.AddObject(MemSliceFromString("d"), false);
+ queue.AddObject(MemSliceFromString("e"), true);
+ queue.AddObject(MemSliceFromString("f"), false);
+ queue.AddObject(MemSliceFromString("g"), true);
+ queue.AddObject(MemSliceFromString("h"), false);
+ queue.AddObject(MemSliceFromString("i"), true);
+ queue.AddObject(MemSliceFromString("j"), false);
+}
+
+TEST(MoqtOutgoingQueue, FiveGroupsPastSubscribe) {
+ TestMoqtOutgoingQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a", false));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b", false));
+ EXPECT_CALL(queue, CloseStreamForGroup(0));
+ EXPECT_CALL(queue, PublishObject(1, 0, "c", false));
+ EXPECT_CALL(queue, PublishObject(1, 1, "d", false));
+ EXPECT_CALL(queue, CloseStreamForGroup(1));
+ EXPECT_CALL(queue, PublishObject(2, 0, "e", false));
+ EXPECT_CALL(queue, PublishObject(2, 1, "f", false));
+ EXPECT_CALL(queue, CloseStreamForGroup(2));
+ EXPECT_CALL(queue, PublishObject(3, 0, "g", false));
+ EXPECT_CALL(queue, PublishObject(3, 1, "h", false));
+ EXPECT_CALL(queue, CloseStreamForGroup(3));
+ EXPECT_CALL(queue, PublishObject(4, 0, "i", false));
+ EXPECT_CALL(queue, PublishObject(4, 1, "j", false));
+
+ // Past SUBSCRIBE would only get the three most recent groups.
+ EXPECT_CALL(queue, PublishObject(2, 0, "e", false));
+ EXPECT_CALL(queue, PublishObject(2, 1, "f", true));
+ EXPECT_CALL(queue, PublishObject(3, 0, "g", false));
+ EXPECT_CALL(queue, PublishObject(3, 1, "h", true));
+ EXPECT_CALL(queue, PublishObject(4, 0, "i", false));
+ EXPECT_CALL(queue, PublishObject(4, 1, "j", false));
+ }
+ queue.AddObject(MemSliceFromString("a"), true);
+ queue.AddObject(MemSliceFromString("b"), false);
+ queue.AddObject(MemSliceFromString("c"), true);
+ queue.AddObject(MemSliceFromString("d"), false);
+ queue.AddObject(MemSliceFromString("e"), true);
+ queue.AddObject(MemSliceFromString("f"), false);
+ queue.AddObject(MemSliceFromString("g"), true);
+ queue.AddObject(MemSliceFromString("h"), false);
+ queue.AddObject(MemSliceFromString("i"), true);
+ queue.AddObject(MemSliceFromString("j"), false);
+ queue.CallSubscribeForPast(
+ SubscribeWindow(0, MoqtForwardingPreference::kGroup, 0, 0));
+}
+
+} // namespace
+} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index dcbbf61..8781b26 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -15,6 +15,7 @@
#include "absl/algorithm/container.h"
#include "absl/status/status.h"
+#include "absl/status/statusor.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
@@ -24,6 +25,7 @@
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/platform/api/quic_bug_tracker.h"
+#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_stream.h"
@@ -439,6 +441,44 @@
return (failures == 0);
}
+void MoqtSession::CloseObjectStream(const FullTrackName& full_track_name,
+ uint64_t group_id) {
+ auto track_it = local_tracks_.find(full_track_name);
+ if (track_it == local_tracks_.end()) {
+ QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT for nonexistent track";
+ return;
+ }
+ LocalTrack& track = track_it->second;
+
+ MoqtForwardingPreference forwarding_preference =
+ track.forwarding_preference();
+ if (forwarding_preference == MoqtForwardingPreference::kObject ||
+ forwarding_preference == MoqtForwardingPreference::kDatagram) {
+ QUIC_BUG(MoqtSession_CloseStreamObject_wrong_type)
+ << "Forwarding preferences of Object or Datagram require stream to be "
+ "immediately closed, and thus are not valid CloseObjectStream() "
+ "targets";
+ return;
+ }
+
+ std::vector<SubscribeWindow*> subscriptions =
+ track.ShouldSend({group_id, /*object=*/0});
+ for (SubscribeWindow* subscription : subscriptions) {
+ std::optional<webtransport::StreamId> stream_id =
+ subscription->GetStreamForSequence(
+ FullSequence(group_id, /*object=*/0));
+ if (!stream_id.has_value()) {
+ continue;
+ }
+ webtransport::Stream* stream = session_->GetStreamById(*stream_id);
+ if (stream == nullptr) {
+ continue;
+ }
+ bool success = stream->SendFin();
+ QUICHE_BUG_IF(MoqtSession_CloseObjectStream_fin_failed, !success);
+ }
+}
+
void MoqtSession::Stream::OnCanRead() {
bool fin =
quiche::ProcessAllReadableRegions(*stream_, [&](absl::string_view chunk) {
@@ -627,37 +667,39 @@
QUICHE_DCHECK(start.has_value()); // Parser enforces this.
std::optional<FullSequence> end = session_->LocationToAbsoluteNumber(
track, message.end_group, message.end_object);
+ LocalTrack::Visitor::PublishPastObjectsCallback publish_past_objects;
+ SubscribeWindow window =
+ end.has_value()
+ ? SubscribeWindow(message.subscribe_id, track.forwarding_preference(),
+ start->group, start->object, end->group,
+ end->object)
+ : SubscribeWindow(message.subscribe_id, track.forwarding_preference(),
+ start->group, start->object);
if (start < track.next_sequence() && track.visitor() != nullptr) {
- // TODO: Rework this. It's not good that the session notifies the
- // application -- presumably triggering the send of a bunch of objects --
- // and only then sends the Subscribe OK.
- SubscribeWindow window =
- end.has_value()
- ? SubscribeWindow(message.subscribe_id,
- track.forwarding_preference(), start->group,
- start->object, end->group, end->object)
- : SubscribeWindow(message.subscribe_id,
- track.forwarding_preference(), start->group,
- start->object);
- std::optional<absl::string_view> past_objects_available =
- track.visitor()->OnSubscribeForPast(window);
- if (past_objects_available.has_value()) {
+ absl::StatusOr<LocalTrack::Visitor::PublishPastObjectsCallback>
+ past_objects_available = track.visitor()->OnSubscribeForPast(window);
+ if (!past_objects_available.ok()) {
SendSubscribeError(message, SubscribeErrorCode::kInternalError,
- "Object does not exist", message.track_alias);
+ past_objects_available.status().message(),
+ message.track_alias);
return;
}
+ publish_past_objects = *std::move(past_objects_available);
}
MoqtSubscribeOk subscribe_ok;
subscribe_ok.subscribe_id = message.subscribe_id;
SendOrBufferMessage(session_->framer_.SerializeSubscribeOk(subscribe_ok));
QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for "
<< message.track_namespace << ":" << message.track_name;
- if (!end.has_value()) {
+ if (end.has_value()) {
+ track.AddWindow(message.subscribe_id, start->group, start->object,
+ end->group, end->object);
+ } else {
track.AddWindow(message.subscribe_id, start->group, start->object);
- return;
}
- track.AddWindow(message.subscribe_id, start->group, start->object, end->group,
- end->object);
+ if (publish_past_objects) {
+ std::move(publish_past_objects)();
+ }
}
void MoqtSession::Stream::OnSubscribeOkMessage(const MoqtSubscribeOk& message) {
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 08eb658..fab05df 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -130,7 +130,8 @@
bool PublishObject(const FullTrackName& full_track_name, uint64_t group_id,
uint64_t object_id, uint64_t object_send_order,
absl::string_view payload, bool end_of_stream);
- // TODO: Add an API to FIN the stream for a particular track/group/object.
+ void CloseObjectStream(const FullTrackName& full_track_name,
+ uint64_t group_id);
// TODO: Add an API to send partial objects.
MoqtSessionCallbacks& callbacks() { return callbacks_; }
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 5cac2fe..9f9370a 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -456,8 +456,8 @@
std::unique_ptr<MoqtParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
bool correct_message = true;
- EXPECT_CALL(local_track_visitor, OnSubscribeForPast(_))
- .WillOnce(Return(std::nullopt));
+ EXPECT_CALL(local_track_visitor, OnSubscribeForPast(_)).WillOnce(Return([] {
+ }));
EXPECT_CALL(mock_stream, Writev(_, _))
.WillOnce([&](absl::Span<const absl::string_view> data,
const quiche::StreamWriteOptions& options) {
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h
index 53652ed..75a204f 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.h
+++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -54,6 +54,11 @@
void RemoveStream(uint64_t group_id, uint64_t object_id);
+ bool HasEnd() const { return end_.has_value(); }
+ MoqtForwardingPreference forwarding_preference() const {
+ return forwarding_preference_;
+ }
+
private:
// Converts an object sequence number into one that matches the way that
// stream IDs are being mapped. (See the comment for send_streams_ below.)
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index 6193ccd..5f2957a 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -9,9 +9,11 @@
#include <optional>
#include <vector>
+#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/common/quiche_callbacks.h"
namespace moqt {
@@ -22,6 +24,8 @@
public:
virtual ~Visitor() = default;
+ using PublishPastObjectsCallback = quiche::SingleUseCallback<void()>;
+
// Requests that application re-publish objects from {start_group,
// start_object} to the latest object. If the return value is nullopt, the
// subscribe is valid and the application will deliver the object and
@@ -29,7 +33,7 @@
// is the error message (the session will send SUBSCRIBE_ERROR). Via this
// API, the application decides if a partially fulfillable
// SUBSCRIBE results in an error or not.
- virtual std::optional<absl::string_view> OnSubscribeForPast(
+ virtual absl::StatusOr<PublishPastObjectsCallback> OnSubscribeForPast(
const SubscribeWindow& window) = 0;
};
// |visitor| must not be nullptr.
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h
index 7d174a1..aaf301c 100644
--- a/quiche/quic/moqt/tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -8,6 +8,7 @@
#include <cstdint>
#include <optional>
+#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_session.h"
@@ -40,7 +41,7 @@
class MockLocalTrackVisitor : public LocalTrack::Visitor {
public:
- MOCK_METHOD(std::optional<absl::string_view>, OnSubscribeForPast,
+ MOCK_METHOD(absl::StatusOr<PublishPastObjectsCallback>, OnSubscribeForPast,
(const SubscribeWindow& window), (override));
};