Update MoQ chat to draft-00.
Use of a CATALOG was replaced with SUBSCRIBE_ANNOUNCES and ANNOUNCE.
Minor changes to MoQT session based on revealed suboptimal code.
Successful interop with Meta.
PiperOrigin-RevId: 712526076
diff --git a/build/source_list.bzl b/build/source_list.bzl
index a288134..32e4def 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1574,6 +1574,7 @@
"quic/moqt/tools/chat_client_bin.cc",
"quic/moqt/tools/chat_server.cc",
"quic/moqt/tools/chat_server_bin.cc",
+ "quic/moqt/tools/moq_chat.cc",
"quic/moqt/tools/moq_chat_end_to_end_test.cc",
"quic/moqt/tools/moq_chat_test.cc",
"quic/moqt/tools/moqt_client.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index 52927da..3b75a6b 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1578,6 +1578,7 @@
"src/quiche/quic/moqt/tools/chat_client_bin.cc",
"src/quiche/quic/moqt/tools/chat_server.cc",
"src/quiche/quic/moqt/tools/chat_server_bin.cc",
+ "src/quiche/quic/moqt/tools/moq_chat.cc",
"src/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc",
"src/quiche/quic/moqt/tools/moq_chat_test.cc",
"src/quiche/quic/moqt/tools/moqt_client.cc",
diff --git a/build/source_list.json b/build/source_list.json
index d1b698f..5eb9364 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1577,6 +1577,7 @@
"quiche/quic/moqt/tools/chat_client_bin.cc",
"quiche/quic/moqt/tools/chat_server.cc",
"quiche/quic/moqt/tools/chat_server_bin.cc",
+ "quiche/quic/moqt/tools/moq_chat.cc",
"quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc",
"quiche/quic/moqt/tools/moq_chat_test.cc",
"quiche/quic/moqt/tools/moqt_client.cc",
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index a268578..cb8a44e 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -2,7 +2,6 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include <cstdint>
#include <memory>
#include <optional>
#include <string>
@@ -22,7 +21,6 @@
#include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
#include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
#include "quiche/quic/test_tools/quic_test_utils.h"
-#include "quiche/quic/test_tools/simulator/simulator.h"
#include "quiche/quic/test_tools/simulator/test_harness.h"
#include "quiche/common/platform/api/quiche_test.h"
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 508d103..f898d01 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -159,6 +159,7 @@
enum class MoqtAnnounceErrorCode : uint64_t {
kInternalError = 0,
kAnnounceNotSupported = 1,
+ kNotASubscribedNamespace = 2,
};
enum class QUICHE_EXPORT SubscribeErrorCode : uint64_t {
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index ddc9ae9..f46e655 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -422,6 +422,7 @@
if (track == nullptr) {
return;
}
+ QUIC_DLOG(INFO) << ENDPOINT << "Sent UNSUBSCRIBE message for " << name;
MoqtUnsubscribe message;
message.subscribe_id = track->subscribe_id();
SendControlMessage(framer_.SerializeUnsubscribe(message));
@@ -525,7 +526,7 @@
subscribe_done.final_id = subscription.largest_sent();
SendControlMessage(framer_.SerializeSubscribeDone(subscribe_done));
QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE_DONE message for "
- << subscribe_id;
+ << subscription.publisher().GetTrackName();
// Clean up the subscription
published_subscriptions_.erase(it);
for (webtransport::StreamId stream_id : streams_to_reset) {
@@ -1007,6 +1008,12 @@
void MoqtSession::ControlStream::OnUnsubscribeMessage(
const MoqtUnsubscribe& message) {
+ auto it = session_->published_subscriptions_.find(message.subscribe_id);
+ if (it == session_->published_subscriptions_.end()) {
+ return;
+ }
+ QUIC_DLOG(INFO) << ENDPOINT << "Received an UNSUBSCRIBE for "
+ << it->second->publisher().GetTrackName();
session_->SubscribeIsDone(message.subscribe_id,
SubscribeDoneCode::kUnsubscribed, "");
}
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc
index e98cf75..fa0c7db 100644
--- a/quiche/quic/moqt/tools/chat_client.cc
+++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -7,17 +7,14 @@
#include <poll.h>
#include <unistd.h>
-#include <cstdint>
#include <iostream>
#include <memory>
#include <optional>
-#include <sstream>
#include <string>
#include <utility>
-#include <vector>
#include "absl/container/flat_hash_map.h"
-#include "absl/strings/str_split.h"
+#include "absl/functional/bind_front.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/core/crypto/proof_verifier.h"
#include "quiche/quic/core/io/quic_default_event_loop.h"
@@ -29,7 +26,7 @@
#include "quiche/quic/moqt/moqt_outgoing_queue.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_session.h"
-#include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/quic/moqt/tools/moq_chat.h"
#include "quiche/quic/moqt/tools/moqt_client.h"
#include "quiche/quic/platform/api/quic_default_proof_providers.h"
#include "quiche/quic/platform/api/quic_socket_address.h"
@@ -39,13 +36,60 @@
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/simple_buffer_allocator.h"
-namespace moqt {
+namespace moqt::moq_chat {
+
+std::optional<MoqtAnnounceErrorReason> ChatClient::OnIncomingAnnounce(
+ const moqt::FullTrackName& track_namespace, AnnounceEvent announce_type) {
+ if (track_namespace == GetUserNamespace(my_track_name_)) {
+ // Ignore ANNOUNCE for my own track.
+ return std::optional<MoqtAnnounceErrorReason>();
+ }
+ std::optional<FullTrackName> track_name = ConstructTrackNameFromNamespace(
+ track_namespace, GetChatId(my_track_name_));
+ if (announce_type == AnnounceEvent::kUnannounce) {
+ std::cout << "UNANNOUNCE for " << track_namespace.ToString() << "\n";
+ if (track_name.has_value() && other_users_.contains(*track_name)) {
+ session_->Unsubscribe(*track_name);
+ other_users_.erase(*track_name);
+ }
+ return std::nullopt;
+ }
+ std::cout << "ANNOUNCE for " << track_namespace.ToString() << "\n";
+ if (!track_name.has_value()) {
+ std::cout << "ANNOUNCE rejected, invalid namespace\n";
+ return std::make_optional<MoqtAnnounceErrorReason>(
+ MoqtAnnounceErrorCode::kNotASubscribedNamespace,
+ "Not a subscribed namespace");
+ }
+ if (other_users_.contains(*track_name)) {
+ std::cout << "Duplicate ANNOUNCE, send OK and ignore\n";
+ return std::nullopt;
+ }
+ if (GetUsername(my_track_name_) == GetUsername(*track_name)) {
+ std::cout << "ANNOUNCE for a previous instance of my track, "
+ "do not subscribe\n";
+ return std::nullopt;
+ }
+ if (session_->SubscribeCurrentGroup(
+ *track_name, &remote_track_visitor_,
+ MoqtSubscribeParameters{std::string(GetUsername(my_track_name_)),
+ std::nullopt, std::nullopt, std::nullopt})) {
+ ++subscribes_to_make_;
+ other_users_.emplace(*track_name);
+ }
+ return std::nullopt; // Send ANNOUNCE_OK.
+}
ChatClient::ChatClient(const quic::QuicServerId& server_id,
bool ignore_certificate,
std::unique_ptr<ChatUserInterface> interface,
+ absl::string_view chat_id, absl::string_view username,
+ absl::string_view localhost,
quic::QuicEventLoop* event_loop)
- : event_loop_(event_loop), interface_(std::move(interface)) {
+ : my_track_name_(ConstructTrackName(chat_id, username, localhost)),
+ event_loop_(event_loop),
+ remote_track_visitor_(this),
+ interface_(std::move(interface)) {
if (event_loop_ == nullptr) {
quic::QuicDefaultClock* clock = quic::QuicDefaultClock::Get();
local_event_loop_ = quic::GetDefaultEventLoop()->Create(clock);
@@ -72,10 +116,13 @@
std::cerr << "Closed session, reason = " << error_message << "\n";
session_is_open_ = false;
connect_failed_ = true;
+ session_ = nullptr;
};
session_callbacks_.session_deleted_callback = [this]() {
session_ = nullptr;
};
+ session_callbacks_.incoming_announce_callback =
+ absl::bind_front(&ChatClient::OnIncomingAnnounce, this);
interface_->Initialize(
[this](absl::string_view input_message) {
OnTerminalLineInput(input_message);
@@ -83,10 +130,7 @@
event_loop_);
}
-bool ChatClient::Connect(absl::string_view path, absl::string_view username,
- absl::string_view chat_id) {
- username_ = username;
- chat_strings_.emplace(chat_id);
+bool ChatClient::Connect(absl::string_view path) {
client_->Connect(std::string(path), std::move(session_callbacks_));
while (!session_is_open_ && !connect_failed_) {
RunEventLoop();
@@ -99,6 +143,13 @@
return;
}
if (input_message == "/exit") {
+ // Clean teardown of SUBSCRIBE_ANNOUNCES, ANNOUNCE, SUBSCRIBE.
+ session_->UnsubscribeAnnounces(GetChatNamespace(my_track_name_));
+ session_->Unannounce(GetUserNamespace(my_track_name_));
+ for (const auto& track_name : other_users_) {
+ session_->Unsubscribe(track_name);
+ }
+ other_users_.clear();
session_is_open_ = false;
return;
}
@@ -111,175 +162,96 @@
const FullTrackName& full_track_name,
std::optional<FullSequence> /*largest_id*/,
std::optional<absl::string_view> reason_phrase) {
- client_->subscribes_to_make_--;
- if (full_track_name == client_->chat_strings_->GetCatalogName()) {
- std::cout << "Subscription to catalog ";
- } else {
- std::cout << "Subscription to user " << full_track_name.ToString() << " ";
+ auto it = client_->other_users_.find(full_track_name);
+ if (it == client_->other_users_.end()) {
+ std::cout << "Error: received reply for unknown user "
+ << full_track_name.ToString() << "\n";
+ return;
}
+ --client_->subscribes_to_make_;
+ std::cout << "Subscription to user " << GetUsername(*it) << " ";
if (reason_phrase.has_value()) {
std::cout << "REJECTED, reason = " << *reason_phrase << "\n";
+ client_->other_users_.erase(it);
} else {
std::cout << "ACCEPTED\n";
}
}
void ChatClient::RemoteTrackVisitor::OnObjectFragment(
- const FullTrackName& full_track_name, FullSequence sequence,
+ const FullTrackName& full_track_name, FullSequence /*sequence*/,
MoqtPriority /*publisher_priority*/, MoqtObjectStatus /*status*/,
absl::string_view object, bool end_of_message) {
if (!end_of_message) {
std::cerr << "Error: received partial message despite requesting "
"buffering\n";
}
- if (full_track_name == client_->chat_strings_->GetCatalogName()) {
- if (sequence.group < client_->catalog_group_) {
- std::cout << "Ignoring old catalog";
- return;
- }
- client_->ProcessCatalog(object, this, sequence.group, sequence.object);
- return;
- }
- std::string username(
- client_->chat_strings_->GetUsernameFromFullTrackName(full_track_name));
- if (!client_->other_users_.contains(username)) {
- std::cout << "Username " << username << "doesn't exist\n";
+ auto it = client_->other_users_.find(full_track_name);
+ if (it == client_->other_users_.end()) {
+ std::cout << "Error: received message for unknown user "
+ << full_track_name.ToString() << "\n";
return;
}
if (object.empty()) {
return;
}
- client_->WriteToOutput(username, object);
+ client_->WriteToOutput(GetUsername(*it), object);
}
-bool ChatClient::AnnounceAndSubscribe() {
+bool ChatClient::AnnounceAndSubscribeAnnounces() {
session_ = client_->session();
if (session_ == nullptr) {
std::cout << "Failed to connect.\n";
return false;
}
- if (!username_.empty()) {
- // A server log might choose to not provide a username, thus getting all
- // the messages without adding itself to the catalog.
- FullTrackName my_track_name =
- chat_strings_->GetFullTrackNameFromUsername(username_);
- queue_ = std::make_shared<MoqtOutgoingQueue>(
- my_track_name, MoqtForwardingPreference::kSubgroup);
- publisher_.Add(queue_);
- session_->set_publisher(&publisher_);
- MoqtOutgoingAnnounceCallback announce_callback =
- [this](FullTrackName track_namespace,
- std::optional<MoqtAnnounceErrorReason> reason) {
- if (reason.has_value()) {
- std::cout << "ANNOUNCE rejected, " << reason->reason_phrase << "\n";
- session_->Error(MoqtError::kInternalError,
- "Local ANNOUNCE rejected");
- return;
- }
- std::cout << "ANNOUNCE for " << track_namespace.ToString()
- << " accepted\n";
+ // TODO: A server log might choose to not provide a username, thus getting all
+ // the messages without adding itself to the catalog.
+ queue_ = std::make_shared<MoqtOutgoingQueue>(
+ my_track_name_, MoqtForwardingPreference::kSubgroup);
+ publisher_.Add(queue_);
+ session_->set_publisher(&publisher_);
+ MoqtOutgoingAnnounceCallback announce_callback =
+ [this](FullTrackName track_namespace,
+ std::optional<MoqtAnnounceErrorReason> reason) {
+ if (reason.has_value()) {
+ std::cout << "ANNOUNCE rejected, " << reason->reason_phrase << "\n";
+ session_->Error(MoqtError::kInternalError, "Local ANNOUNCE rejected");
return;
- };
- FullTrackName my_track_namespace = my_track_name;
- my_track_namespace.NameToNamespace();
- std::cout << "Announcing " << my_track_namespace.ToString() << "\n";
- session_->Announce(my_track_namespace, std::move(announce_callback));
- }
- remote_track_visitor_ = std::make_unique<RemoteTrackVisitor>(this);
- FullTrackName catalog_name = chat_strings_->GetCatalogName();
- if (!session_->SubscribeCurrentGroup(
- catalog_name, remote_track_visitor_.get(),
- MoqtSubscribeParameters{username_, std::nullopt, std::nullopt,
- std::nullopt})) {
- std::cout << "Failed to get catalog\n";
- return false;
- }
+ }
+ std::cout << "ANNOUNCE for " << track_namespace.ToString()
+ << " accepted\n";
+ return;
+ };
+ std::cout << "Announcing " << GetUserNamespace(my_track_name_).ToString()
+ << "\n";
+ session_->Announce(GetUserNamespace(my_track_name_),
+ std::move(announce_callback));
+
+ // Send SUBSCRIBE_ANNOUNCE. Pop 3 levels of namespace to get to {moq-chat,
+ // chat-id}
+ MoqtOutgoingSubscribeAnnouncesCallback subscribe_announces_callback =
+ [this](FullTrackName track_namespace,
+ std::optional<SubscribeErrorCode> error,
+ absl::string_view reason) {
+ if (error.has_value()) {
+ std::cout << "SUBSCRIBE_ANNOUNCES rejected, " << reason << "\n";
+ session_->Error(MoqtError::kInternalError,
+ "Local SUBSCRIBE_ANNOUNCES rejected");
+ return;
+ }
+ std::cout << "SUBSCRIBE_ANNOUNCES for " << track_namespace.ToString()
+ << " accepted\n";
+ return;
+ };
+ session_->SubscribeAnnounces(
+ GetChatNamespace(my_track_name_), std::move(subscribe_announces_callback),
+ MoqtSubscribeParameters{std::string(GetUsername(my_track_name_)),
+ std::nullopt, std::nullopt, std::nullopt});
+
while (session_is_open_ && is_syncing()) {
RunEventLoop();
}
return session_is_open_;
}
-void ChatClient::ProcessCatalog(absl::string_view object,
- SubscribeRemoteTrack::Visitor* visitor,
- uint64_t group_sequence,
- uint64_t object_sequence) {
- std::string message(object);
- std::istringstream f(message);
- // std::string line;
- bool got_version = true;
- if (object_sequence == 0) {
- std::cout << "Received new Catalog. Users:\n";
- got_version = false;
- }
- std::vector<absl::string_view> lines =
- absl::StrSplit(object, '\n', absl::SkipEmpty());
- for (absl::string_view line : lines) {
- if (!got_version) {
- if (line != "version=1") {
- session_->Error(MoqtError::kProtocolViolation,
- "Catalog does not begin with version");
- return;
- }
- got_version = true;
- continue;
- }
- std::string user;
- bool add = true;
- if (object_sequence > 0) {
- switch (line[0]) {
- case '-':
- add = false;
- break;
- case '+':
- break;
- default:
- std::cerr << "Catalog update with neither + nor -\n";
- return;
- }
- user = line.substr(1, line.size() - 1);
- } else {
- user = line;
- }
- if (username_ == user) {
- std::cout << user << "\n";
- continue;
- }
- if (!add) {
- session_->Unsubscribe(chat_strings_->GetFullTrackNameFromUsername(user));
- std::cout << user << "left the chat\n";
- other_users_.erase(user);
- continue;
- }
- if (object_sequence == 0) {
- std::cout << user << "\n";
- } else {
- std::cout << user << " joined the chat\n";
- }
- auto it = other_users_.find(user);
- if (it == other_users_.end()) {
- FullTrackName to_subscribe =
- chat_strings_->GetFullTrackNameFromUsername(user);
- auto new_user = other_users_.emplace(
- std::make_pair(user, ChatUser(to_subscribe, group_sequence)));
- ChatUser& user_record = new_user.first->second;
- session_->SubscribeCurrentGroup(user_record.full_track_name, visitor);
- subscribes_to_make_++;
- } else {
- if (it->second.from_group == group_sequence) {
- session_->Error(MoqtError::kProtocolViolation,
- "User listed twice in Catalog");
- return;
- }
- it->second.from_group = group_sequence;
- }
- }
- if (object_sequence == 0) { // Eliminate users that are no longer present
- absl::erase_if(other_users_, [&](const auto& kv) {
- return kv.second.from_group != group_sequence;
- });
- }
- catalog_group_ = group_sequence;
-}
-
-} // namespace moqt
+} // namespace moqt::moq_chat
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h
index e2c341f..b56142c 100644
--- a/quiche/quic/moqt/tools/chat_client.h
+++ b/quiche/quic/moqt/tools/chat_client.h
@@ -5,12 +5,10 @@
#ifndef QUICHE_QUIC_MOQT_TOOLS_CHAT_CLIENT_H
#define QUICHE_QUIC_MOQT_TOOLS_CHAT_CLIENT_H
-#include <cstdint>
#include <memory>
#include <optional>
-#include <string>
-#include "absl/container/flat_hash_map.h"
+#include "absl/container/flat_hash_set.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/core/io/quic_event_loop.h"
#include "quiche/quic/core/quic_server_id.h"
@@ -21,12 +19,11 @@
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/moqt_track.h"
-#include "quiche/quic/moqt/tools/moq_chat.h"
#include "quiche/quic/moqt/tools/moqt_client.h"
#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/common/quiche_callbacks.h"
-namespace moqt {
+namespace moqt::moq_chat {
constexpr quic::QuicTime::Delta kChatEventLoopDuration =
quic::QuicTime::Delta::FromMilliseconds(500);
@@ -58,6 +55,8 @@
// an event loop.
ChatClient(const quic::QuicServerId& server_id, bool ignore_certificate,
std::unique_ptr<ChatUserInterface> interface,
+ absl::string_view chat_id, absl::string_view username,
+ absl::string_view localhost,
quic::QuicEventLoop* event_loop = nullptr);
~ChatClient() {
if (session_ != nullptr) {
@@ -67,8 +66,7 @@
}
// Establish the MoQT session. Returns false if it fails.
- bool Connect(absl::string_view path, absl::string_view username,
- absl::string_view chat_id);
+ bool Connect(absl::string_view path);
void OnTerminalLineInput(absl::string_view input_message);
@@ -111,7 +109,7 @@
};
// Returns false on error.
- bool AnnounceAndSubscribe();
+ bool AnnounceAndSubscribeAnnounces();
bool session_is_open() const { return session_is_open_; }
@@ -119,29 +117,18 @@
// catalog, subscribing to all the users in it, and waiting for the server
// to subscribe to the local track.
bool is_syncing() const {
- return !catalog_group_.has_value() || subscribes_to_make_ > 0 ||
+ return subscribes_to_make_ > 0 ||
(queue_ == nullptr || !queue_->HasSubscribers());
}
private:
void RunEventLoop() { event_loop_->RunEventLoopOnce(kChatEventLoopDuration); }
-
- // Objects from the same catalog group arrive on the same stream, and in
- // object sequence order.
- void ProcessCatalog(absl::string_view object,
- moqt::SubscribeRemoteTrack::Visitor* visitor,
- uint64_t group_sequence, uint64_t object_sequence);
-
- struct ChatUser {
- moqt::FullTrackName full_track_name;
- uint64_t from_group;
- ChatUser(const moqt::FullTrackName& ftn, uint64_t group)
- : full_track_name(ftn), from_group(group) {}
- };
+ // Callback for incoming announces.
+ std::optional<MoqtAnnounceErrorReason> OnIncomingAnnounce(
+ const moqt::FullTrackName& track_namespace, AnnounceEvent announce_type);
// Basic session information
- std::string username_;
- std::optional<moqt::MoqChatStrings> chat_strings_;
+ FullTrackName my_track_name_;
// General state variables
// The event loop to use for this client.
@@ -156,13 +143,12 @@
moqt::MoqtSessionCallbacks session_callbacks_;
// Related to syncing.
- std::optional<uint64_t> catalog_group_;
- absl::flat_hash_map<std::string, ChatUser> other_users_;
- int subscribes_to_make_ = 1;
+ absl::flat_hash_set<FullTrackName> other_users_;
+ int subscribes_to_make_ = 0;
// Related to subscriptions/announces
// TODO: One for each subscribe
- std::unique_ptr<RemoteTrackVisitor> remote_track_visitor_;
+ RemoteTrackVisitor remote_track_visitor_;
// Handling outgoing messages
std::shared_ptr<moqt::MoqtOutgoingQueue> queue_;
@@ -171,6 +157,6 @@
std::unique_ptr<ChatUserInterface> interface_;
};
-} // namespace moqt
+} // namespace moqt::moq_chat
#endif // QUICHE_QUIC_MOQT_TOOLS_CHAT_CLIENT_H
diff --git a/quiche/quic/moqt/tools/chat_client_bin.cc b/quiche/quic/moqt/tools/chat_client_bin.cc
index 7617789..5e9125c 100644
--- a/quiche/quic/moqt/tools/chat_client_bin.cc
+++ b/quiche/quic/moqt/tools/chat_client_bin.cc
@@ -31,8 +31,11 @@
std::string, output_file, "",
"chat messages will stream to a file instead of stdout");
+using ::moqt::moq_chat::ChatClient;
+using ::moqt::moq_chat::ChatUserInterface;
+
// Writes messages to a file, when directed from the command line.
-class FileOutput : public moqt::ChatUserInterface {
+class FileOutput : public ChatUserInterface {
public:
explicit FileOutput(absl::string_view filename, absl::string_view username)
: username_(username) {
@@ -66,7 +69,7 @@
event_loop_ == nullptr)
<< "IoLoop called before Initialize";
while (poll(&poll_settings_, 1, 0) <= 0) {
- event_loop_->RunEventLoopOnce(moqt::kChatEventLoopDuration);
+ event_loop_->RunEventLoopOnce(moqt::moq_chat::kChatEventLoopDuration);
}
std::getline(std::cin, message_to_send);
callback_(message_to_send);
@@ -86,7 +89,7 @@
};
// Writes messages to the terminal, without messing up entry of new messages.
-class CliOutput : public moqt::ChatUserInterface {
+class CliOutput : public ChatUserInterface {
public:
void Initialize(quic::InteractiveCli::LineCallback callback,
quic::QuicEventLoop* event_loop) override {
@@ -107,7 +110,7 @@
QUIC_BUG_IF(quic_bug_moq_chat_user_interface_unitialized,
event_loop_ == nullptr)
<< "IoLoop called before Initialize";
- event_loop_->RunEventLoopOnce(moqt::kChatEventLoopDuration);
+ event_loop_->RunEventLoopOnce(moqt::moq_chat::kChatEventLoopDuration);
}
private:
@@ -118,10 +121,11 @@
// A client for MoQT over chat, used for interop testing. See
// https://afrind.github.io/draft-frindell-moq-chat/draft-frindell-moq-chat.html
int main(int argc, char* argv[]) {
- const char* usage = "Usage: chat_client [options] <url> <username> <chat-id>";
+ const char* usage =
+ "Usage: chat_client [options] <url> <username> <chat-id> <localhost>";
std::vector<std::string> args =
quiche::QuicheParseCommandLineFlags(usage, argc, argv);
- if (args.size() != 3) {
+ if (args.size() != 4) {
quiche::QuichePrintCommandLineFlagHelp(usage);
return 1;
}
@@ -130,24 +134,25 @@
std::string path = url.PathParamsQuery();
const std::string& username = args[1];
const std::string& chat_id = args[2];
+ std::string localhost = args[3];
std::string output_filename =
quiche::GetQuicheCommandLineFlag(FLAGS_output_file);
- std::unique_ptr<moqt::ChatUserInterface> interface;
+ std::unique_ptr<ChatUserInterface> interface;
if (!output_filename.empty()) {
interface = std::make_unique<FileOutput>(output_filename, username);
} else { // Use the CLI.
interface = std::make_unique<CliOutput>();
}
- moqt::ChatClient client(
+ ChatClient client(
server_id,
quiche::GetQuicheCommandLineFlag(FLAGS_disable_certificate_verification),
- std::move(interface));
+ std::move(interface), chat_id, username, localhost);
- if (!client.Connect(path, username, chat_id)) {
+ if (!client.Connect(path)) {
return 1;
}
- if (!client.AnnounceAndSubscribe()) {
+ if (!client.AnnounceAndSubscribeAnnounces()) {
return 1;
}
client.IoLoop();
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc
index 243d13b..871390b 100644
--- a/quiche/quic/moqt/tools/chat_server.cc
+++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -10,54 +10,114 @@
#include <string>
#include <utility>
+#include "absl/functional/bind_front.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
-#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/core/crypto/proof_source.h"
#include "quiche/quic/moqt/moqt_live_relay_queue.h"
#include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_outgoing_queue.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/tools/moq_chat.h"
#include "quiche/quic/moqt/tools/moqt_server.h"
-#include "quiche/common/platform/api/quiche_mem_slice.h"
-#include "quiche/common/quiche_buffer_allocator.h"
-#include "quiche/common/simple_buffer_allocator.h"
-namespace moqt {
+namespace moqt::moq_chat {
+
+std::optional<MoqtAnnounceErrorReason>
+ChatServer::ChatServerSessionHandler::OnIncomingAnnounce(
+ const moqt::FullTrackName& track_namespace, AnnounceEvent announce_type) {
+ if (track_name_.has_value() &&
+ GetUserNamespace(*track_name_) != track_namespace) {
+ // ChatServer only supports one track per client session at a time. Return
+ // ANNOUNCE_OK and exit.
+ return std::nullopt;
+ }
+ // Accept the ANNOUNCE regardless of the chat_id.
+ track_name_ = ConstructTrackNameFromNamespace(track_namespace,
+ GetChatId(track_namespace));
+ if (!track_name_.has_value()) {
+ std::cout << "Malformed ANNOUNCE namespace\n";
+ return MoqtAnnounceErrorReason(
+ MoqtAnnounceErrorCode::kNotASubscribedNamespace,
+ "Not a valid namespace for this chat.");
+ }
+ if (announce_type == AnnounceEvent::kUnannounce) {
+ std::cout << "Received UNANNOUNCE for " << track_namespace.ToString()
+ << "\n";
+ server_->DeleteUser(*track_name_);
+ track_name_.reset();
+ return std::nullopt;
+ }
+ std::cout << "Received ANNOUNCE for " << track_namespace.ToString() << "\n";
+ session_->SubscribeCurrentGroup(*track_name_,
+ server_->remote_track_visitor());
+ server_->AddUser(*track_name_);
+ return std::nullopt;
+}
+
+void ChatServer::ChatServerSessionHandler::OnOutgoingAnnounceReply(
+ FullTrackName track_namespace,
+ std::optional<MoqtAnnounceErrorReason> error_message) {
+ // Log the result; the server doesn't really care.
+ std::cout << "ANNOUNCE for " << track_namespace.ToString();
+ if (error_message.has_value()) {
+ std::cout << " failed with error: " << error_message->reason_phrase << "\n";
+ } else {
+ std::cout << " succeeded\n";
+ }
+}
ChatServer::ChatServerSessionHandler::ChatServerSessionHandler(
MoqtSession* session, ChatServer* server)
: session_(session), server_(server) {
- session_->callbacks().incoming_announce_callback =
- [&](FullTrackName track_namespace, AnnounceEvent announce_type) {
- FullTrackName track_name = track_namespace;
- track_name.AddElement("");
- if (announce_type == AnnounceEvent::kAnnounce) {
- std::cout << "Received ANNOUNCE for ";
- } else {
- std::cout << "Received UNANNOUNCE for ";
- }
- std::cout << track_namespace.ToString() << "\n";
- username_ = server_->strings().GetUsernameFromFullTrackName(track_name);
- if (username_->empty()) {
- std::cout << "Malformed ANNOUNCE namespace\n";
- return std::nullopt;
- }
- session_->SubscribeCurrentGroup(track_name,
- server_->remote_track_visitor());
- server_->AddUser(*username_);
- return std::nullopt;
- };
- // TODO(martinduke): Add a callback for UNANNOUNCE that deletes the user and
- // clears username_, but keeps the handler.
+ session_->callbacks().incoming_announce_callback = absl::bind_front(
+ &ChatServer::ChatServerSessionHandler::OnIncomingAnnounce, this);
session_->callbacks().session_terminated_callback =
- [&](absl::string_view error_message) {
+ [this](absl::string_view error_message) {
std::cout << "Session terminated, reason = " << error_message << "\n";
session_ = nullptr;
server_->DeleteSession(it_);
+ if (track_name_.has_value()) {
+ server_->DeleteUser(*track_name_);
+ }
+ };
+ session_->callbacks().incoming_subscribe_announces_callback =
+ [this](const moqt::FullTrackName& chat_namespace,
+ SubscribeEvent subscribe_type) {
+ if (subscribe_type == SubscribeEvent::kSubscribe) {
+ subscribed_namespaces_.insert(chat_namespace);
+ std::cout << "Received SUBSCRIBE_ANNOUNCES for ";
+ } else {
+ subscribed_namespaces_.erase(chat_namespace);
+ std::cout << "Received UNSUBSCRIBE_ANNOUNCES for ";
+ }
+ std::cout << chat_namespace.ToString() << "\n";
+ if (!IsValidChatNamespace(chat_namespace)) {
+ std::cout << "Not a valid moq-chat namespace.\n";
+ return std::make_optional(
+ MoqtSubscribeErrorReason{SubscribeErrorCode::kTrackDoesNotExist,
+ "Not a valid namespace for this chat."});
+ }
+ if (subscribe_type == SubscribeEvent::kUnsubscribe) {
+ return std::optional<MoqtSubscribeErrorReason>();
+ }
+ // Send all ANNOUNCE.
+ for (auto& [track_name, queue] : server_->user_queues_) {
+ std::cout << "Sending ANNOUNCE for "
+ << GetUserNamespace(track_name).ToString() << "\n";
+ if (track_name_.has_value() &&
+ GetUsername(*track_name_) == GetUsername(track_name)) {
+ // Don't ANNOUNCE a client to itself.
+ continue;
+ }
+ session_->Announce(
+ GetUserNamespace(track_name),
+ absl::bind_front(&ChatServer::ChatServerSessionHandler::
+ OnOutgoingAnnounceReply,
+ this));
+ }
+ return std::optional<MoqtSubscribeErrorReason>();
};
session_->set_publisher(server_->publisher());
}
@@ -66,8 +126,8 @@
if (!server_->is_running_) {
return;
}
- if (username_.has_value()) {
- server_->DeleteUser(*username_);
+ if (track_name_.has_value()) {
+ server_->DeleteUser(*track_name_);
}
}
@@ -78,20 +138,12 @@
const moqt::FullTrackName& full_track_name,
std::optional<FullSequence> /*largest_id*/,
std::optional<absl::string_view> reason_phrase) {
- std::cout << "Subscription to user "
- << server_->strings().GetUsernameFromFullTrackName(full_track_name)
- << " ";
+ std::cout << "Subscription to " << full_track_name.ToString();
if (reason_phrase.has_value()) {
- std::cout << "REJECTED, reason = " << *reason_phrase << "\n";
- std::string username =
- server_->strings().GetUsernameFromFullTrackName(full_track_name);
- if (!username.empty()) {
- std::cout << "Rejection was for malformed namespace\n";
- return;
- }
- server_->DeleteUser(username);
+ std::cout << " REJECTED, reason = " << *reason_phrase << "\n";
+ server_->DeleteUser(full_track_name);
} else {
- std::cout << "ACCEPTED\n";
+ std::cout << " ACCEPTED\n";
}
}
@@ -103,40 +155,26 @@
std::cerr << "Error: received partial message despite requesting "
"buffering\n";
}
- std::string username =
- server_->strings().GetUsernameFromFullTrackName(full_track_name);
- if (username.empty()) {
- std::cout << "Received user message with malformed namespace\n";
- return;
- }
- auto it = server_->user_queues_.find(username);
+ auto it = server_->user_queues_.find(full_track_name);
if (it == server_->user_queues_.end()) {
- std::cerr << "Error: received message for unknown user " << username
- << "\n";
+ std::cerr << "Error: received message for unknown track "
+ << full_track_name.ToString() << "\n";
return;
}
if (status != MoqtObjectStatus::kNormal) {
it->second->AddObject(sequence, status);
return;
}
- if (!server_->WriteToFile(username, object)) {
- std::cout << username << ": " << object << "\n\n";
+ if (!server_->WriteToFile(GetUsername(full_track_name), object)) {
+ std::cout << GetUsername(full_track_name) << ": " << object << "\n\n";
}
it->second->AddObject(sequence, object);
}
ChatServer::ChatServer(std::unique_ptr<quic::ProofSource> proof_source,
- absl::string_view chat_id, absl::string_view output_file)
+ absl::string_view output_file)
: server_(std::move(proof_source), std::move(incoming_session_callback_)),
- strings_(chat_id),
- catalog_(std::make_shared<MoqtOutgoingQueue>(
- strings_.GetCatalogName(), MoqtForwardingPreference::kSubgroup)),
remote_track_visitor_(this) {
- catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy(
- quiche::SimpleBufferAllocator::Get(),
- MoqChatStrings::kCatalogHeader)),
- /*key=*/true);
- publisher_.Add(catalog_);
if (!output_file.empty()) {
output_filename_ = output_file;
}
@@ -154,27 +192,30 @@
server_.quic_server().Shutdown();
}
-void ChatServer::AddUser(absl::string_view username) {
- std::string catalog_data = absl::StrCat("+", username);
- catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy(
- quiche::SimpleBufferAllocator::Get(), catalog_data)),
- /*key=*/false);
+void ChatServer::AddUser(FullTrackName track_name) {
// Add a local track.
- user_queues_[username] = std::make_shared<MoqtLiveRelayQueue>(
- strings_.GetFullTrackNameFromUsername(username),
- MoqtForwardingPreference::kSubgroup);
- publisher_.Add(user_queues_[username]);
+ user_queues_[track_name] = std::make_shared<MoqtLiveRelayQueue>(
+ track_name, MoqtForwardingPreference::kSubgroup);
+ publisher_.Add(user_queues_[track_name]);
+ FullTrackName track_namespace = track_name;
+ track_namespace.NameToNamespace();
+ for (auto& session : sessions_) {
+ session.AnnounceIfSubscribed(track_namespace);
+ }
}
-void ChatServer::DeleteUser(absl::string_view username) {
- // Delete from Catalog.
- std::string catalog_data = absl::StrCat("-", username);
- catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy(
- quiche::SimpleBufferAllocator::Get(), catalog_data)),
- /*key=*/false);
- user_queues_[username]->RemoveAllSubscriptions();
- user_queues_.erase(username);
- publisher_.Delete(strings_.GetFullTrackNameFromUsername(username));
+void ChatServer::DeleteUser(FullTrackName track_name) {
+ // RemoveAllSubscriptions() sends a SUBSCRIBE_DONE for each.
+ user_queues_[track_name]->RemoveAllSubscriptions();
+ user_queues_.erase(track_name);
+ publisher_.Delete(track_name);
+ FullTrackName track_namespace = GetUserNamespace(track_name);
+ for (auto& session : sessions_) {
+ session.UnannounceIfSubscribed(track_namespace);
+ }
+ if (user_queues_.empty()) {
+ std::cout << "No more users!\n";
+ }
}
bool ChatServer::WriteToFile(absl::string_view username,
@@ -189,7 +230,7 @@
absl::StatusOr<MoqtConfigureSessionCallback> ChatServer::IncomingSessionHandler(
absl::string_view path) {
- if (!strings_.IsValidPath(path)) {
+ if (!IsValidPath(path)) {
return absl::NotFoundError("Unknown endpoint; try \"/moq-chat\".");
}
return [this](MoqtSession* session) {
@@ -199,4 +240,4 @@
};
}
-} // namespace moqt
+} // namespace moqt::moq_chat
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h
index 0c883ed..5a4c3b9 100644
--- a/quiche/quic/moqt/tools/chat_server.h
+++ b/quiche/quic/moqt/tools/chat_server.h
@@ -5,7 +5,6 @@
#ifndef QUICHE_QUIC_MOQT_TOOLS_CHAT_SERVER_H_
#define QUICHE_QUIC_MOQT_TOOLS_CHAT_SERVER_H_
-#include <cstdint>
#include <fstream>
#include <list>
#include <memory>
@@ -13,26 +12,27 @@
#include <string>
#include "absl/container/flat_hash_map.h"
+#include "absl/container/flat_hash_set.h"
+#include "absl/functional/bind_front.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/core/crypto/proof_source.h"
#include "quiche/quic/moqt/moqt_known_track_publisher.h"
#include "quiche/quic/moqt/moqt_live_relay_queue.h"
#include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_outgoing_queue.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/moqt_track.h"
-#include "quiche/quic/moqt/tools/moq_chat.h"
#include "quiche/quic/moqt/tools/moqt_server.h"
namespace moqt {
+namespace moq_chat {
class ChatServer {
public:
ChatServer(std::unique_ptr<quic::ProofSource> proof_source,
- absl::string_view chat_id, absl::string_view output_file);
+ absl::string_view output_file);
~ChatServer();
class RemoteTrackVisitor : public SubscribeRemoteTrack::Visitor {
@@ -62,12 +62,43 @@
it_ = it;
}
+ void AnnounceIfSubscribed(FullTrackName track_namespace) {
+ for (const FullTrackName& subscribed_namespace : subscribed_namespaces_) {
+ if (track_namespace.InNamespace(subscribed_namespace)) {
+ session_->Announce(
+ track_namespace,
+ absl::bind_front(&ChatServer::ChatServerSessionHandler::
+ OnOutgoingAnnounceReply,
+ this));
+ return;
+ }
+ }
+ }
+
+ void UnannounceIfSubscribed(FullTrackName track_namespace) {
+ for (const FullTrackName& subscribed_namespace : subscribed_namespaces_) {
+ if (track_namespace.InNamespace(subscribed_namespace)) {
+ session_->Unannounce(track_namespace);
+ return;
+ }
+ }
+ }
+
private:
+ // Callback for incoming announces.
+ std::optional<MoqtAnnounceErrorReason> OnIncomingAnnounce(
+ const moqt::FullTrackName& track_namespace,
+ AnnounceEvent announce_type);
+ void OnOutgoingAnnounceReply(
+ FullTrackName track_namespace,
+ std::optional<MoqtAnnounceErrorReason> error_message);
+
MoqtSession* session_; // Not owned.
// This design assumes that each server has exactly one username, although
// in theory there could be multiple users on one session.
- std::optional<std::string> username_;
+ std::optional<FullTrackName> track_name_;
ChatServer* server_; // Not owned.
+ absl::flat_hash_set<FullTrackName> subscribed_namespaces_;
// The iterator of this entry in ChatServer::sessions_, so it can destroy
// itself later.
std::list<ChatServerSessionHandler>::const_iterator it_;
@@ -77,11 +108,9 @@
RemoteTrackVisitor* remote_track_visitor() { return &remote_track_visitor_; }
- MoqtOutgoingQueue* catalog() { return catalog_.get(); }
+ void AddUser(FullTrackName track_name);
- void AddUser(absl::string_view username);
-
- void DeleteUser(absl::string_view username);
+ void DeleteUser(FullTrackName track_name);
void DeleteSession(std::list<ChatServerSessionHandler>::const_iterator it) {
sessions_.erase(it);
@@ -92,8 +121,6 @@
MoqtPublisher* publisher() { return &publisher_; }
- MoqChatStrings& strings() { return strings_; }
-
int num_users() const { return user_queues_.size(); }
private:
@@ -106,16 +133,14 @@
bool is_running_ = true;
MoqtServer server_;
std::list<ChatServerSessionHandler> sessions_;
- MoqChatStrings strings_;
MoqtKnownTrackPublisher publisher_;
- std::shared_ptr<MoqtOutgoingQueue> catalog_;
RemoteTrackVisitor remote_track_visitor_;
- // indexed by username
- absl::flat_hash_map<std::string, std::shared_ptr<MoqtLiveRelayQueue>>
+ absl::flat_hash_map<FullTrackName, std::shared_ptr<MoqtLiveRelayQueue>>
user_queues_;
std::string output_filename_;
std::ofstream output_file_;
};
+} // namespace moq_chat
} // namespace moqt
#endif // QUICHE_QUIC_MOQT_TOOLS_CHAT_SERVER_H_
diff --git a/quiche/quic/moqt/tools/chat_server_bin.cc b/quiche/quic/moqt/tools/chat_server_bin.cc
index 899c258..b905ce3 100644
--- a/quiche/quic/moqt/tools/chat_server_bin.cc
+++ b/quiche/quic/moqt/tools/chat_server_bin.cc
@@ -24,15 +24,16 @@
// A server for MoQT over chat, used for interop testing. See
// https://afrind.github.io/draft-frindell-moq-chat/draft-frindell-moq-chat.html
int main(int argc, char* argv[]) {
- const char* usage = "Usage: chat_server [options] <chat-id>";
+ const char* usage = "Usage: chat_server [options]";
std::vector<std::string> args =
quiche::QuicheParseCommandLineFlags(usage, argc, argv);
- if (args.size() != 1) {
+ if (!args.empty()) {
quiche::QuichePrintCommandLineFlagHelp(usage);
return 1;
}
- moqt::ChatServer server(quiche::CreateDefaultProofSource(), argv[1],
- quiche::GetQuicheCommandLineFlag(FLAGS_output_file));
+ moqt::moq_chat::ChatServer server(
+ quiche::CreateDefaultProofSource(),
+ quiche::GetQuicheCommandLineFlag(FLAGS_output_file));
quiche::QuicheIpAddress bind_address;
QUICHE_CHECK(bind_address.FromString(
quiche::GetQuicheCommandLineFlag(FLAGS_bind_address)));
diff --git a/quiche/quic/moqt/tools/moq_chat.cc b/quiche/quic/moqt/tools/moq_chat.cc
new file mode 100644
index 0000000..0ec5b64
--- /dev/null
+++ b/quiche/quic/moqt/tools/moq_chat.cc
@@ -0,0 +1,76 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/tools/moq_chat.h"
+
+#include <optional>
+
+#include "absl/strings/str_cat.h"
+#include "absl/strings/string_view.h"
+#include "absl/time/clock.h"
+#include "absl/time/time.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/common/platform/api/quiche_logging.h"
+
+namespace moqt::moq_chat {
+
+bool IsValidPath(absl::string_view path) { return path == kWebtransPath; }
+
+bool IsValidNamespace(const FullTrackName& track_namespace) {
+ return track_namespace.tuple().size() == kFullPathLength - 1 &&
+ track_namespace.tuple()[0] == kBasePath;
+}
+
+bool IsValidChatNamespace(const FullTrackName& track_namespace) {
+ return track_namespace.tuple().size() == 2 &&
+ track_namespace.tuple()[0] == kBasePath;
+}
+
+FullTrackName ConstructTrackName(absl::string_view chat_id,
+ absl::string_view username,
+ absl::string_view device_id) {
+ return FullTrackName{kBasePath,
+ chat_id,
+ username,
+ device_id,
+ absl::StrCat(ToUnixSeconds(::absl::Now())),
+ kNameField};
+}
+
+std::optional<FullTrackName> ConstructTrackNameFromNamespace(
+ const FullTrackName& track_namespace, absl::string_view chat_id) {
+ if (track_namespace.tuple().size() != kFullPathLength - 1) {
+ return std::nullopt;
+ }
+ if (track_namespace.tuple()[0] != kBasePath ||
+ track_namespace.tuple()[1] != chat_id) {
+ return std::nullopt;
+ }
+ FullTrackName track_name = track_namespace;
+ track_name.AddElement(kNameField);
+ return track_name;
+}
+
+absl::string_view GetUsername(const FullTrackName& track_name) {
+ QUICHE_DCHECK(track_name.tuple().size() > 2);
+ return track_name.tuple()[2];
+}
+
+absl::string_view GetChatId(const FullTrackName& track_name) {
+ QUICHE_DCHECK(track_name.tuple().size() > 1);
+ return track_name.tuple()[1];
+}
+
+FullTrackName GetUserNamespace(const FullTrackName& track_name) {
+ QUICHE_DCHECK(track_name.tuple().size() == kFullPathLength);
+ FullTrackName track_namespace = track_name;
+ track_namespace.NameToNamespace();
+ return track_namespace;
+}
+
+FullTrackName GetChatNamespace(const FullTrackName& track_name) {
+ return FullTrackName{track_name.tuple()[0], track_name.tuple()[1]};
+}
+
+} // namespace moqt::moq_chat
diff --git a/quiche/quic/moqt/tools/moq_chat.h b/quiche/quic/moqt/tools/moq_chat.h
index 6789f82..02f23e4 100644
--- a/quiche/quic/moqt/tools/moq_chat.h
+++ b/quiche/quic/moqt/tools/moq_chat.h
@@ -5,64 +5,56 @@
#ifndef QUICHE_QUIC_MOQT_TOOLS_MOQ_CHAT_H
#define QUICHE_QUIC_MOQT_TOOLS_MOQ_CHAT_H
-#include <string>
-#include <vector>
+#include <cstddef>
+#include <optional>
-#include "absl/strings/str_cat.h"
-#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/moqt/moqt_messages.h"
+// Utilities for manipulating moq-chat paths, names, and namespaces.
+
namespace moqt {
-// This class encodes all the syntax in moq-chat strings: paths, full track
-// names, and catalog entries.
-class MoqChatStrings {
- public:
- explicit MoqChatStrings(absl::string_view chat_id) : chat_id_(chat_id) {}
+namespace moq_chat {
- static constexpr absl::string_view kBasePath = "moq-chat";
- static constexpr absl::string_view kParticipantPath = "participant";
- static constexpr absl::string_view kCatalogPath = "catalog";
- static constexpr absl::string_view kCatalogHeader = "version=1\n";
+constexpr absl::string_view kWebtransPath = "/moq-relay";
+// The order of fields is "moq-chat", chat-id, username, device-id, timestamp,
+// "chat".
+// The number of tiers in a full track name.
+constexpr size_t kFullPathLength = 6;
+// The first element in the track namespace or name.
+constexpr absl::string_view kBasePath = "moq-chat";
+// The last element in the track name.
+constexpr absl::string_view kNameField = "chat";
- // Verifies that the WebTransport path matches the spec.
- bool IsValidPath(absl::string_view path) const {
- return path == absl::StrCat("/", kBasePath);
- }
+// Verifies that the WebTransport path matches the spec.
+bool IsValidPath(absl::string_view path);
- // Returns "" if the track namespace is not a participant track.
- std::string GetUsernameFromFullTrackName(
- FullTrackName full_track_name) const {
- if (full_track_name.tuple().size() != 2) {
- return "";
- }
- if (!full_track_name.tuple()[1].empty()) {
- return "";
- }
- std::vector<absl::string_view> elements =
- absl::StrSplit(full_track_name.tuple()[0], '/');
- if (elements.size() != 4 || elements[0] != kBasePath ||
- elements[1] != chat_id_ || elements[2] != kParticipantPath) {
- return "";
- }
- return std::string(elements[3]);
- }
+bool IsValidTrackNamespace(const FullTrackName& track_namespace);
+bool IsValidChatNamespace(const FullTrackName& track_namespace);
- FullTrackName GetFullTrackNameFromUsername(absl::string_view username) const {
- return FullTrackName{absl::StrCat(kBasePath, "/", chat_id_, "/",
- kParticipantPath, "/", username),
- ""};
- }
+// Given a chat-id and username, returns a full track name for moq-chat.
+FullTrackName ConstructTrackName(absl::string_view chat_id,
+ absl::string_view username,
+ absl::string_view device_id);
- FullTrackName GetCatalogName() const {
- return FullTrackName{absl::StrCat(kBasePath, "/", chat_id_),
- absl::StrCat("/", kCatalogPath)};
- }
+// constructs a full track name based on the track_namespace. If the namespace
+// is syntactically incorrect, or does not match the expected value of
+// |chat-id|, returns nullopt
+std::optional<FullTrackName> ConstructTrackNameFromNamespace(
+ const FullTrackName& track_namespace, absl::string_view chat_id);
- private:
- const std::string chat_id_;
-};
+// Strips "chat" from the end of |track_name| to use in ANNOUNCE.
+FullTrackName GetUserNamespace(const FullTrackName& track_name);
+
+// Returns {"moq-chat", chat-id}, useful for SUBSCRIBE_ANNOUNCES.
+FullTrackName GetChatNamespace(const FullTrackName& track_name);
+
+absl::string_view GetUsername(const FullTrackName& track_name);
+
+absl::string_view GetChatId(const FullTrackName& track_name);
+
+} // namespace moq_chat
} // namespace moqt
diff --git a/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc b/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc
index 3460aea..8807363 100644
--- a/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc
+++ b/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc
@@ -12,6 +12,7 @@
#include "quiche/quic/core/quic_server_id.h"
#include "quiche/quic/moqt/tools/chat_client.h"
#include "quiche/quic/moqt/tools/chat_server.h"
+#include "quiche/quic/moqt/tools/moq_chat.h"
#include "quiche/quic/platform/api/quic_socket_address.h"
#include "quiche/quic/test_tools/crypto_test_utils.h"
#include "quiche/common/platform/api/quiche_test.h"
@@ -26,7 +27,7 @@
constexpr absl::string_view kChatHostname = "127.0.0.1";
-class MockChatUserInterface : public ChatUserInterface {
+class MockChatUserInterface : public moqt::moq_chat::ChatUserInterface {
public:
void Initialize(quiche::MultiUseCallback<void(absl::string_view)> callback,
quic::QuicEventLoop* event_loop) override {
@@ -35,7 +36,7 @@
}
void IoLoop() override {
- event_loop_->RunEventLoopOnce(moqt::kChatEventLoopDuration);
+ event_loop_->RunEventLoopOnce(moqt::moq_chat::kChatEventLoopDuration);
}
MOCK_METHOD(void, WriteToOutput,
@@ -52,8 +53,7 @@
class MoqChatEndToEndTest : public quiche::test::QuicheTest {
public:
MoqChatEndToEndTest()
- : server_(quic::test::crypto_test_utils::ProofSourceForTesting(),
- "test_chat", "") {
+ : server_(quic::test::crypto_test_utils::ProofSourceForTesting(), "") {
quiche::QuicheIpAddress bind_address;
std::string hostname(kChatHostname);
bind_address.FromString(hostname);
@@ -64,11 +64,13 @@
interface1_ = if1ptr.get();
interface2_ = if2ptr.get();
uint16_t port = server_.moqt_server().quic_server().port();
- client1_ = std::make_unique<ChatClient>(
+ client1_ = std::make_unique<moqt::moq_chat::ChatClient>(
quic::QuicServerId(hostname, port), true, std::move(if1ptr),
+ "test_chat", "client1", "device1",
server_.moqt_server().quic_server().event_loop());
- client2_ = std::make_unique<ChatClient>(
+ client2_ = std::make_unique<moqt::moq_chat::ChatClient>(
quic::QuicServerId(hostname, port), true, std::move(if2ptr),
+ "test_chat", "client2", "device2",
server_.moqt_server().quic_server().event_loop());
}
@@ -86,16 +88,16 @@
}
}
- ChatServer server_;
+ moqt::moq_chat::ChatServer server_;
MockChatUserInterface *interface1_, *interface2_;
- std::unique_ptr<ChatClient> client1_, client2_;
+ std::unique_ptr<moqt::moq_chat::ChatClient> client1_, client2_;
};
TEST_F(MoqChatEndToEndTest, EndToEndTest) {
- EXPECT_TRUE(client1_->Connect("/moq-chat", "client1", "test_chat"));
- EXPECT_TRUE(client2_->Connect("/moq-chat", "client2", "test_chat"));
- EXPECT_TRUE(client1_->AnnounceAndSubscribe());
- EXPECT_TRUE(client2_->AnnounceAndSubscribe());
+ EXPECT_TRUE(client1_->Connect(moqt::moq_chat::kWebtransPath));
+ EXPECT_TRUE(client2_->Connect(moqt::moq_chat::kWebtransPath));
+ EXPECT_TRUE(client1_->AnnounceAndSubscribeAnnounces());
+ EXPECT_TRUE(client2_->AnnounceAndSubscribeAnnounces());
SendAndWaitForOutput(interface1_, interface2_, "client1", "Hello");
SendAndWaitForOutput(interface2_, interface1_, "client2", "Hi");
SendAndWaitForOutput(interface1_, interface2_, "client1", "How are you?");
@@ -109,10 +111,10 @@
}
TEST_F(MoqChatEndToEndTest, LeaveAndRejoin) {
- EXPECT_TRUE(client1_->Connect("/moq-chat", "client1", "test_chat"));
- EXPECT_TRUE(client2_->Connect("/moq-chat", "client2", "test_chat"));
- EXPECT_TRUE(client1_->AnnounceAndSubscribe());
- EXPECT_TRUE(client2_->AnnounceAndSubscribe());
+ EXPECT_TRUE(client1_->Connect(moqt::moq_chat::kWebtransPath));
+ EXPECT_TRUE(client2_->Connect(moqt::moq_chat::kWebtransPath));
+ EXPECT_TRUE(client1_->AnnounceAndSubscribeAnnounces());
+ EXPECT_TRUE(client2_->AnnounceAndSubscribeAnnounces());
SendAndWaitForOutput(interface1_, interface2_, "client1", "Hello");
SendAndWaitForOutput(interface2_, interface1_, "client2", "Hi");
@@ -129,11 +131,12 @@
auto if1bptr = std::make_unique<MockChatUserInterface>();
MockChatUserInterface* interface1b_ = if1bptr.get();
uint16_t port = server_.moqt_server().quic_server().port();
- client1_ = std::make_unique<ChatClient>(
+ client1_ = std::make_unique<moqt::moq_chat::ChatClient>(
quic::QuicServerId(std::string(kChatHostname), port), true,
- std::move(if1bptr), server_.moqt_server().quic_server().event_loop());
- EXPECT_TRUE(client1_->Connect("/moq-chat", "client1", "test_chat"));
- EXPECT_TRUE(client1_->AnnounceAndSubscribe());
+ std::move(if1bptr), "test_chat", "client1", "device1",
+ server_.moqt_server().quic_server().event_loop());
+ EXPECT_TRUE(client1_->Connect(moqt::moq_chat::kWebtransPath));
+ EXPECT_TRUE(client1_->AnnounceAndSubscribeAnnounces());
SendAndWaitForOutput(interface1b_, interface2_, "client1", "Hello again");
SendAndWaitForOutput(interface2_, interface1b_, "client2", "Hi again");
}
diff --git a/quiche/quic/moqt/tools/moq_chat_test.cc b/quiche/quic/moqt/tools/moq_chat_test.cc
index f2c8d09..d402054 100644
--- a/quiche/quic/moqt/tools/moq_chat_test.cc
+++ b/quiche/quic/moqt/tools/moq_chat_test.cc
@@ -7,70 +7,63 @@
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/common/platform/api/quiche_test.h"
-namespace moqt {
+namespace moqt::moq_chat {
namespace {
-class MoqChatStringsTest : public quiche::test::QuicheTest {
- public:
- MoqChatStrings strings_{"chat-id"};
-};
+class MoqChatTest : public quiche::test::QuicheTest {};
-TEST_F(MoqChatStringsTest, IsValidPath) {
- EXPECT_TRUE(strings_.IsValidPath("/moq-chat"));
- EXPECT_FALSE(strings_.IsValidPath("moq-chat"));
- EXPECT_FALSE(strings_.IsValidPath("/moq-cha"));
- EXPECT_FALSE(strings_.IsValidPath("/moq-chats"));
- EXPECT_FALSE(strings_.IsValidPath("/moq-chat/"));
+TEST_F(MoqChatTest, IsValidPath) {
+ EXPECT_TRUE(IsValidPath("/moq-relay"));
+ EXPECT_FALSE(IsValidPath("moq-relay"));
+ EXPECT_FALSE(IsValidPath("/moq-rela"));
+ EXPECT_FALSE(IsValidPath("/moq-relays"));
+ EXPECT_FALSE(IsValidPath("/moq-relay/"));
}
-TEST_F(MoqChatStringsTest, GetUsernameFromFullTrackName) {
- EXPECT_EQ(strings_.GetUsernameFromFullTrackName(
- FullTrackName{"moq-chat/chat-id/participant/user", ""}),
- "user");
+TEST_F(MoqChatTest, ConstructNameForUser) {
+ FullTrackName name = ConstructTrackName("chat-id", "user", "device");
+
+ EXPECT_EQ(GetChatId(name), "chat-id");
+ EXPECT_EQ(GetUsername(name), "user");
+ // Check that the namespace passes validation.
+ name.NameToNamespace();
+ EXPECT_TRUE(ConstructTrackNameFromNamespace(name, "chat-id").has_value());
}
-TEST_F(MoqChatStringsTest, GetUsernameFromFullTrackNameInvalidInput) {
- EXPECT_EQ(strings_.GetUsernameFromFullTrackName(
- FullTrackName{"/moq-chat/chat-id/participant/user", ""}),
- "");
- EXPECT_EQ(strings_.GetUsernameFromFullTrackName(
- FullTrackName{"moq-chat/chat-id/participant/user/", ""}),
- "");
- EXPECT_EQ(strings_.GetUsernameFromFullTrackName(
- FullTrackName{"moq-cha/chat-id/participant/user", ""}),
- "");
- EXPECT_EQ(strings_.GetUsernameFromFullTrackName(
- FullTrackName{"moq-chat/chat-i/participant/user", ""}),
- "");
- EXPECT_EQ(strings_.GetUsernameFromFullTrackName(
- FullTrackName{"moq-chat/chat-id/participan/user", ""}),
- "");
- EXPECT_EQ(strings_.GetUsernameFromFullTrackName(
- FullTrackName{"moq-chat/chat-id/user", ""}),
- "");
- EXPECT_EQ(strings_.GetUsernameFromFullTrackName(
- FullTrackName{"moq-chat/chat-id/participant/foo/user", ""}),
- "");
- EXPECT_EQ(strings_.GetUsernameFromFullTrackName(
- FullTrackName{"moq-chat/chat-id/participant/user", "foo"}),
- "");
- EXPECT_EQ(strings_.GetUsernameFromFullTrackName(
- FullTrackName{"moq-chat/chat-id/participant/user"}),
- "");
- EXPECT_EQ(strings_.GetUsernameFromFullTrackName(
- FullTrackName{"foo", "moq-chat/chat-id/participant/user", ""}),
- "");
+TEST_F(MoqChatTest, InvalidNamespace) {
+ FullTrackName track_namespace{kBasePath, "chat-id", "username", "device",
+ "timestamp"};
+ // Wrong chat ID.
+ EXPECT_FALSE(
+ ConstructTrackNameFromNamespace(track_namespace, "chat-id2").has_value());
+ // Namespace includes name
+ track_namespace.AddElement("chat");
+ EXPECT_FALSE(
+ ConstructTrackNameFromNamespace(track_namespace, "chat-id").has_value());
+ track_namespace.NameToNamespace(); // Restore to correct value.
+ // Namespace too short.
+ track_namespace.NameToNamespace();
+ EXPECT_FALSE(
+ ConstructTrackNameFromNamespace(track_namespace, "chat-id").has_value());
+ track_namespace.AddElement("chat"); // Restore to correct value.
+ // Base Path is wrong.
+ FullTrackName bad_base_path{"moq-chat2", "chat-id", "user", "device",
+ "timestamp"};
+ EXPECT_FALSE(
+ ConstructTrackNameFromNamespace(bad_base_path, "chat-id").has_value());
}
-TEST_F(MoqChatStringsTest, GetFullTrackNameFromUsername) {
- EXPECT_EQ(strings_.GetFullTrackNameFromUsername("user"),
- FullTrackName("moq-chat/chat-id/participant/user", ""));
-}
-
-TEST_F(MoqChatStringsTest, GetCatalogName) {
- EXPECT_EQ(strings_.GetCatalogName(),
- FullTrackName("moq-chat/chat-id", "/catalog"));
+TEST_F(MoqChatTest, Queries) {
+ FullTrackName local_name{kBasePath, "chat-id", "user",
+ "device", "timestamp", kNameField};
+ EXPECT_EQ(GetChatId(local_name), "chat-id");
+ EXPECT_EQ(GetUsername(local_name), "user");
+ FullTrackName track_namespace{"moq-chat", "chat-id", "user", "device",
+ "timestamp"};
+ EXPECT_EQ(GetUserNamespace(local_name), track_namespace);
+ FullTrackName chat_namespace{"moq-chat", "chat-id"};
+ EXPECT_EQ(GetChatNamespace(local_name), chat_namespace);
}
} // namespace
-} // namespace moqt
+} // namespace moqt::moq_chat