Implement moq-chat server.
Refactoring the client will be a different CL
Integrated client/server tests will be a different CL.
PiperOrigin-RevId: 660526015
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 8388cfb..052b6ae 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1510,6 +1510,7 @@
"quic/moqt/moqt_cached_object.h",
"quic/moqt/moqt_framer.h",
"quic/moqt/moqt_known_track_publisher.h",
+ "quic/moqt/moqt_live_relay_queue.h",
"quic/moqt/moqt_messages.h",
"quic/moqt/moqt_outgoing_queue.h",
"quic/moqt/moqt_parser.h",
@@ -1520,6 +1521,8 @@
"quic/moqt/moqt_track.h",
"quic/moqt/test_tools/moqt_simulator_harness.h",
"quic/moqt/test_tools/moqt_test_message.h",
+ "quic/moqt/tools/chat_server.h",
+ "quic/moqt/tools/moq_chat.h",
"quic/moqt/tools/moqt_client.h",
"quic/moqt/tools/moqt_mock_visitor.h",
"quic/moqt/tools/moqt_server.h",
@@ -1530,6 +1533,8 @@
"quic/moqt/moqt_framer_test.cc",
"quic/moqt/moqt_integration_test.cc",
"quic/moqt/moqt_known_track_publisher.cc",
+ "quic/moqt/moqt_live_relay_queue.cc",
+ "quic/moqt/moqt_live_relay_queue_test.cc",
"quic/moqt/moqt_messages.cc",
"quic/moqt/moqt_outgoing_queue.cc",
"quic/moqt/moqt_outgoing_queue_test.cc",
@@ -1545,6 +1550,9 @@
"quic/moqt/moqt_track_test.cc",
"quic/moqt/test_tools/moqt_simulator_harness.cc",
"quic/moqt/tools/chat_client_bin.cc",
+ "quic/moqt/tools/chat_server.cc",
+ "quic/moqt/tools/chat_server_bin.cc",
+ "quic/moqt/tools/moq_chat_test.cc",
"quic/moqt/tools/moqt_client.cc",
"quic/moqt/tools/moqt_end_to_end_test.cc",
"quic/moqt/tools/moqt_ingestion_server_bin.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index c5c6eb6..04e3a31 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1514,6 +1514,7 @@
"src/quiche/quic/moqt/moqt_cached_object.h",
"src/quiche/quic/moqt/moqt_framer.h",
"src/quiche/quic/moqt/moqt_known_track_publisher.h",
+ "src/quiche/quic/moqt/moqt_live_relay_queue.h",
"src/quiche/quic/moqt/moqt_messages.h",
"src/quiche/quic/moqt/moqt_outgoing_queue.h",
"src/quiche/quic/moqt/moqt_parser.h",
@@ -1524,6 +1525,8 @@
"src/quiche/quic/moqt/moqt_track.h",
"src/quiche/quic/moqt/test_tools/moqt_simulator_harness.h",
"src/quiche/quic/moqt/test_tools/moqt_test_message.h",
+ "src/quiche/quic/moqt/tools/chat_server.h",
+ "src/quiche/quic/moqt/tools/moq_chat.h",
"src/quiche/quic/moqt/tools/moqt_client.h",
"src/quiche/quic/moqt/tools/moqt_mock_visitor.h",
"src/quiche/quic/moqt/tools/moqt_server.h",
@@ -1534,6 +1537,8 @@
"src/quiche/quic/moqt/moqt_framer_test.cc",
"src/quiche/quic/moqt/moqt_integration_test.cc",
"src/quiche/quic/moqt/moqt_known_track_publisher.cc",
+ "src/quiche/quic/moqt/moqt_live_relay_queue.cc",
+ "src/quiche/quic/moqt/moqt_live_relay_queue_test.cc",
"src/quiche/quic/moqt/moqt_messages.cc",
"src/quiche/quic/moqt/moqt_outgoing_queue.cc",
"src/quiche/quic/moqt/moqt_outgoing_queue_test.cc",
@@ -1549,6 +1554,9 @@
"src/quiche/quic/moqt/moqt_track_test.cc",
"src/quiche/quic/moqt/test_tools/moqt_simulator_harness.cc",
"src/quiche/quic/moqt/tools/chat_client_bin.cc",
+ "src/quiche/quic/moqt/tools/chat_server.cc",
+ "src/quiche/quic/moqt/tools/chat_server_bin.cc",
+ "src/quiche/quic/moqt/tools/moq_chat_test.cc",
"src/quiche/quic/moqt/tools/moqt_client.cc",
"src/quiche/quic/moqt/tools/moqt_end_to_end_test.cc",
"src/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc",
diff --git a/build/source_list.json b/build/source_list.json
index 39246f8..a24b008 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1513,6 +1513,7 @@
"quiche/quic/moqt/moqt_cached_object.h",
"quiche/quic/moqt/moqt_framer.h",
"quiche/quic/moqt/moqt_known_track_publisher.h",
+ "quiche/quic/moqt/moqt_live_relay_queue.h",
"quiche/quic/moqt/moqt_messages.h",
"quiche/quic/moqt/moqt_outgoing_queue.h",
"quiche/quic/moqt/moqt_parser.h",
@@ -1523,6 +1524,8 @@
"quiche/quic/moqt/moqt_track.h",
"quiche/quic/moqt/test_tools/moqt_simulator_harness.h",
"quiche/quic/moqt/test_tools/moqt_test_message.h",
+ "quiche/quic/moqt/tools/chat_server.h",
+ "quiche/quic/moqt/tools/moq_chat.h",
"quiche/quic/moqt/tools/moqt_client.h",
"quiche/quic/moqt/tools/moqt_mock_visitor.h",
"quiche/quic/moqt/tools/moqt_server.h"
@@ -1533,6 +1536,8 @@
"quiche/quic/moqt/moqt_framer_test.cc",
"quiche/quic/moqt/moqt_integration_test.cc",
"quiche/quic/moqt/moqt_known_track_publisher.cc",
+ "quiche/quic/moqt/moqt_live_relay_queue.cc",
+ "quiche/quic/moqt/moqt_live_relay_queue_test.cc",
"quiche/quic/moqt/moqt_messages.cc",
"quiche/quic/moqt/moqt_outgoing_queue.cc",
"quiche/quic/moqt/moqt_outgoing_queue_test.cc",
@@ -1548,6 +1553,9 @@
"quiche/quic/moqt/moqt_track_test.cc",
"quiche/quic/moqt/test_tools/moqt_simulator_harness.cc",
"quiche/quic/moqt/tools/chat_client_bin.cc",
+ "quiche/quic/moqt/tools/chat_server.cc",
+ "quiche/quic/moqt/tools/chat_server_bin.cc",
+ "quiche/quic/moqt/tools/moq_chat_test.cc",
"quiche/quic/moqt/tools/moqt_client.cc",
"quiche/quic/moqt/tools/moqt_end_to_end_test.cc",
"quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc",
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.cc b/quiche/quic/moqt/moqt_live_relay_queue.cc
new file mode 100644
index 0000000..5fdcf65
--- /dev/null
+++ b/quiche/quic/moqt/moqt_live_relay_queue.cc
@@ -0,0 +1,155 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_live_relay_queue.h"
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <tuple>
+#include <vector>
+
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_cached_object.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/simple_buffer_allocator.h"
+
+namespace moqt {
+
+bool MoqtLiveRelayQueue::AddObject(uint64_t group_id, uint64_t object_id,
+ MoqtObjectStatus status,
+ absl::string_view object) {
+ if (queue_.size() == kMaxQueuedGroups) {
+ if (queue_.begin()->first > group_id) {
+ QUICHE_DLOG(INFO) << "Skipping object from group " << group_id
+ << " because it is too old.";
+ return true;
+ }
+ if (queue_.find(group_id) == queue_.end()) {
+ // Erase the oldest group.
+ queue_.erase(queue_.begin());
+ }
+ }
+ QUICHE_CHECK(status == MoqtObjectStatus::kNormal || object.empty());
+ return AddRawObject(FullSequence{group_id, object_id}, status, object);
+}
+
+std::tuple<uint64_t, bool> MoqtLiveRelayQueue::NextObject(Group& group) const {
+ auto it = group.rbegin();
+ if (it == group.rend()) {
+ return std::tuple<uint64_t, bool>(0, false);
+ }
+ return std::tuple<uint64_t, bool>(
+ it->second.sequence.object + 1,
+ (it->second.status == MoqtObjectStatus::kEndOfGroup ||
+ it->second.status == MoqtObjectStatus::kGroupDoesNotExist ||
+ it->second.status == MoqtObjectStatus::kEndOfTrack));
+}
+
+bool MoqtLiveRelayQueue::AddRawObject(FullSequence sequence,
+ MoqtObjectStatus status,
+ absl::string_view payload) {
+ // Validate the input given previously received markers.
+ if (end_of_track_.has_value() && sequence > *end_of_track_) {
+ QUICHE_DLOG(INFO) << "Skipping object because it is after the end of the "
+ << "track";
+ return false;
+ }
+ if (status == MoqtObjectStatus::kEndOfTrack) {
+ if (sequence < next_sequence_) {
+ QUICHE_DLOG(INFO) << "EndOfTrack is too early.";
+ return false;
+ }
+ // TODO(martinduke): Check that EndOfTrack has normal IDs.
+ end_of_track_ = sequence;
+ }
+ if (status == MoqtObjectStatus::kGroupDoesNotExist && sequence.object > 0) {
+ QUICHE_DLOG(INFO) << "GroupDoesNotExist is not the last object in the "
+ << "group";
+ return false;
+ }
+ auto group_it = queue_.try_emplace(sequence.group);
+ if (!group_it.second) { // Group already exists.
+ auto [next_object_id, is_the_end] = NextObject(group_it.first->second);
+ if (next_object_id <= sequence.object && is_the_end) {
+ QUICHE_DLOG(INFO) << "Skipping object because it is after the end of the "
+ << "group";
+ return false;
+ }
+ if (status == MoqtObjectStatus::kEndOfGroup &&
+ sequence.object < next_object_id) {
+ QUICHE_DLOG(INFO) << "Skipping EndOfGroup because it is not the last "
+ << "object in the group.";
+ return false;
+ }
+ }
+ if (next_sequence_ <= sequence) {
+ next_sequence_ = FullSequence{sequence.group, sequence.object + 1};
+ }
+ std::shared_ptr<quiche::QuicheMemSlice> slice =
+ payload.empty()
+ ? nullptr
+ : std::make_shared<quiche::QuicheMemSlice>(quiche::QuicheBuffer::Copy(
+ quiche::SimpleBufferAllocator::Get(), payload));
+ auto object_it = group_it.first->second.try_emplace(sequence.object, sequence,
+ status, slice);
+ if (!object_it.second) {
+ QUICHE_DLOG(ERROR) << "Sender is overwriting an existing object.";
+ return false;
+ }
+ for (MoqtObjectListener* listener : listeners_) {
+ listener->OnNewObjectAvailable(sequence);
+ }
+ return true;
+}
+
+std::optional<PublishedObject> MoqtLiveRelayQueue::GetCachedObject(
+ FullSequence sequence) const {
+ auto group_it = queue_.find(sequence.group);
+ if (group_it == queue_.end()) {
+ return std::nullopt;
+ }
+ auto object_it = group_it->second.find(sequence.object);
+ if (object_it == group_it->second.end()) {
+ return std::nullopt;
+ }
+ return CachedObjectToPublishedObject(object_it->second);
+}
+
+std::vector<FullSequence> MoqtLiveRelayQueue::GetCachedObjectsInRange(
+ FullSequence start, FullSequence end) const {
+ std::vector<FullSequence> sequences;
+ SubscribeWindow window(start, end);
+ for (auto& group_it : queue_) {
+ for (auto& object_it : group_it.second) {
+ if (window.InWindow(object_it.second.sequence)) {
+ sequences.push_back(object_it.second.sequence);
+ }
+ }
+ }
+ return sequences;
+}
+
+absl::StatusOr<MoqtTrackStatusCode> MoqtLiveRelayQueue::GetTrackStatus() const {
+ if (end_of_track_.has_value()) {
+ return MoqtTrackStatusCode::kFinished;
+ }
+ if (queue_.empty()) {
+ // TODO(martinduke): Retrieve the track status from upstream.
+ return MoqtTrackStatusCode::kNotYetBegun;
+ }
+ return MoqtTrackStatusCode::kInProgress;
+}
+
+FullSequence MoqtLiveRelayQueue::GetLargestSequence() const {
+ return FullSequence{next_sequence_.group, next_sequence_.object - 1};
+}
+
+} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.h b/quiche/quic/moqt/moqt_live_relay_queue.h
new file mode 100644
index 0000000..41a6270
--- /dev/null
+++ b/quiche/quic/moqt/moqt_live_relay_queue.h
@@ -0,0 +1,109 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_TOOLS_MOQT_RELAY_QUEUE_H_
+#define QUICHE_QUIC_MOQT_TOOLS_MOQT_RELAY_QUEUE_H_
+
+#include <cstddef>
+#include <cstdint>
+#include <optional>
+#include <utility>
+#include <vector>
+
+#include "absl/container/btree_map.h"
+#include "absl/container/flat_hash_set.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_cached_object.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
+
+namespace moqt {
+
+// MoqtLiveRelayQueue lets the user send objects by providing the contents of
+// the object and the object metadata. It will store these by sequence number.
+// When called on to provide a range of objects, it will fill in any missing
+// objects and groups.
+//
+// The queue will maintain a buffer of three most recent groups that will be
+// provided to subscribers automatically.
+//
+// This class is primarily meant to be used by live relays to buffer the
+// frames that arrive for a short time.
+class MoqtLiveRelayQueue : public MoqtTrackPublisher {
+ public:
+ explicit MoqtLiveRelayQueue(FullTrackName track,
+ MoqtForwardingPreference forwarding_preference)
+ : track_(std::move(track)),
+ forwarding_preference_(forwarding_preference) {}
+
+ MoqtLiveRelayQueue(const MoqtLiveRelayQueue&) = delete;
+ MoqtLiveRelayQueue(MoqtLiveRelayQueue&&) = default;
+ MoqtLiveRelayQueue& operator=(const MoqtLiveRelayQueue&) = delete;
+ MoqtLiveRelayQueue& operator=(MoqtLiveRelayQueue&&) = default;
+
+ // TODO: Add destructor that terminates all subscriptions.
+
+ // Publish a received object. Returns false if the object is invalid, given
+ // other non-normal objects indicate that the sequence number should not
+ // occur. A false return value might result in a session error on the
+ // inbound session, but this queue is the only place that retains enough state
+ // to check.
+ bool AddObject(uint64_t group_id, uint64_t object_id, MoqtObjectStatus status,
+ absl::string_view object);
+
+ // MoqtTrackPublisher implementation.
+ const FullTrackName& GetTrackName() const override { return track_; }
+ std::optional<PublishedObject> GetCachedObject(
+ FullSequence sequence) const override;
+ std::vector<FullSequence> GetCachedObjectsInRange(
+ FullSequence start, FullSequence end) const override;
+ void AddObjectListener(MoqtObjectListener* listener) override {
+ listeners_.insert(listener);
+ }
+ void RemoveObjectListener(MoqtObjectListener* listener) override {
+ listeners_.erase(listener);
+ }
+ absl::StatusOr<MoqtTrackStatusCode> GetTrackStatus() const override;
+ FullSequence GetLargestSequence() const override;
+ MoqtForwardingPreference GetForwardingPreference() const override {
+ return forwarding_preference_;
+ }
+ MoqtPriority GetPublisherPriority() const override {
+ return publisher_priority_;
+ }
+ MoqtDeliveryOrder GetDeliveryOrder() const override {
+ return delivery_order_;
+ }
+
+ bool HasSubscribers() const { return !listeners_.empty(); }
+
+ private:
+ // The number of recent groups to keep around for newly joined subscribers.
+ static constexpr size_t kMaxQueuedGroups = 3;
+
+ // Ordered by object id.
+ using Group = absl::btree_map<uint64_t, CachedObject>;
+
+ // Returns the next expected object ID in |group|, and also |true| if the last
+ // object ends the group.
+ std::tuple<uint64_t, bool> NextObject(Group& group) const;
+
+ bool AddRawObject(FullSequence sequence, MoqtObjectStatus status,
+ absl::string_view payload);
+
+ FullTrackName track_;
+ MoqtForwardingPreference forwarding_preference_;
+ MoqtPriority publisher_priority_ = 128;
+ MoqtDeliveryOrder delivery_order_ = MoqtDeliveryOrder::kAscending;
+ absl::btree_map<uint64_t, Group> queue_; // Ordered by group id.
+ absl::flat_hash_set<MoqtObjectListener*> listeners_;
+ std::optional<FullSequence> end_of_track_;
+ FullSequence next_sequence_;
+};
+
+} // namespace moqt
+
+#endif // QUICHE_QUIC_MOQT_TOOLS_MOQT_RELAY_QUEUE_H_
diff --git a/quiche/quic/moqt/moqt_live_relay_queue_test.cc b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
new file mode 100644
index 0000000..c40a69c
--- /dev/null
+++ b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
@@ -0,0 +1,349 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_live_relay_queue.h"
+
+#include <cstdint>
+#include <optional>
+#include <vector>
+
+#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/platform/api/quiche_test.h"
+
+namespace moqt {
+namespace {
+
+class TestMoqtLiveRelayQueue : public MoqtLiveRelayQueue,
+ public MoqtObjectListener {
+ public:
+ TestMoqtLiveRelayQueue()
+ : MoqtLiveRelayQueue(FullTrackName{"test", "track"},
+ MoqtForwardingPreference::kGroup) {
+ AddObjectListener(this);
+ }
+
+ void OnNewObjectAvailable(FullSequence sequence) {
+ std::optional<PublishedObject> object = GetCachedObject(sequence);
+ QUICHE_CHECK(object.has_value());
+ switch (object->status) {
+ case MoqtObjectStatus::kNormal:
+ PublishObject(object->sequence.group, object->sequence.object,
+ object->payload.AsStringView());
+ break;
+ case MoqtObjectStatus::kObjectDoesNotExist:
+ SkipObject(object->sequence.group, object->sequence.object);
+ break;
+ case MoqtObjectStatus::kGroupDoesNotExist:
+ SkipGroup(object->sequence.group);
+ break;
+ case MoqtObjectStatus::kEndOfGroup:
+ CloseStreamForGroup(object->sequence.group);
+ break;
+ case MoqtObjectStatus::kEndOfTrack:
+ CloseTrack();
+ break;
+ default:
+ EXPECT_TRUE(false);
+ }
+ }
+
+ void CallSubscribeForPast(const SubscribeWindow& window) {
+ std::vector<FullSequence> objects =
+ GetCachedObjectsInRange(FullSequence(0, 0), GetLargestSequence());
+ for (FullSequence object : objects) {
+ if (window.InWindow(object)) {
+ OnNewObjectAvailable(object);
+ }
+ }
+ }
+
+ MOCK_METHOD(void, CloseStreamForGroup, (uint64_t group_id), ());
+ MOCK_METHOD(void, PublishObject,
+ (uint64_t group_id, uint64_t object_id,
+ absl::string_view payload),
+ ());
+ MOCK_METHOD(void, SkipObject, (uint64_t group_id, uint64_t object_id), ());
+ MOCK_METHOD(void, SkipGroup, (uint64_t group_id), ());
+ MOCK_METHOD(void, CloseTrack, (), ());
+};
+
+// Duplicates of MoqtOutgoingQueue test cases.
+TEST(MoqtLiveRelayQueue, SingleGroup) {
+ TestMoqtLiveRelayQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a"));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b"));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c"));
+ EXPECT_CALL(queue, CloseStreamForGroup(0));
+ }
+ EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a"));
+ EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b"));
+ EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kNormal, "c"));
+ EXPECT_TRUE(queue.AddObject(0, 3, MoqtObjectStatus::kEndOfGroup, ""));
+}
+
+TEST(MoqtLiveRelayQueue, SingleGroupPastSubscribeFromZero) {
+ TestMoqtLiveRelayQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a"));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b"));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c"));
+
+ EXPECT_CALL(queue, PublishObject(0, 0, "a"));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b"));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c"));
+ }
+ EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a"));
+ EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b"));
+ EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kNormal, "c"));
+ queue.CallSubscribeForPast(SubscribeWindow(0, 0));
+}
+
+TEST(MoqtLiveRelayQueue, SingleGroupPastSubscribeFromMidGroup) {
+ TestMoqtLiveRelayQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a"));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b"));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c"));
+
+ EXPECT_CALL(queue, PublishObject(0, 1, "b"));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c"));
+ }
+ EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a"));
+ EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b"));
+ EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kNormal, "c"));
+ queue.CallSubscribeForPast(SubscribeWindow(0, 1));
+}
+
+TEST(MoqtLiveRelayQueue, TwoGroups) {
+ TestMoqtLiveRelayQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a"));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b"));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c"));
+ EXPECT_CALL(queue, CloseStreamForGroup(0));
+ EXPECT_CALL(queue, PublishObject(1, 0, "d"));
+ EXPECT_CALL(queue, PublishObject(1, 1, "e"));
+ EXPECT_CALL(queue, PublishObject(1, 2, "f"));
+ }
+ EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a"));
+ EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b"));
+ EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kNormal, "c"));
+ EXPECT_TRUE(queue.AddObject(0, 3, MoqtObjectStatus::kEndOfGroup, ""));
+ EXPECT_TRUE(queue.AddObject(1, 0, MoqtObjectStatus::kNormal, "d"));
+ EXPECT_TRUE(queue.AddObject(1, 1, MoqtObjectStatus::kNormal, "e"));
+ EXPECT_TRUE(queue.AddObject(1, 2, MoqtObjectStatus::kNormal, "f"));
+}
+
+TEST(MoqtLiveRelayQueue, TwoGroupsPastSubscribe) {
+ TestMoqtLiveRelayQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a"));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b"));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c"));
+ EXPECT_CALL(queue, CloseStreamForGroup(0));
+ EXPECT_CALL(queue, PublishObject(1, 0, "d"));
+ EXPECT_CALL(queue, PublishObject(1, 1, "e"));
+ EXPECT_CALL(queue, PublishObject(1, 2, "f"));
+
+ EXPECT_CALL(queue, PublishObject(0, 1, "b"));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c"));
+ EXPECT_CALL(queue, CloseStreamForGroup(0));
+ EXPECT_CALL(queue, PublishObject(1, 0, "d"));
+ EXPECT_CALL(queue, PublishObject(1, 1, "e"));
+ EXPECT_CALL(queue, PublishObject(1, 2, "f"));
+ }
+ EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a"));
+ EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b"));
+ EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kNormal, "c"));
+ EXPECT_TRUE(queue.AddObject(0, 3, MoqtObjectStatus::kEndOfGroup, ""));
+ EXPECT_TRUE(queue.AddObject(1, 0, MoqtObjectStatus::kNormal, "d"));
+ EXPECT_TRUE(queue.AddObject(1, 1, MoqtObjectStatus::kNormal, "e"));
+ EXPECT_TRUE(queue.AddObject(1, 2, MoqtObjectStatus::kNormal, "f"));
+ queue.CallSubscribeForPast(SubscribeWindow(0, 1));
+}
+
+TEST(MoqtLiveRelayQueue, FiveGroups) {
+ TestMoqtLiveRelayQueue queue;
+ ;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a"));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b"));
+ EXPECT_CALL(queue, CloseStreamForGroup(0));
+ EXPECT_CALL(queue, PublishObject(1, 0, "c"));
+ EXPECT_CALL(queue, PublishObject(1, 1, "d"));
+ EXPECT_CALL(queue, CloseStreamForGroup(1));
+ EXPECT_CALL(queue, PublishObject(2, 0, "e"));
+ EXPECT_CALL(queue, PublishObject(2, 1, "f"));
+ EXPECT_CALL(queue, CloseStreamForGroup(2));
+ EXPECT_CALL(queue, PublishObject(3, 0, "g"));
+ EXPECT_CALL(queue, PublishObject(3, 1, "h"));
+ EXPECT_CALL(queue, CloseStreamForGroup(3));
+ EXPECT_CALL(queue, PublishObject(4, 0, "i"));
+ EXPECT_CALL(queue, PublishObject(4, 1, "j"));
+ }
+ EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a"));
+ EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b"));
+ EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kEndOfGroup, ""));
+ EXPECT_TRUE(queue.AddObject(1, 0, MoqtObjectStatus::kNormal, "c"));
+ EXPECT_TRUE(queue.AddObject(1, 1, MoqtObjectStatus::kNormal, "d"));
+ EXPECT_TRUE(queue.AddObject(1, 2, MoqtObjectStatus::kEndOfGroup, ""));
+ EXPECT_TRUE(queue.AddObject(2, 0, MoqtObjectStatus::kNormal, "e"));
+ EXPECT_TRUE(queue.AddObject(2, 1, MoqtObjectStatus::kNormal, "f"));
+ EXPECT_TRUE(queue.AddObject(2, 2, MoqtObjectStatus::kEndOfGroup, ""));
+ EXPECT_TRUE(queue.AddObject(3, 0, MoqtObjectStatus::kNormal, "g"));
+ EXPECT_TRUE(queue.AddObject(3, 1, MoqtObjectStatus::kNormal, "h"));
+ EXPECT_TRUE(queue.AddObject(3, 2, MoqtObjectStatus::kEndOfGroup, ""));
+ EXPECT_TRUE(queue.AddObject(4, 0, MoqtObjectStatus::kNormal, "i"));
+ EXPECT_TRUE(queue.AddObject(4, 1, MoqtObjectStatus::kNormal, "j"));
+}
+
+TEST(MoqtLiveRelayQueue, FiveGroupsPastSubscribe) {
+ TestMoqtLiveRelayQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a"));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b"));
+ EXPECT_CALL(queue, CloseStreamForGroup(0));
+ EXPECT_CALL(queue, PublishObject(1, 0, "c"));
+ EXPECT_CALL(queue, PublishObject(1, 1, "d"));
+ EXPECT_CALL(queue, CloseStreamForGroup(1));
+ EXPECT_CALL(queue, PublishObject(2, 0, "e"));
+ EXPECT_CALL(queue, PublishObject(2, 1, "f"));
+ EXPECT_CALL(queue, CloseStreamForGroup(2));
+ EXPECT_CALL(queue, PublishObject(3, 0, "g"));
+ EXPECT_CALL(queue, PublishObject(3, 1, "h"));
+ EXPECT_CALL(queue, CloseStreamForGroup(3));
+ EXPECT_CALL(queue, PublishObject(4, 0, "i"));
+ EXPECT_CALL(queue, PublishObject(4, 1, "j"));
+
+ // Past SUBSCRIBE would only get the three most recent groups.
+ EXPECT_CALL(queue, PublishObject(2, 0, "e"));
+ EXPECT_CALL(queue, PublishObject(2, 1, "f"));
+ EXPECT_CALL(queue, CloseStreamForGroup(2));
+ EXPECT_CALL(queue, PublishObject(3, 0, "g"));
+ EXPECT_CALL(queue, PublishObject(3, 1, "h"));
+ EXPECT_CALL(queue, CloseStreamForGroup(3));
+ EXPECT_CALL(queue, PublishObject(4, 0, "i"));
+ EXPECT_CALL(queue, PublishObject(4, 1, "j"));
+ }
+ EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a"));
+ EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b"));
+ EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kEndOfGroup, ""));
+ EXPECT_TRUE(queue.AddObject(1, 0, MoqtObjectStatus::kNormal, "c"));
+ EXPECT_TRUE(queue.AddObject(1, 1, MoqtObjectStatus::kNormal, "d"));
+ EXPECT_TRUE(queue.AddObject(1, 2, MoqtObjectStatus::kEndOfGroup, ""));
+ EXPECT_TRUE(queue.AddObject(2, 0, MoqtObjectStatus::kNormal, "e"));
+ EXPECT_TRUE(queue.AddObject(2, 1, MoqtObjectStatus::kNormal, "f"));
+ EXPECT_TRUE(queue.AddObject(2, 2, MoqtObjectStatus::kEndOfGroup, ""));
+ EXPECT_TRUE(queue.AddObject(3, 0, MoqtObjectStatus::kNormal, "g"));
+ EXPECT_TRUE(queue.AddObject(3, 1, MoqtObjectStatus::kNormal, "h"));
+ EXPECT_TRUE(queue.AddObject(3, 2, MoqtObjectStatus::kEndOfGroup, ""));
+ EXPECT_TRUE(queue.AddObject(4, 0, MoqtObjectStatus::kNormal, "i"));
+ EXPECT_TRUE(queue.AddObject(4, 1, MoqtObjectStatus::kNormal, "j"));
+ queue.CallSubscribeForPast(SubscribeWindow(0, 0));
+}
+
+TEST(MoqtLiveRelayQueue, FiveGroupsPastSubscribeFromMidGroup) {
+ TestMoqtLiveRelayQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a"));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b"));
+ EXPECT_CALL(queue, PublishObject(1, 0, "c"));
+ EXPECT_CALL(queue, PublishObject(1, 1, "d"));
+ EXPECT_CALL(queue, CloseStreamForGroup(1));
+ EXPECT_CALL(queue, PublishObject(2, 0, "e"));
+ EXPECT_CALL(queue, PublishObject(2, 1, "f"));
+ EXPECT_CALL(queue, CloseStreamForGroup(2));
+ EXPECT_CALL(queue, PublishObject(3, 0, "g"));
+ EXPECT_CALL(queue, PublishObject(3, 1, "h"));
+ EXPECT_CALL(queue, CloseStreamForGroup(3));
+ EXPECT_CALL(queue, PublishObject(4, 0, "i"));
+ EXPECT_CALL(queue, PublishObject(4, 1, "j"));
+ }
+ EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a"));
+ EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b"));
+ EXPECT_TRUE(queue.AddObject(1, 0, MoqtObjectStatus::kNormal, "c"));
+ EXPECT_TRUE(queue.AddObject(1, 1, MoqtObjectStatus::kNormal, "d"));
+ EXPECT_TRUE(queue.AddObject(1, 2, MoqtObjectStatus::kEndOfGroup, ""));
+ EXPECT_TRUE(queue.AddObject(2, 0, MoqtObjectStatus::kNormal, "e"));
+ EXPECT_TRUE(queue.AddObject(2, 1, MoqtObjectStatus::kNormal, "f"));
+ EXPECT_TRUE(queue.AddObject(2, 2, MoqtObjectStatus::kEndOfGroup, ""));
+ EXPECT_TRUE(queue.AddObject(3, 0, MoqtObjectStatus::kNormal, "g"));
+ EXPECT_TRUE(queue.AddObject(3, 1, MoqtObjectStatus::kNormal, "h"));
+ EXPECT_TRUE(queue.AddObject(3, 2, MoqtObjectStatus::kEndOfGroup, ""));
+ EXPECT_TRUE(queue.AddObject(4, 0, MoqtObjectStatus::kNormal, "i"));
+ EXPECT_TRUE(queue.AddObject(4, 1, MoqtObjectStatus::kNormal, "j"));
+ // This object will be ignored, but this is not an error.
+ EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kEndOfGroup, ""));
+}
+
+TEST(MoqtLiveRelayQueue, EndOfTrack) {
+ TestMoqtLiveRelayQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a"));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c"));
+ EXPECT_CALL(queue, CloseTrack());
+ }
+ EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a"));
+ EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kNormal, "c"));
+ EXPECT_FALSE(queue.AddObject(0, 1, MoqtObjectStatus::kEndOfTrack, ""));
+ EXPECT_TRUE(queue.AddObject(0, 3, MoqtObjectStatus::kEndOfTrack, ""));
+}
+
+TEST(MoqtLiveRelayQueue, EndOfGroup) {
+ TestMoqtLiveRelayQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a"));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c"));
+ EXPECT_CALL(queue, CloseStreamForGroup(0));
+ }
+ EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a"));
+ EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kNormal, "c"));
+ EXPECT_FALSE(queue.AddObject(0, 1, MoqtObjectStatus::kEndOfGroup, ""));
+ EXPECT_TRUE(queue.AddObject(0, 3, MoqtObjectStatus::kEndOfGroup, ""));
+ EXPECT_FALSE(queue.AddObject(0, 4, MoqtObjectStatus::kNormal, "e"));
+}
+
+TEST(MoqtLiveRelayQueue, GroupDoesNotExist) {
+ TestMoqtLiveRelayQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, SkipGroup(0));
+ }
+ EXPECT_FALSE(queue.AddObject(0, 1, MoqtObjectStatus::kGroupDoesNotExist, ""));
+ EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kGroupDoesNotExist, ""));
+}
+
+TEST(MoqtLiveRelayQueue, OverwriteObject) {
+ TestMoqtLiveRelayQueue queue;
+ {
+ testing::InSequence seq;
+ EXPECT_CALL(queue, PublishObject(0, 0, "a"));
+ EXPECT_CALL(queue, PublishObject(0, 1, "b"));
+ EXPECT_CALL(queue, PublishObject(0, 2, "c"));
+ }
+ EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a"));
+ EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b"));
+ EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kNormal, "c"));
+ EXPECT_TRUE(queue.AddObject(0, 3, MoqtObjectStatus::kEndOfGroup, ""));
+ EXPECT_FALSE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "invalid"));
+}
+
+} // namespace
+
+} // namespace moqt
diff --git a/quiche/quic/moqt/tools/chat_client_bin.cc b/quiche/quic/moqt/tools/chat_client_bin.cc
index a4e4432..41c0b20 100644
--- a/quiche/quic/moqt/tools/chat_client_bin.cc
+++ b/quiche/quic/moqt/tools/chat_client_bin.cc
@@ -8,7 +8,6 @@
#include <cstdint>
#include <fstream>
#include <iostream>
-#include <list>
#include <memory>
#include <optional>
#include <sstream>
@@ -295,7 +294,7 @@
if (object_sequence == 0) {
std::cout << user << "\n";
} else {
- std::cout << user << "joined the chat\n";
+ std::cout << user << " joined the chat\n";
}
auto it = other_users_.find(user);
if (it == other_users_.end()) {
@@ -379,8 +378,8 @@
quic::QuicUrl url(args[0], "https");
quic::QuicServerId server_id(url.host(), url.port());
std::string path = url.PathParamsQuery();
- std::string username = args[1];
- std::string chat_id = args[2];
+ const std::string& username = args[1];
+ const std::string& chat_id = args[2];
ChatClient client(server_id, path, username, chat_id);
while (!client.session_is_open()) {
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc
new file mode 100644
index 0000000..f984a71
--- /dev/null
+++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -0,0 +1,183 @@
+// Copyright (c) 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/tools/chat_server.h"
+
+#include <cstdint>
+#include <iostream>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/str_cat.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/core/crypto/proof_source.h"
+#include "quiche/quic/moqt/moqt_live_relay_queue.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_outgoing_queue.h"
+#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/tools/moq_chat.h"
+#include "quiche/quic/moqt/tools/moqt_server.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+
+namespace moqt {
+
+ChatServer::ChatServerSessionHandler::ChatServerSessionHandler(
+ MoqtSession* session, ChatServer* server)
+ : session_(session), server_(server) {
+ session_->callbacks().incoming_announce_callback =
+ [&](absl::string_view track_namespace) {
+ std::cout << "Received ANNOUNCE for " << track_namespace << "\n";
+ username_ =
+ server_->strings().GetUsernameFromTrackNamespace(track_namespace);
+ if (username_->empty()) {
+ std::cout << "Malformed ANNOUNCE namespace\n";
+ return std::nullopt;
+ }
+ session_->SubscribeCurrentGroup(track_namespace, "",
+ server_->remote_track_visitor());
+ server_->AddUser(*username_);
+ return std::nullopt;
+ };
+ // TODO(martinduke): Add a callback for UNANNOUNCE that deletes the user and
+ // clears username_, but keeps the handler.
+ session_->callbacks().session_terminated_callback =
+ [&](absl::string_view error_message) {
+ std::cout << "Session terminated, reason = " << error_message << "\n";
+ session_ = nullptr;
+ server_->DeleteSession(it_);
+ };
+ session_->set_publisher(server_->publisher());
+}
+
+ChatServer::ChatServerSessionHandler::~ChatServerSessionHandler() {
+ if (username_.has_value()) {
+ server_->DeleteUser(*username_);
+ }
+}
+
+ChatServer::RemoteTrackVisitor::RemoteTrackVisitor(ChatServer* server)
+ : server_(server) {}
+
+void ChatServer::RemoteTrackVisitor::OnReply(
+ const moqt::FullTrackName& full_track_name,
+ std::optional<absl::string_view> reason_phrase) {
+ std::cout << "Subscription to user " << full_track_name.track_namespace
+ << " ";
+ if (reason_phrase.has_value()) {
+ std::cout << "REJECTED, reason = " << *reason_phrase << "\n";
+ std::string username =
+ server_->strings().GetUsernameFromFullTrackName(full_track_name);
+ if (!username.empty()) {
+ std::cout << "Rejection was for malformed namespace\n";
+ return;
+ }
+ server_->DeleteUser(username);
+ } else {
+ std::cout << "ACCEPTED\n";
+ }
+}
+
+void ChatServer::RemoteTrackVisitor::OnObjectFragment(
+ const moqt::FullTrackName& full_track_name, uint64_t group_sequence,
+ uint64_t object_sequence, moqt::MoqtPriority /*publisher_priority*/,
+ moqt::MoqtObjectStatus status,
+ moqt::MoqtForwardingPreference /*forwarding_preference*/,
+ absl::string_view object, bool end_of_message) {
+ if (!end_of_message) {
+ std::cerr << "Error: received partial message despite requesting "
+ "buffering\n";
+ }
+ std::string username =
+ server_->strings().GetUsernameFromFullTrackName(full_track_name);
+ if (username.empty()) {
+ std::cout << "Received user message with malformed namespace\n";
+ return;
+ }
+ auto it = server_->user_queues_.find(username);
+ if (it == server_->user_queues_.end()) {
+ std::cerr << "Error: received message for unknown user " << username
+ << "\n";
+ return;
+ }
+ if (status != MoqtObjectStatus::kNormal) {
+ it->second->AddObject(group_sequence, object_sequence, status, "");
+ return;
+ }
+ if (!server_->WriteToFile(username, object)) {
+ std::cout << username << ": " << object << "\n\n";
+ }
+ it->second->AddObject(group_sequence, object_sequence, status, object);
+}
+
+ChatServer::ChatServer(std::unique_ptr<quic::ProofSource> proof_source,
+ absl::string_view chat_id, absl::string_view output_file)
+ : server_(std::move(proof_source), std::move(incoming_session_callback_)),
+ strings_(chat_id),
+ catalog_(std::make_shared<MoqtOutgoingQueue>(
+ strings_.GetCatalogName(), MoqtForwardingPreference::kGroup)),
+ remote_track_visitor_(this) {
+ catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy(
+ &allocator_, MoqChatStrings::kCatalogHeader)),
+ /*key=*/true);
+ publisher_.Add(catalog_);
+ if (!output_file.empty()) {
+ output_filename_ = output_file;
+ }
+ if (!output_filename_.empty()) {
+ output_file_.open(output_filename_);
+ output_file_ << "Chat transcript:\n";
+ output_file_.flush();
+ }
+}
+
+void ChatServer::AddUser(absl::string_view username) {
+ std::string catalog_data = absl::StrCat("+", username);
+ catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy(
+ &allocator_, catalog_data)),
+ /*key=*/false);
+ // Add a local track.
+ user_queues_[username] = std::make_shared<MoqtLiveRelayQueue>(
+ strings_.GetFullTrackNameFromUsername(username),
+ MoqtForwardingPreference::kObject);
+ publisher_.Add(user_queues_[username]);
+}
+
+void ChatServer::DeleteUser(absl::string_view username) {
+ // Delete from Catalog.
+ std::string catalog_data = absl::StrCat("-", username);
+ catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy(
+ &allocator_, catalog_data)),
+ /*key=*/false);
+ user_queues_.erase(username);
+}
+
+bool ChatServer::WriteToFile(absl::string_view username,
+ absl::string_view message) {
+ if (!output_filename_.empty()) {
+ output_file_ << username << ": " << message << "\n\n";
+ output_file_.flush();
+ return true;
+ }
+ return false;
+}
+
+absl::StatusOr<MoqtConfigureSessionCallback> ChatServer::IncomingSessionHandler(
+ absl::string_view path) {
+ if (!strings_.IsValidPath(path)) {
+ return absl::NotFoundError("Unknown endpoint; try \"/moq-chat\".");
+ }
+ return [this](MoqtSession* session) {
+ sessions_.emplace_front(session, this);
+ // Add a self-reference so it can delete itself from ChatServer::sessions_.
+ sessions_.front().set_iterator(sessions_.cbegin());
+ };
+}
+
+} // namespace moqt
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h
new file mode 100644
index 0000000..813194e
--- /dev/null
+++ b/quiche/quic/moqt/tools/chat_server.h
@@ -0,0 +1,122 @@
+// Copyright (c) 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_TOOLS_CHAT_SERVER_H_
+#define QUICHE_QUIC_MOQT_TOOLS_CHAT_SERVER_H_
+
+#include <cstdint>
+#include <fstream>
+#include <list>
+#include <memory>
+#include <optional>
+#include <string>
+
+#include "absl/container/flat_hash_map.h"
+#include "absl/status/statusor.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/core/crypto/proof_source.h"
+#include "quiche/quic/moqt/moqt_known_track_publisher.h"
+#include "quiche/quic/moqt/moqt_live_relay_queue.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_outgoing_queue.h"
+#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/quic/moqt/tools/moq_chat.h"
+#include "quiche/quic/moqt/tools/moqt_server.h"
+#include "quiche/common/simple_buffer_allocator.h"
+
+namespace moqt {
+
+class ChatServer {
+ public:
+ ChatServer(std::unique_ptr<quic::ProofSource> proof_source,
+ absl::string_view chat_id, absl::string_view output_file);
+
+ class RemoteTrackVisitor : public RemoteTrack::Visitor {
+ public:
+ explicit RemoteTrackVisitor(ChatServer* server);
+ void OnReply(const moqt::FullTrackName& full_track_name,
+ std::optional<absl::string_view> reason_phrase) override;
+
+ void OnObjectFragment(
+ const moqt::FullTrackName& full_track_name, uint64_t group_sequence,
+ uint64_t object_sequence, moqt::MoqtPriority /*publisher_priority*/,
+ moqt::MoqtObjectStatus /*status*/,
+ moqt::MoqtForwardingPreference /*forwarding_preference*/,
+ absl::string_view object, bool end_of_message) override;
+
+ private:
+ ChatServer* server_;
+ };
+
+ class ChatServerSessionHandler {
+ public:
+ ChatServerSessionHandler(MoqtSession* session, ChatServer* server);
+ ~ChatServerSessionHandler();
+
+ void set_iterator(
+ const std::list<ChatServerSessionHandler>::const_iterator it) {
+ it_ = it;
+ }
+
+ private:
+ MoqtSession* session_; // Not owned.
+ // This design assumes that each server has exactly one username, although
+ // in theory there could be multiple users on one session.
+ std::optional<std::string> username_;
+ ChatServer* server_; // Not owned.
+ // The iterator of this entry in ChatServer::sessions_, so it can destroy
+ // itself later.
+ std::list<ChatServerSessionHandler>::const_iterator it_;
+ };
+
+ MoqtServer& moqt_server() { return server_; }
+
+ RemoteTrackVisitor* remote_track_visitor() { return &remote_track_visitor_; }
+
+ quiche::SimpleBufferAllocator* allocator() { return &allocator_; }
+
+ MoqtOutgoingQueue* catalog() { return catalog_.get(); }
+
+ void AddUser(absl::string_view username);
+
+ void DeleteUser(absl::string_view username);
+
+ void DeleteSession(std::list<ChatServerSessionHandler>::const_iterator it) {
+ sessions_.erase(it);
+ }
+
+ // Returns false if no output file is set.
+ bool WriteToFile(absl::string_view username, absl::string_view message);
+
+ MoqtPublisher* publisher() { return &publisher_; }
+
+ MoqChatStrings& strings() { return strings_; }
+
+ private:
+ absl::StatusOr<MoqtConfigureSessionCallback> IncomingSessionHandler(
+ absl::string_view path);
+
+ MoqtIncomingSessionCallback incoming_session_callback_ =
+ [&](absl::string_view path) { return IncomingSessionHandler(path); };
+
+ MoqtServer server_;
+ MoqChatStrings strings_;
+ MoqtKnownTrackPublisher publisher_;
+ // Allocator for QuicheBuffer that contains catalog objects.
+ quiche::SimpleBufferAllocator allocator_;
+ std::shared_ptr<MoqtOutgoingQueue> catalog_;
+ RemoteTrackVisitor remote_track_visitor_;
+ // indexed by username
+ std::list<ChatServerSessionHandler> sessions_;
+ absl::flat_hash_map<std::string, std::shared_ptr<MoqtLiveRelayQueue>>
+ user_queues_;
+ std::string output_filename_;
+ std::ofstream output_file_;
+};
+
+} // namespace moqt
+#endif // QUICHE_QUIC_MOQT_TOOLS_CHAT_SERVER_H_
diff --git a/quiche/quic/moqt/tools/chat_server_bin.cc b/quiche/quic/moqt/tools/chat_server_bin.cc
new file mode 100644
index 0000000..899c258
--- /dev/null
+++ b/quiche/quic/moqt/tools/chat_server_bin.cc
@@ -0,0 +1,43 @@
+// Copyright (c) 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "quiche/quic/moqt/tools/chat_server.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/common/platform/api/quiche_command_line_flags.h"
+#include "quiche/common/platform/api/quiche_default_proof_providers.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/quiche_ip_address.h"
+
+DEFINE_QUICHE_COMMAND_LINE_FLAG(
+ std::string, output_file, "",
+ "chat messages will stream to a file instead of stdout");
+DEFINE_QUICHE_COMMAND_LINE_FLAG(std::string, bind_address, "127.0.0.1",
+ "Local IP address to bind to");
+DEFINE_QUICHE_COMMAND_LINE_FLAG(uint16_t, port, 9667,
+ "Port for the server to listen on");
+
+// A server for MoQT over chat, used for interop testing. See
+// https://afrind.github.io/draft-frindell-moq-chat/draft-frindell-moq-chat.html
+int main(int argc, char* argv[]) {
+ const char* usage = "Usage: chat_server [options] <chat-id>";
+ std::vector<std::string> args =
+ quiche::QuicheParseCommandLineFlags(usage, argc, argv);
+ if (args.size() != 1) {
+ quiche::QuichePrintCommandLineFlagHelp(usage);
+ return 1;
+ }
+ moqt::ChatServer server(quiche::CreateDefaultProofSource(), argv[1],
+ quiche::GetQuicheCommandLineFlag(FLAGS_output_file));
+ quiche::QuicheIpAddress bind_address;
+ QUICHE_CHECK(bind_address.FromString(
+ quiche::GetQuicheCommandLineFlag(FLAGS_bind_address)));
+ server.moqt_server().quic_server().CreateUDPSocketAndListen(
+ quic::QuicSocketAddress(bind_address,
+ quiche::GetQuicheCommandLineFlag(FLAGS_port)));
+ server.moqt_server().quic_server().HandleEventsForever();
+}
diff --git a/quiche/quic/moqt/tools/moq_chat.h b/quiche/quic/moqt/tools/moq_chat.h
new file mode 100644
index 0000000..37e5278
--- /dev/null
+++ b/quiche/quic/moqt/tools/moq_chat.h
@@ -0,0 +1,74 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_TOOLS_MOQ_CHAT_H
+#define QUICHE_QUIC_MOQT_TOOLS_MOQ_CHAT_H
+
+#include <string>
+#include <vector>
+
+#include "absl/strings/match.h"
+#include "absl/strings/str_cat.h"
+#include "absl/strings/str_split.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+
+namespace moqt {
+
+// This class encodes all the syntax in moq-chat strings: paths, full track
+// names, and catalog entries.
+class MoqChatStrings {
+ public:
+ explicit MoqChatStrings(absl::string_view chat_id) : chat_id_(chat_id) {}
+
+ static constexpr absl::string_view kBasePath = "moq-chat";
+ static constexpr absl::string_view kParticipantPath = "participant";
+ static constexpr absl::string_view kCatalogPath = "catalog";
+ static constexpr absl::string_view kCatalogHeader = "version=1\n";
+
+ // Verifies that the WebTransport path matches the spec.
+ bool IsValidPath(absl::string_view path) const {
+ return path == absl::StrCat("/", kBasePath);
+ }
+
+ // Returns "" if the track namespace is not a participant track.
+ std::string GetUsernameFromTrackNamespace(
+ absl::string_view track_namespace) const {
+ std::vector<absl::string_view> elements =
+ absl::StrSplit(track_namespace, '/');
+ if (elements.size() != 4 || elements[0] != kBasePath ||
+ elements[1] != chat_id_ || elements[2] != kParticipantPath) {
+ return "";
+ }
+ return std::string(elements[3]);
+ }
+
+ // Returns "" if the full track name is not a participant track.
+ std::string GetUsernameFromFullTrackName(
+ FullTrackName full_track_name) const {
+ // Check the full path
+ if (!full_track_name.track_name.empty()) {
+ return "";
+ }
+ return GetUsernameFromTrackNamespace(full_track_name.track_namespace);
+ }
+
+ FullTrackName GetFullTrackNameFromUsername(absl::string_view username) const {
+ return FullTrackName{absl::StrCat(kBasePath, "/", chat_id_, "/",
+ kParticipantPath, "/", username),
+ ""};
+ }
+
+ FullTrackName GetCatalogName() const {
+ return FullTrackName{absl::StrCat(kBasePath, "/", chat_id_),
+ absl::StrCat("/", kCatalogPath)};
+ }
+
+ private:
+ const std::string chat_id_;
+};
+
+} // namespace moqt
+
+#endif // QUICHE_QUIC_MOQT_TOOLS_MOQ_CHAT_H
diff --git a/quiche/quic/moqt/tools/moq_chat_test.cc b/quiche/quic/moqt/tools/moq_chat_test.cc
new file mode 100644
index 0000000..1c93d17
--- /dev/null
+++ b/quiche/quic/moqt/tools/moq_chat_test.cc
@@ -0,0 +1,72 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/tools/moq_chat.h"
+
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/common/platform/api/quiche_test.h"
+
+namespace moqt {
+namespace {
+
+class MoqChatStringsTest : public quiche::test::QuicheTest {
+ public:
+ MoqChatStrings strings_{"chat-id"};
+};
+
+TEST_F(MoqChatStringsTest, IsValidPath) {
+ EXPECT_TRUE(strings_.IsValidPath("/moq-chat"));
+ EXPECT_FALSE(strings_.IsValidPath("moq-chat"));
+ EXPECT_FALSE(strings_.IsValidPath("/moq-cha"));
+ EXPECT_FALSE(strings_.IsValidPath("/moq-chats"));
+ EXPECT_FALSE(strings_.IsValidPath("/moq-chat/"));
+}
+
+TEST_F(MoqChatStringsTest, GetUsernameFromTrackNamespace) {
+ EXPECT_EQ(strings_.GetUsernameFromTrackNamespace(
+ "moq-chat/chat-id/participant/user"),
+ "user");
+ EXPECT_EQ(strings_.GetUsernameFromTrackNamespace(
+ "/moq-chat/chat-id/participant/user"),
+ "");
+ EXPECT_EQ(strings_.GetUsernameFromTrackNamespace(
+ "moq-chat/chat-id/participant/user/"),
+ "");
+ EXPECT_EQ(strings_.GetUsernameFromTrackNamespace(
+ "moq-cha/chat-id/participant/user"),
+ "");
+ EXPECT_EQ(strings_.GetUsernameFromTrackNamespace(
+ "moq-chat/chat-i/participant/user"),
+ "");
+ EXPECT_EQ(strings_.GetUsernameFromTrackNamespace(
+ "moq-chat/chat-id/participan/user"),
+ "");
+ EXPECT_EQ(strings_.GetUsernameFromTrackNamespace("moq-chat/chat-id/user"),
+ "");
+ EXPECT_EQ(strings_.GetUsernameFromTrackNamespace(
+ "moq-chat/chat-id/participant/foo/user"),
+ "");
+}
+
+TEST_F(MoqChatStringsTest, GetUsernameFromFullTrackName) {
+ EXPECT_EQ(strings_.GetUsernameFromFullTrackName(
+ FullTrackName("moq-chat/chat-id/participant/user", "")),
+ "user");
+ EXPECT_EQ(strings_.GetUsernameFromFullTrackName(
+ FullTrackName("moq-chat/chat-id/participant/user", "foo")),
+ "");
+}
+
+TEST_F(MoqChatStringsTest, GetFullTrackNameFromUsername) {
+ EXPECT_EQ(strings_.GetFullTrackNameFromUsername("user"),
+ FullTrackName("moq-chat/chat-id/participant/user", ""));
+}
+
+TEST_F(MoqChatStringsTest, GetCatalogName) {
+ EXPECT_EQ(strings_.GetCatalogName(),
+ FullTrackName("moq-chat/chat-id", "/catalog"));
+}
+
+} // namespace
+} // namespace moqt