Refactor MoQ ChatClient to support end-to-end tests and use moq_chat.h.
PiperOrigin-RevId: 660859814
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 052b6ae..66d3aa4 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1521,6 +1521,7 @@
"quic/moqt/moqt_track.h",
"quic/moqt/test_tools/moqt_simulator_harness.h",
"quic/moqt/test_tools/moqt_test_message.h",
+ "quic/moqt/tools/chat_client.h",
"quic/moqt/tools/chat_server.h",
"quic/moqt/tools/moq_chat.h",
"quic/moqt/tools/moqt_client.h",
@@ -1549,6 +1550,7 @@
"quic/moqt/moqt_track.cc",
"quic/moqt/moqt_track_test.cc",
"quic/moqt/test_tools/moqt_simulator_harness.cc",
+ "quic/moqt/tools/chat_client.cc",
"quic/moqt/tools/chat_client_bin.cc",
"quic/moqt/tools/chat_server.cc",
"quic/moqt/tools/chat_server_bin.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index 04e3a31..ef670d5 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1525,6 +1525,7 @@
"src/quiche/quic/moqt/moqt_track.h",
"src/quiche/quic/moqt/test_tools/moqt_simulator_harness.h",
"src/quiche/quic/moqt/test_tools/moqt_test_message.h",
+ "src/quiche/quic/moqt/tools/chat_client.h",
"src/quiche/quic/moqt/tools/chat_server.h",
"src/quiche/quic/moqt/tools/moq_chat.h",
"src/quiche/quic/moqt/tools/moqt_client.h",
@@ -1553,6 +1554,7 @@
"src/quiche/quic/moqt/moqt_track.cc",
"src/quiche/quic/moqt/moqt_track_test.cc",
"src/quiche/quic/moqt/test_tools/moqt_simulator_harness.cc",
+ "src/quiche/quic/moqt/tools/chat_client.cc",
"src/quiche/quic/moqt/tools/chat_client_bin.cc",
"src/quiche/quic/moqt/tools/chat_server.cc",
"src/quiche/quic/moqt/tools/chat_server_bin.cc",
diff --git a/build/source_list.json b/build/source_list.json
index a24b008..6feebd1 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1524,6 +1524,7 @@
"quiche/quic/moqt/moqt_track.h",
"quiche/quic/moqt/test_tools/moqt_simulator_harness.h",
"quiche/quic/moqt/test_tools/moqt_test_message.h",
+ "quiche/quic/moqt/tools/chat_client.h",
"quiche/quic/moqt/tools/chat_server.h",
"quiche/quic/moqt/tools/moq_chat.h",
"quiche/quic/moqt/tools/moqt_client.h",
@@ -1552,6 +1553,7 @@
"quiche/quic/moqt/moqt_track.cc",
"quiche/quic/moqt/moqt_track_test.cc",
"quiche/quic/moqt/test_tools/moqt_simulator_harness.cc",
+ "quiche/quic/moqt/tools/chat_client.cc",
"quiche/quic/moqt/tools/chat_client_bin.cc",
"quiche/quic/moqt/tools/chat_server.cc",
"quiche/quic/moqt/tools/chat_server_bin.cc",
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.h b/quiche/quic/moqt/moqt_live_relay_queue.h
index 41a6270..2dda0db 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue.h
+++ b/quiche/quic/moqt/moqt_live_relay_queue.h
@@ -8,6 +8,7 @@
#include <cstddef>
#include <cstdint>
#include <optional>
+#include <tuple>
#include <utility>
#include <vector>
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc
new file mode 100644
index 0000000..760104b
--- /dev/null
+++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -0,0 +1,277 @@
+// Copyright (c) 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/tools/chat_client.h"
+
+#include <poll.h>
+#include <unistd.h>
+
+#include <cstdint>
+#include <fstream>
+#include <iostream>
+#include <memory>
+#include <optional>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "absl/container/flat_hash_map.h"
+#include "absl/functional/bind_front.h"
+#include "absl/strings/str_cat.h"
+#include "absl/strings/str_split.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/core/crypto/proof_verifier.h"
+#include "quiche/quic/core/io/quic_default_event_loop.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/quic_default_clock.h"
+#include "quiche/quic/core/quic_server_id.h"
+#include "quiche/quic/moqt/moqt_known_track_publisher.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_outgoing_queue.h"
+#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/quic/moqt/tools/moq_chat.h"
+#include "quiche/quic/moqt/tools/moqt_client.h"
+#include "quiche/quic/platform/api/quic_default_proof_providers.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/quic/tools/fake_proof_verifier.h"
+#include "quiche/quic/tools/interactive_cli.h"
+#include "quiche/quic/tools/quic_name_lookup.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/simple_buffer_allocator.h"
+
+moqt::ChatClient::ChatClient(const quic::QuicServerId& server_id,
+ absl::string_view path, absl::string_view username,
+ absl::string_view chat_id, bool ignore_certificate,
+ absl::string_view output_file)
+ : username_(username), chat_strings_(chat_id) {
+ quic::QuicDefaultClock* clock = quic::QuicDefaultClock::Get();
+ std::cout << "Connecting to host " << server_id.host() << " port "
+ << server_id.port() << " path " << path << "\n";
+ event_loop_ = quic::GetDefaultEventLoop()->Create(clock);
+ quic::QuicSocketAddress peer_address =
+ quic::tools::LookupAddress(AF_UNSPEC, server_id);
+ std::unique_ptr<quic::ProofVerifier> verifier;
+ output_filename_ = output_file;
+ if (!output_filename_.empty()) {
+ output_file_.open(output_filename_);
+ output_file_ << "Chat transcript:\n";
+ output_file_.flush();
+ }
+ if (ignore_certificate) {
+ verifier = std::make_unique<quic::FakeProofVerifier>();
+ } else {
+ verifier = quic::CreateDefaultProofVerifier(server_id.host());
+ }
+ client_ = std::make_unique<moqt::MoqtClient>(
+ peer_address, server_id, std::move(verifier), event_loop_.get());
+ session_callbacks_.session_established_callback = [this]() {
+ std::cout << "Session established\n";
+ session_is_open_ = true;
+ if (output_filename_.empty()) { // Use the CLI.
+ cli_ = std::make_unique<quic::InteractiveCli>(
+ event_loop_.get(),
+ absl::bind_front(&ChatClient::OnTerminalLineInput, this));
+ cli_->PrintLine("Fully connected. Enter '/exit' to exit the chat.\n");
+ }
+ };
+ session_callbacks_.session_terminated_callback =
+ [this](absl::string_view error_message) {
+ std::cerr << "Closed session, reason = " << error_message << "\n";
+ session_is_open_ = false;
+ };
+ session_callbacks_.session_deleted_callback = [this]() {
+ session_ = nullptr;
+ };
+ client_->Connect(std::string(path), std::move(session_callbacks_));
+}
+
+void moqt::ChatClient::OnTerminalLineInput(absl::string_view input_message) {
+ if (input_message.empty()) {
+ return;
+ }
+ if (input_message == "/exit") {
+ session_is_open_ = false;
+ return;
+ }
+ quiche::QuicheMemSlice message_slice(quiche::QuicheBuffer::Copy(
+ quiche::SimpleBufferAllocator::Get(), input_message));
+ queue_->AddObject(std::move(message_slice), /*key=*/true);
+}
+
+void moqt::ChatClient::RemoteTrackVisitor::OnReply(
+ const moqt::FullTrackName& full_track_name,
+ std::optional<absl::string_view> reason_phrase) {
+ client_->subscribes_to_make_--;
+ if (full_track_name == client_->chat_strings_.GetCatalogName()) {
+ std::cout << "Subscription to catalog ";
+ } else {
+ std::cout << "Subscription to user " << full_track_name.track_namespace
+ << " ";
+ }
+ if (reason_phrase.has_value()) {
+ std::cout << "REJECTED, reason = " << *reason_phrase << "\n";
+ } else {
+ std::cout << "ACCEPTED\n";
+ }
+}
+
+void moqt::ChatClient::RemoteTrackVisitor::OnObjectFragment(
+ const moqt::FullTrackName& full_track_name, uint64_t group_sequence,
+ uint64_t object_sequence, moqt::MoqtPriority /*publisher_priority*/,
+ moqt::MoqtObjectStatus /*status*/,
+ moqt::MoqtForwardingPreference /*forwarding_preference*/,
+ absl::string_view object, bool end_of_message) {
+ if (!end_of_message) {
+ std::cerr << "Error: received partial message despite requesting "
+ "buffering\n";
+ }
+ if (full_track_name == client_->chat_strings_.GetCatalogName()) {
+ if (group_sequence < client_->catalog_group_) {
+ std::cout << "Ignoring old catalog";
+ return;
+ }
+ client_->ProcessCatalog(object, this, group_sequence, object_sequence);
+ return;
+ }
+ std::string username = full_track_name.track_namespace;
+ username = username.substr(username.find_last_of('/') + 1);
+ if (!client_->other_users_.contains(username)) {
+ std::cout << "Username " << username << "doesn't exist\n";
+ return;
+ }
+ if (client_->has_output_file()) {
+ client_->WriteToFile(username, object);
+ return;
+ }
+ if (cli_ != nullptr) {
+ std::string full_output = absl::StrCat(username, ": ", object);
+ cli_->PrintLine(full_output);
+ }
+}
+
+bool moqt::ChatClient::AnnounceAndSubscribe() {
+ session_ = client_->session();
+ if (session_ == nullptr) {
+ std::cout << "Failed to connect.\n";
+ return false;
+ }
+ FullTrackName my_track_name =
+ chat_strings_.GetFullTrackNameFromUsername(username_);
+ queue_ = std::make_shared<moqt::MoqtOutgoingQueue>(
+ my_track_name, moqt::MoqtForwardingPreference::kObject);
+ publisher_.Add(queue_);
+ session_->set_publisher(&publisher_);
+ moqt::MoqtOutgoingAnnounceCallback announce_callback =
+ [this](absl::string_view track_namespace,
+ std::optional<moqt::MoqtAnnounceErrorReason> reason) {
+ if (reason.has_value()) {
+ std::cout << "ANNOUNCE rejected, " << reason->reason_phrase << "\n";
+ session_->Error(moqt::MoqtError::kInternalError,
+ "Local ANNOUNCE rejected");
+ return;
+ }
+ std::cout << "ANNOUNCE for " << track_namespace << " accepted\n";
+ return;
+ };
+ std::cout << "Announcing " << my_track_name.track_namespace << "\n";
+ session_->Announce(my_track_name.track_namespace,
+ std::move(announce_callback));
+ remote_track_visitor_ = std::make_unique<RemoteTrackVisitor>(this);
+ FullTrackName catalog_name = chat_strings_.GetCatalogName();
+ if (!session_->SubscribeCurrentGroup(
+ catalog_name.track_namespace, catalog_name.track_name,
+ remote_track_visitor_.get(), username_)) {
+ std::cout << "Failed to get catalog\n";
+ return false;
+ }
+ return true;
+}
+
+void moqt::ChatClient::ProcessCatalog(absl::string_view object,
+ moqt::RemoteTrack::Visitor* visitor,
+ uint64_t group_sequence,
+ uint64_t object_sequence) {
+ std::string message(object);
+ std::istringstream f(message);
+ // std::string line;
+ bool got_version = true;
+ if (object_sequence == 0) {
+ std::cout << "Received new Catalog. Users:\n";
+ got_version = false;
+ }
+ std::vector<absl::string_view> lines =
+ absl::StrSplit(object, '\n', absl::SkipEmpty());
+ for (absl::string_view line : lines) {
+ if (!got_version) {
+ if (line != "version=1") {
+ session_->Error(moqt::MoqtError::kProtocolViolation,
+ "Catalog does not begin with version");
+ return;
+ }
+ got_version = true;
+ continue;
+ }
+ std::string user;
+ bool add = true;
+ if (object_sequence > 0) {
+ switch (line[0]) {
+ case '-':
+ add = false;
+ break;
+ case '+':
+ break;
+ default:
+ std::cerr << "Catalog update with neither + nor -\n";
+ return;
+ }
+ user = line.substr(1, line.size() - 1);
+ } else {
+ user = line;
+ }
+ if (username_ == user) {
+ std::cout << user << "\n";
+ continue;
+ }
+ if (!add) {
+ // TODO: Unsubscribe from the user that's leaving
+ std::cout << user << "left the chat\n";
+ other_users_.erase(user);
+ continue;
+ }
+ if (object_sequence == 0) {
+ std::cout << user << "\n";
+ } else {
+ std::cout << user << " joined the chat\n";
+ }
+ auto it = other_users_.find(user);
+ if (it == other_users_.end()) {
+ moqt::FullTrackName to_subscribe =
+ chat_strings_.GetFullTrackNameFromUsername(user);
+ auto new_user = other_users_.emplace(
+ std::make_pair(user, ChatUser(to_subscribe, group_sequence)));
+ ChatUser& user_record = new_user.first->second;
+ session_->SubscribeCurrentGroup(
+ user_record.full_track_name.track_namespace,
+ user_record.full_track_name.track_name, visitor);
+ subscribes_to_make_++;
+ } else {
+ if (it->second.from_group == group_sequence) {
+ session_->Error(moqt::MoqtError::kProtocolViolation,
+ "User listed twice in Catalog");
+ return;
+ }
+ it->second.from_group = group_sequence;
+ }
+ }
+ if (object_sequence == 0) { // Eliminate users that are no longer present
+ absl::erase_if(other_users_, [&](const auto& kv) {
+ return kv.second.from_group != group_sequence;
+ });
+ }
+ catalog_group_ = group_sequence;
+}
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h
new file mode 100644
index 0000000..b5cb747
--- /dev/null
+++ b/quiche/quic/moqt/tools/chat_client.h
@@ -0,0 +1,137 @@
+// Copyright (c) 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_TOOLS_CHAT_CLIENT_H
+#define QUICHE_QUIC_MOQT_TOOLS_CHAT_CLIENT_H
+
+#include <cstdint>
+#include <fstream>
+#include <memory>
+#include <optional>
+#include <string>
+
+#include "absl/container/flat_hash_map.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/quic_server_id.h"
+#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/moqt/moqt_known_track_publisher.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_outgoing_queue.h"
+#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/quic/moqt/tools/moq_chat.h"
+#include "quiche/quic/moqt/tools/moqt_client.h"
+#include "quiche/quic/tools/interactive_cli.h"
+#include "quiche/common/platform/api/quiche_export.h"
+
+namespace moqt {
+
+class ChatClient {
+ public:
+ ChatClient(const quic::QuicServerId& server_id, absl::string_view path,
+ absl::string_view username, absl::string_view chat_id,
+ bool ignore_certificate, absl::string_view output_file);
+
+ void OnTerminalLineInput(absl::string_view input_message);
+
+ bool session_is_open() const { return session_is_open_; }
+
+ // Returns true if the client is still doing initial sync: retrieving the
+ // catalog, subscribing to all the users in it, and waiting for the server
+ // to subscribe to the local track.
+ bool is_syncing() const {
+ return !catalog_group_.has_value() || subscribes_to_make_ > 0 ||
+ (queue_ == nullptr || !queue_->HasSubscribers());
+ }
+
+ void RunEventLoop() {
+ event_loop_->RunEventLoopOnce(quic::QuicTime::Delta::FromMilliseconds(500));
+ }
+
+ bool has_output_file() { return !output_filename_.empty(); }
+
+ void WriteToFile(absl::string_view user, absl::string_view message) {
+ output_file_ << user << ": " << message << "\n\n";
+ output_file_.flush();
+ }
+
+ class QUICHE_EXPORT RemoteTrackVisitor : public moqt::RemoteTrack::Visitor {
+ public:
+ RemoteTrackVisitor(ChatClient* client) : client_(client) {
+ cli_ = client->cli_.get();
+ }
+
+ void OnReply(const moqt::FullTrackName& full_track_name,
+ std::optional<absl::string_view> reason_phrase) override;
+
+ void OnObjectFragment(const moqt::FullTrackName& full_track_name,
+ uint64_t group_sequence, uint64_t object_sequence,
+ moqt::MoqtPriority publisher_priority,
+ moqt::MoqtObjectStatus status,
+ moqt::MoqtForwardingPreference forwarding_preference,
+ absl::string_view object,
+ bool end_of_message) override;
+
+ void set_cli(quic::InteractiveCli* cli) { cli_ = cli; }
+
+ private:
+ ChatClient* client_;
+ quic::InteractiveCli* cli_;
+ };
+
+ // Returns false on error.
+ bool AnnounceAndSubscribe();
+
+ private:
+ // Objects from the same catalog group arrive on the same stream, and in
+ // object sequence order.
+ void ProcessCatalog(absl::string_view object,
+ moqt::RemoteTrack::Visitor* visitor,
+ uint64_t group_sequence, uint64_t object_sequence);
+
+ struct ChatUser {
+ moqt::FullTrackName full_track_name;
+ uint64_t from_group;
+ ChatUser(const moqt::FullTrackName& ftn, uint64_t group)
+ : full_track_name(ftn), from_group(group) {}
+ };
+
+ // Basic session information
+ const std::string username_;
+ moqt::MoqChatStrings chat_strings_;
+
+ // General state variables
+ std::unique_ptr<quic::QuicEventLoop> event_loop_;
+ bool session_is_open_ = false;
+ moqt::MoqtSession* session_ = nullptr;
+ moqt::MoqtKnownTrackPublisher publisher_;
+ std::unique_ptr<moqt::MoqtClient> client_;
+ moqt::MoqtSessionCallbacks session_callbacks_;
+
+ // Related to syncing.
+ std::optional<uint64_t> catalog_group_;
+ absl::flat_hash_map<std::string, ChatUser> other_users_;
+ int subscribes_to_make_ = 1;
+
+ // Related to subscriptions/announces
+ // TODO: One for each subscribe
+ std::unique_ptr<RemoteTrackVisitor> remote_track_visitor_;
+
+ // Handling outgoing messages
+ std::shared_ptr<moqt::MoqtOutgoingQueue> queue_;
+
+ // Used when chat output goes to file.
+ std::ofstream output_file_;
+ std::string output_filename_;
+
+ // Used when there is no output file, and both input and output are in the
+ // terminal.
+ std::unique_ptr<quic::InteractiveCli> cli_;
+};
+
+} // namespace moqt
+
+#endif // QUICHE_QUIC_MOQT_TOOLS_CHAT_CLIENT_H
diff --git a/quiche/quic/moqt/tools/chat_client_bin.cc b/quiche/quic/moqt/tools/chat_client_bin.cc
index 41c0b20..867db4e 100644
--- a/quiche/quic/moqt/tools/chat_client_bin.cc
+++ b/quiche/quic/moqt/tools/chat_client_bin.cc
@@ -5,44 +5,16 @@
#include <poll.h>
#include <unistd.h>
-#include <cstdint>
-#include <fstream>
#include <iostream>
-#include <memory>
-#include <optional>
-#include <sstream>
#include <string>
-#include <utility>
#include <vector>
-#include "absl/container/flat_hash_map.h"
-#include "absl/functional/bind_front.h"
-#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
-#include "quiche/quic/core/crypto/proof_verifier.h"
-#include "quiche/quic/core/io/quic_default_event_loop.h"
-#include "quiche/quic/core/io/quic_event_loop.h"
-#include "quiche/quic/core/quic_default_clock.h"
#include "quiche/quic/core/quic_server_id.h"
-#include "quiche/quic/core/quic_time.h"
-#include "quiche/quic/moqt/moqt_known_track_publisher.h"
-#include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_outgoing_queue.h"
-#include "quiche/quic/moqt/moqt_priority.h"
-#include "quiche/quic/moqt/moqt_session.h"
-#include "quiche/quic/moqt/moqt_track.h"
-#include "quiche/quic/moqt/tools/moqt_client.h"
-#include "quiche/quic/platform/api/quic_default_proof_providers.h"
-#include "quiche/quic/platform/api/quic_socket_address.h"
-#include "quiche/quic/tools/fake_proof_verifier.h"
-#include "quiche/quic/tools/interactive_cli.h"
+#include "quiche/quic/moqt/tools/chat_client.h"
#include "quiche/quic/tools/quic_name_lookup.h"
#include "quiche/quic/tools/quic_url.h"
#include "quiche/common/platform/api/quiche_command_line_flags.h"
-#include "quiche/common/platform/api/quiche_export.h"
-#include "quiche/common/platform/api/quiche_mem_slice.h"
-#include "quiche/common/quiche_buffer_allocator.h"
-#include "quiche/common/simple_buffer_allocator.h"
DEFINE_QUICHE_COMMAND_LINE_FLAG(
bool, disable_certificate_verification, false,
@@ -52,319 +24,6 @@
std::string, output_file, "",
"chat messages will stream to a file instead of stdout");
-class ChatClient {
- public:
- ChatClient(quic::QuicServerId& server_id, std::string path,
- std::string username, std::string chat_id)
- : chat_id_(chat_id),
- username_(username),
- my_track_name_(UsernameToTrackName(username)),
- catalog_name_("moq-chat/" + chat_id, "/catalog") {
- quic::QuicDefaultClock* clock = quic::QuicDefaultClock::Get();
- std::cout << "Connecting to host " << server_id.host() << " port "
- << server_id.port() << " path " << path << "\n";
- event_loop_ = quic::GetDefaultEventLoop()->Create(clock);
- quic::QuicSocketAddress peer_address =
- quic::tools::LookupAddress(AF_UNSPEC, server_id);
- std::unique_ptr<quic::ProofVerifier> verifier;
- const bool ignore_certificate = quiche::GetQuicheCommandLineFlag(
- FLAGS_disable_certificate_verification);
- output_filename_ = quiche::GetQuicheCommandLineFlag(FLAGS_output_file);
- if (!output_filename_.empty()) {
- output_file_.open(output_filename_);
- output_file_ << "Chat transcript:\n";
- output_file_.flush();
- }
- if (ignore_certificate) {
- verifier = std::make_unique<quic::FakeProofVerifier>();
- } else {
- verifier = quic::CreateDefaultProofVerifier(server_id.host());
- }
- client_ = std::make_unique<moqt::MoqtClient>(
- peer_address, server_id, std::move(verifier), event_loop_.get());
- session_callbacks_.session_established_callback = [this]() {
- std::cout << "Session established\n";
- session_is_open_ = true;
- if (output_filename_.empty()) { // Use the CLI.
- cli_ = std::make_unique<quic::InteractiveCli>(
- event_loop_.get(),
- absl::bind_front(&ChatClient::OnTerminalLineInput, this));
- cli_->PrintLine("Fully connected. Enter '/exit' to exit the chat.\n");
- }
- };
- session_callbacks_.session_terminated_callback =
- [this](absl::string_view error_message) {
- std::cerr << "Closed session, reason = " << error_message << "\n";
- session_is_open_ = false;
- };
- session_callbacks_.session_deleted_callback = [this]() {
- session_ = nullptr;
- };
- client_->Connect(path, std::move(session_callbacks_));
- }
-
- void OnTerminalLineInput(absl::string_view input_message) {
- if (input_message.empty()) {
- return;
- }
- if (input_message == "/exit") {
- session_is_open_ = false;
- return;
- }
- quiche::QuicheMemSlice message_slice(quiche::QuicheBuffer::Copy(
- quiche::SimpleBufferAllocator::Get(), input_message));
- queue_->AddObject(std::move(message_slice), /*key=*/true);
- }
-
- bool session_is_open() const { return session_is_open_; }
- bool is_syncing() const {
- return !catalog_group_.has_value() || subscribes_to_make_ > 0 ||
- (queue_ == nullptr || !queue_->HasSubscribers());
- }
-
- void RunEventLoop() {
- event_loop_->RunEventLoopOnce(quic::QuicTime::Delta::FromMilliseconds(500));
- }
-
- bool has_output_file() { return !output_filename_.empty(); }
-
- void WriteToFile(absl::string_view user, absl::string_view message) {
- output_file_ << user << ": " << message << "\n\n";
- output_file_.flush();
- }
-
- class QUICHE_EXPORT RemoteTrackVisitor : public moqt::RemoteTrack::Visitor {
- public:
- RemoteTrackVisitor(ChatClient* client) : client_(client) {
- cli_ = client->cli_.get();
- }
-
- void OnReply(const moqt::FullTrackName& full_track_name,
- std::optional<absl::string_view> reason_phrase) override {
- client_->subscribes_to_make_--;
- if (full_track_name == client_->catalog_name_) {
- std::cout << "Subscription to catalog ";
- } else {
- std::cout << "Subscription to user " << full_track_name.track_namespace
- << " ";
- }
- if (reason_phrase.has_value()) {
- std::cout << "REJECTED, reason = " << *reason_phrase << "\n";
- } else {
- std::cout << "ACCEPTED\n";
- }
- }
-
- void OnObjectFragment(
- const moqt::FullTrackName& full_track_name, uint64_t group_sequence,
- uint64_t object_sequence, moqt::MoqtPriority /*publisher_priority*/,
- moqt::MoqtObjectStatus /*status*/,
- moqt::MoqtForwardingPreference /*forwarding_preference*/,
- absl::string_view object, bool end_of_message) override {
- if (!end_of_message) {
- std::cerr << "Error: received partial message despite requesting "
- "buffering\n";
- }
- if (full_track_name == client_->catalog_name_) {
- if (group_sequence < client_->catalog_group_) {
- std::cout << "Ignoring old catalog";
- return;
- }
- client_->ProcessCatalog(object, this, group_sequence, object_sequence);
- return;
- }
- std::string username = full_track_name.track_namespace;
- username = username.substr(username.find_last_of('/') + 1);
- if (!client_->other_users_.contains(username)) {
- std::cout << "Username " << username << "doesn't exist\n";
- return;
- }
- if (client_->has_output_file()) {
- client_->WriteToFile(username, object);
- return;
- }
- if (cli_ != nullptr) {
- std::string full_output = absl::StrCat(username, ": ", object);
- cli_->PrintLine(full_output);
- }
- }
-
- void set_cli(quic::InteractiveCli* cli) { cli_ = cli; }
-
- private:
- ChatClient* client_;
- quic::InteractiveCli* cli_;
- };
-
- // returns false on error
- bool AnnounceAndSubscribe() {
- session_ = client_->session();
- if (session_ == nullptr) {
- std::cout << "Failed to connect.\n";
- return false;
- }
- queue_ = std::make_shared<moqt::MoqtOutgoingQueue>(
- my_track_name_, moqt::MoqtForwardingPreference::kObject);
- publisher_.Add(queue_);
- session_->set_publisher(&publisher_);
- moqt::MoqtOutgoingAnnounceCallback announce_callback =
- [&](absl::string_view track_namespace,
- std::optional<moqt::MoqtAnnounceErrorReason> reason) {
- if (reason.has_value()) {
- std::cout << "ANNOUNCE rejected, " << reason->reason_phrase << "\n";
- session_->Error(moqt::MoqtError::kInternalError,
- "Local ANNOUNCE rejected");
- return;
- }
- std::cout << "ANNOUNCE for " << track_namespace << " accepted\n";
- return;
- };
- std::cout << "Announcing " << my_track_name_.track_namespace << "\n";
- session_->Announce(my_track_name_.track_namespace,
- std::move(announce_callback));
- remote_track_visitor_ = std::make_unique<RemoteTrackVisitor>(this);
- if (!session_->SubscribeCurrentGroup(
- catalog_name_.track_namespace, catalog_name_.track_name,
- remote_track_visitor_.get(), username_)) {
- std::cout << "Failed to get catalog for " << chat_id_ << "\n";
- return false;
- }
- return true;
- }
-
- private:
- moqt::FullTrackName UsernameToTrackName(absl::string_view username) {
- return moqt::FullTrackName(
- absl::StrCat("moq-chat/", chat_id_, "/participant/", username), "");
- }
-
- // Objects from the same catalog group arrive on the same stream, and in
- // object sequence order.
- void ProcessCatalog(absl::string_view object,
- moqt::RemoteTrack::Visitor* visitor,
- uint64_t group_sequence, uint64_t object_sequence) {
- std::string message(object);
- std::istringstream f(message);
- std::string line;
- bool got_version = true;
- if (object_sequence == 0) {
- std::cout << "Received new Catalog. Users:\n";
- got_version = false;
- }
- while (std::getline(f, line)) {
- if (!got_version) {
- if (line != "version=1") {
- session_->Error(moqt::MoqtError::kProtocolViolation,
- "Catalog does not begin with version");
- return;
- }
- got_version = true;
- continue;
- }
- if (line.empty()) {
- continue;
- }
- std::string user;
- bool add = true;
- if (object_sequence > 0) {
- switch (line[0]) {
- case '-':
- add = false;
- break;
- case '+':
- break;
- default:
- std::cerr << "Catalog update with neither + nor -\n";
- return;
- }
- user = line.substr(1, line.size() - 1);
- } else {
- user = line;
- }
- if (username_ == user) {
- std::cout << user << "\n";
- continue;
- }
- if (!add) {
- // TODO: Unsubscribe from the user that's leaving
- std::cout << user << "left the chat\n";
- other_users_.erase(user);
- continue;
- }
- if (object_sequence == 0) {
- std::cout << user << "\n";
- } else {
- std::cout << user << " joined the chat\n";
- }
- auto it = other_users_.find(user);
- if (it == other_users_.end()) {
- moqt::FullTrackName to_subscribe = UsernameToTrackName(user);
- auto new_user = other_users_.emplace(
- std::make_pair(user, ChatUser(to_subscribe, group_sequence)));
- ChatUser& user_record = new_user.first->second;
- session_->SubscribeCurrentGroup(
- user_record.full_track_name.track_namespace,
- user_record.full_track_name.track_name, visitor);
- subscribes_to_make_++;
- } else {
- if (it->second.from_group == group_sequence) {
- session_->Error(moqt::MoqtError::kProtocolViolation,
- "User listed twice in Catalog");
- return;
- }
- it->second.from_group = group_sequence;
- }
- }
- if (object_sequence == 0) { // Eliminate users that are no longer present
- absl::erase_if(other_users_, [&](const auto& kv) {
- return kv.second.from_group != group_sequence;
- });
- }
- catalog_group_ = group_sequence;
- }
-
- struct ChatUser {
- moqt::FullTrackName full_track_name;
- uint64_t from_group;
- ChatUser(moqt::FullTrackName& ftn, uint64_t group)
- : full_track_name(ftn), from_group(group) {}
- };
-
- // Basic session information
- const std::string chat_id_;
- const std::string username_;
- const moqt::FullTrackName my_track_name_;
-
- // General state variables
- std::unique_ptr<quic::QuicEventLoop> event_loop_;
- bool session_is_open_ = false;
- moqt::MoqtSession* session_ = nullptr;
- moqt::MoqtKnownTrackPublisher publisher_;
- std::unique_ptr<moqt::MoqtClient> client_;
- moqt::MoqtSessionCallbacks session_callbacks_;
-
- // Related to syncing.
- std::optional<uint64_t> catalog_group_;
- moqt::FullTrackName catalog_name_;
- absl::flat_hash_map<std::string, ChatUser> other_users_;
- int subscribes_to_make_ = 1;
-
- // Related to subscriptions/announces
- // TODO: One for each subscribe
- std::unique_ptr<RemoteTrackVisitor> remote_track_visitor_;
-
- // Handling incoming and outgoing messages
- std::shared_ptr<moqt::MoqtOutgoingQueue> queue_;
-
- // Used when chat output goes to file.
- std::ofstream output_file_;
- std::string output_filename_;
-
- // Used when there is no output file, and both input and output are in the
- // terminal.
- std::unique_ptr<quic::InteractiveCli> cli_;
-};
-
// A client for MoQT over chat, used for interop testing. See
// https://afrind.github.io/draft-frindell-moq-chat/draft-frindell-moq-chat.html
int main(int argc, char* argv[]) {
@@ -380,7 +39,10 @@
std::string path = url.PathParamsQuery();
const std::string& username = args[1];
const std::string& chat_id = args[2];
- ChatClient client(server_id, path, username, chat_id);
+ moqt::ChatClient client(
+ server_id, path, username, chat_id,
+ quiche::GetQuicheCommandLineFlag(FLAGS_disable_certificate_verification),
+ quiche::GetQuicheCommandLineFlag(FLAGS_output_file));
while (!client.session_is_open()) {
client.RunEventLoop();
diff --git a/quiche/quic/moqt/tools/moq_chat.h b/quiche/quic/moqt/tools/moq_chat.h
index 37e5278..e2f9041 100644
--- a/quiche/quic/moqt/tools/moq_chat.h
+++ b/quiche/quic/moqt/tools/moq_chat.h
@@ -8,7 +8,6 @@
#include <string>
#include <vector>
-#include "absl/strings/match.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"