Abstract out the MoQ Chat User interface and implement E2E chat test.
Separate the Constructor for ChatClient() and the Connect() call.
ChatClient now takes a generic ChatUserInterface to send output and collect input however the owning application specifies.
PiperOrigin-RevId: 665344318
diff --git a/build/source_list.bzl b/build/source_list.bzl
index df3070d..c1a8bd0 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1556,6 +1556,7 @@
"quic/moqt/tools/chat_client_bin.cc",
"quic/moqt/tools/chat_server.cc",
"quic/moqt/tools/chat_server_bin.cc",
+ "quic/moqt/tools/moq_chat_end_to_end_test.cc",
"quic/moqt/tools/moq_chat_test.cc",
"quic/moqt/tools/moqt_client.cc",
"quic/moqt/tools/moqt_end_to_end_test.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index a51045e..5f3ec02 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1560,6 +1560,7 @@
"src/quiche/quic/moqt/tools/chat_client_bin.cc",
"src/quiche/quic/moqt/tools/chat_server.cc",
"src/quiche/quic/moqt/tools/chat_server_bin.cc",
+ "src/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc",
"src/quiche/quic/moqt/tools/moq_chat_test.cc",
"src/quiche/quic/moqt/tools/moqt_client.cc",
"src/quiche/quic/moqt/tools/moqt_end_to_end_test.cc",
diff --git a/build/source_list.json b/build/source_list.json
index 079ba76..94ad682 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1559,6 +1559,7 @@
"quiche/quic/moqt/tools/chat_client_bin.cc",
"quiche/quic/moqt/tools/chat_server.cc",
"quiche/quic/moqt/tools/chat_server_bin.cc",
+ "quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc",
"quiche/quic/moqt/tools/moq_chat_test.cc",
"quiche/quic/moqt/tools/moqt_client.cc",
"quiche/quic/moqt/tools/moqt_end_to_end_test.cc",
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.h b/quiche/quic/moqt/moqt_live_relay_queue.h
index 2dda0db..8c3fbe3 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue.h
+++ b/quiche/quic/moqt/moqt_live_relay_queue.h
@@ -45,8 +45,6 @@
MoqtLiveRelayQueue& operator=(const MoqtLiveRelayQueue&) = delete;
MoqtLiveRelayQueue& operator=(MoqtLiveRelayQueue&&) = default;
- // TODO: Add destructor that terminates all subscriptions.
-
// Publish a received object. Returns false if the object is invalid, given
// other non-normal objects indicate that the sequence number should not
// occur. A false return value might result in a session error on the
diff --git a/quiche/quic/moqt/moqt_live_relay_queue_test.cc b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
index c40a69c..6d4e765 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue_test.cc
+++ b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
@@ -15,7 +15,8 @@
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/platform/api/quiche_test.h"
-namespace moqt {
+namespace moqt::test {
+
namespace {
class TestMoqtLiveRelayQueue : public MoqtLiveRelayQueue,
@@ -346,4 +347,4 @@
} // namespace
-} // namespace moqt
+} // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 5064ccf..3f12724 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -383,6 +383,9 @@
// session does not get called after being destroyed.
std::weak_ptr<void> session_liveness_;
};
+
+ // Private members of MoqtSession.
+
// QueuedOutgoingDataStream records an information necessary to create a
// stream that was attempted to be created before but was blocked due to flow
// control.
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc
index 760104b..067ea56 100644
--- a/quiche/quic/moqt/tools/chat_client.cc
+++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -8,7 +8,6 @@
#include <unistd.h>
#include <cstdint>
-#include <fstream>
#include <iostream>
#include <memory>
#include <optional>
@@ -18,8 +17,6 @@
#include <vector>
#include "absl/container/flat_hash_map.h"
-#include "absl/functional/bind_front.h"
-#include "absl/strings/str_cat.h"
#include "absl/strings/str_split.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/core/crypto/proof_verifier.h"
@@ -33,64 +30,71 @@
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/moqt_track.h"
-#include "quiche/quic/moqt/tools/moq_chat.h"
#include "quiche/quic/moqt/tools/moqt_client.h"
#include "quiche/quic/platform/api/quic_default_proof_providers.h"
#include "quiche/quic/platform/api/quic_socket_address.h"
#include "quiche/quic/tools/fake_proof_verifier.h"
-#include "quiche/quic/tools/interactive_cli.h"
#include "quiche/quic/tools/quic_name_lookup.h"
#include "quiche/common/platform/api/quiche_mem_slice.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/simple_buffer_allocator.h"
-moqt::ChatClient::ChatClient(const quic::QuicServerId& server_id,
- absl::string_view path, absl::string_view username,
- absl::string_view chat_id, bool ignore_certificate,
- absl::string_view output_file)
- : username_(username), chat_strings_(chat_id) {
- quic::QuicDefaultClock* clock = quic::QuicDefaultClock::Get();
- std::cout << "Connecting to host " << server_id.host() << " port "
- << server_id.port() << " path " << path << "\n";
- event_loop_ = quic::GetDefaultEventLoop()->Create(clock);
+namespace moqt {
+
+ChatClient::ChatClient(const quic::QuicServerId& server_id,
+ bool ignore_certificate,
+ std::unique_ptr<ChatUserInterface> interface,
+ quic::QuicEventLoop* event_loop)
+ : event_loop_(event_loop), interface_(std::move(interface)) {
+ if (event_loop_ == nullptr) {
+ quic::QuicDefaultClock* clock = quic::QuicDefaultClock::Get();
+ local_event_loop_ = quic::GetDefaultEventLoop()->Create(clock);
+ event_loop_ = local_event_loop_.get();
+ }
+
quic::QuicSocketAddress peer_address =
quic::tools::LookupAddress(AF_UNSPEC, server_id);
std::unique_ptr<quic::ProofVerifier> verifier;
- output_filename_ = output_file;
- if (!output_filename_.empty()) {
- output_file_.open(output_filename_);
- output_file_ << "Chat transcript:\n";
- output_file_.flush();
- }
if (ignore_certificate) {
verifier = std::make_unique<quic::FakeProofVerifier>();
} else {
verifier = quic::CreateDefaultProofVerifier(server_id.host());
}
- client_ = std::make_unique<moqt::MoqtClient>(
- peer_address, server_id, std::move(verifier), event_loop_.get());
+
+ client_ = std::make_unique<MoqtClient>(peer_address, server_id,
+ std::move(verifier), event_loop_);
session_callbacks_.session_established_callback = [this]() {
std::cout << "Session established\n";
session_is_open_ = true;
- if (output_filename_.empty()) { // Use the CLI.
- cli_ = std::make_unique<quic::InteractiveCli>(
- event_loop_.get(),
- absl::bind_front(&ChatClient::OnTerminalLineInput, this));
- cli_->PrintLine("Fully connected. Enter '/exit' to exit the chat.\n");
- }
};
session_callbacks_.session_terminated_callback =
[this](absl::string_view error_message) {
std::cerr << "Closed session, reason = " << error_message << "\n";
session_is_open_ = false;
+ connect_failed_ = true;
};
session_callbacks_.session_deleted_callback = [this]() {
session_ = nullptr;
};
- client_->Connect(std::string(path), std::move(session_callbacks_));
+ interface_->Initialize(
+ [this](absl::string_view input_message) {
+ OnTerminalLineInput(input_message);
+ },
+ event_loop_);
}
-void moqt::ChatClient::OnTerminalLineInput(absl::string_view input_message) {
+bool ChatClient::Connect(absl::string_view path, absl::string_view username,
+ absl::string_view chat_id) {
+ username_ = username;
+ chat_strings_.emplace(chat_id);
+ client_->Connect(std::string(path), std::move(session_callbacks_));
+ while (!session_is_open_ && !connect_failed_) {
+ RunEventLoop();
+ }
+ return (!connect_failed_);
+}
+
+void ChatClient::OnTerminalLineInput(absl::string_view input_message) {
if (input_message.empty()) {
return;
}
@@ -103,11 +107,11 @@
queue_->AddObject(std::move(message_slice), /*key=*/true);
}
-void moqt::ChatClient::RemoteTrackVisitor::OnReply(
- const moqt::FullTrackName& full_track_name,
+void ChatClient::RemoteTrackVisitor::OnReply(
+ const FullTrackName& full_track_name,
std::optional<absl::string_view> reason_phrase) {
client_->subscribes_to_make_--;
- if (full_track_name == client_->chat_strings_.GetCatalogName()) {
+ if (full_track_name == client_->chat_strings_->GetCatalogName()) {
std::cout << "Subscription to catalog ";
} else {
std::cout << "Subscription to user " << full_track_name.track_namespace
@@ -120,17 +124,17 @@
}
}
-void moqt::ChatClient::RemoteTrackVisitor::OnObjectFragment(
- const moqt::FullTrackName& full_track_name, uint64_t group_sequence,
- uint64_t object_sequence, moqt::MoqtPriority /*publisher_priority*/,
- moqt::MoqtObjectStatus /*status*/,
- moqt::MoqtForwardingPreference /*forwarding_preference*/,
+void ChatClient::RemoteTrackVisitor::OnObjectFragment(
+ const FullTrackName& full_track_name, uint64_t group_sequence,
+ uint64_t object_sequence, MoqtPriority /*publisher_priority*/,
+ MoqtObjectStatus /*status*/,
+ MoqtForwardingPreference /*forwarding_preference*/,
absl::string_view object, bool end_of_message) {
if (!end_of_message) {
std::cerr << "Error: received partial message despite requesting "
"buffering\n";
}
- if (full_track_name == client_->chat_strings_.GetCatalogName()) {
+ if (full_track_name == client_->chat_strings_->GetCatalogName()) {
if (group_sequence < client_->catalog_group_) {
std::cout << "Ignoring old catalog";
return;
@@ -144,58 +148,61 @@
std::cout << "Username " << username << "doesn't exist\n";
return;
}
- if (client_->has_output_file()) {
- client_->WriteToFile(username, object);
+ if (object.empty()) {
return;
}
- if (cli_ != nullptr) {
- std::string full_output = absl::StrCat(username, ": ", object);
- cli_->PrintLine(full_output);
- }
+ client_->WriteToOutput(username, object);
}
-bool moqt::ChatClient::AnnounceAndSubscribe() {
+bool ChatClient::AnnounceAndSubscribe() {
session_ = client_->session();
if (session_ == nullptr) {
std::cout << "Failed to connect.\n";
return false;
}
- FullTrackName my_track_name =
- chat_strings_.GetFullTrackNameFromUsername(username_);
- queue_ = std::make_shared<moqt::MoqtOutgoingQueue>(
- my_track_name, moqt::MoqtForwardingPreference::kObject);
- publisher_.Add(queue_);
- session_->set_publisher(&publisher_);
- moqt::MoqtOutgoingAnnounceCallback announce_callback =
- [this](absl::string_view track_namespace,
- std::optional<moqt::MoqtAnnounceErrorReason> reason) {
- if (reason.has_value()) {
- std::cout << "ANNOUNCE rejected, " << reason->reason_phrase << "\n";
- session_->Error(moqt::MoqtError::kInternalError,
- "Local ANNOUNCE rejected");
+ if (!username_.empty()) {
+ // A server log might choose to not provide a username, thus getting all
+ // the messages without adding itself to the catalog.
+ FullTrackName my_track_name =
+ chat_strings_->GetFullTrackNameFromUsername(username_);
+ queue_ = std::make_shared<MoqtOutgoingQueue>(
+ my_track_name, MoqtForwardingPreference::kObject);
+ publisher_.Add(queue_);
+ session_->set_publisher(&publisher_);
+ MoqtOutgoingAnnounceCallback announce_callback =
+ [this](absl::string_view track_namespace,
+ std::optional<MoqtAnnounceErrorReason> reason) {
+ if (reason.has_value()) {
+ std::cout << "ANNOUNCE rejected, " << reason->reason_phrase << "\n";
+ session_->Error(MoqtError::kInternalError,
+ "Local ANNOUNCE rejected");
+ return;
+ }
+ std::cout << "ANNOUNCE for " << track_namespace << " accepted\n";
return;
- }
- std::cout << "ANNOUNCE for " << track_namespace << " accepted\n";
- return;
- };
- std::cout << "Announcing " << my_track_name.track_namespace << "\n";
- session_->Announce(my_track_name.track_namespace,
- std::move(announce_callback));
+ };
+ std::cout << "Announcing " << my_track_name.track_namespace << "\n";
+ session_->Announce(my_track_name.track_namespace,
+ std::move(announce_callback));
+ }
remote_track_visitor_ = std::make_unique<RemoteTrackVisitor>(this);
- FullTrackName catalog_name = chat_strings_.GetCatalogName();
+ FullTrackName catalog_name = chat_strings_->GetCatalogName();
if (!session_->SubscribeCurrentGroup(
catalog_name.track_namespace, catalog_name.track_name,
remote_track_visitor_.get(), username_)) {
std::cout << "Failed to get catalog\n";
return false;
}
- return true;
+ while (session_is_open_ && is_syncing()) {
+ RunEventLoop();
+ }
+ return session_is_open_;
}
-void moqt::ChatClient::ProcessCatalog(absl::string_view object,
- moqt::RemoteTrack::Visitor* visitor,
- uint64_t group_sequence,
- uint64_t object_sequence) {
+void ChatClient::ProcessCatalog(absl::string_view object,
+ RemoteTrack::Visitor* visitor,
+ uint64_t group_sequence,
+ uint64_t object_sequence) {
std::string message(object);
std::istringstream f(message);
// std::string line;
@@ -209,7 +216,7 @@
for (absl::string_view line : lines) {
if (!got_version) {
if (line != "version=1") {
- session_->Error(moqt::MoqtError::kProtocolViolation,
+ session_->Error(MoqtError::kProtocolViolation,
"Catalog does not begin with version");
return;
}
@@ -250,8 +257,8 @@
}
auto it = other_users_.find(user);
if (it == other_users_.end()) {
- moqt::FullTrackName to_subscribe =
- chat_strings_.GetFullTrackNameFromUsername(user);
+ FullTrackName to_subscribe =
+ chat_strings_->GetFullTrackNameFromUsername(user);
auto new_user = other_users_.emplace(
std::make_pair(user, ChatUser(to_subscribe, group_sequence)));
ChatUser& user_record = new_user.first->second;
@@ -261,7 +268,7 @@
subscribes_to_make_++;
} else {
if (it->second.from_group == group_sequence) {
- session_->Error(moqt::MoqtError::kProtocolViolation,
+ session_->Error(MoqtError::kProtocolViolation,
"User listed twice in Catalog");
return;
}
@@ -275,3 +282,5 @@
}
catalog_group_ = group_sequence;
}
+
+} // namespace moqt
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h
index b5cb747..e76700e 100644
--- a/quiche/quic/moqt/tools/chat_client.h
+++ b/quiche/quic/moqt/tools/chat_client.h
@@ -6,7 +6,6 @@
#define QUICHE_QUIC_MOQT_TOOLS_CHAT_CLIENT_H
#include <cstdint>
-#include <fstream>
#include <memory>
#include <optional>
#include <string>
@@ -24,45 +23,68 @@
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/moqt/tools/moq_chat.h"
#include "quiche/quic/moqt/tools/moqt_client.h"
-#include "quiche/quic/tools/interactive_cli.h"
#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/quiche_callbacks.h"
namespace moqt {
+constexpr quic::QuicTime::Delta kChatEventLoopDuration =
+ quic::QuicTime::Delta::FromMilliseconds(500);
+
+// Chat clients accept a ChatUserInterface that implements how user input is
+// captured, and peer messages are displayed.
+class ChatUserInterface {
+ public:
+ virtual ~ChatUserInterface() = default;
+
+ // ChatUserInterface cannot be used until initialized. This is separate from
+ // the constructor, because the constructor might create the event loop.
+ // |callback| is what ChatUserInterface will call when there is user input.
+ // |event_loop| is the event loop that the ChatUserInterface should use.
+ virtual void Initialize(
+ quiche::MultiUseCallback<void(absl::string_view)> callback,
+ quic::QuicEventLoop* event_loop) = 0;
+ // Write a peer message to the user output.
+ virtual void WriteToOutput(absl::string_view user,
+ absl::string_view message) = 0;
+ // Run the event loop for a short interval and exit.
+ virtual void IoLoop() = 0;
+};
+
class ChatClient {
public:
- ChatClient(const quic::QuicServerId& server_id, absl::string_view path,
- absl::string_view username, absl::string_view chat_id,
- bool ignore_certificate, absl::string_view output_file);
+ // If |event_loop| is nullptr, a new one will be created. If multiple
+ // endpoints are running on the same thread, as in tests, they should share
+ // an event loop.
+ ChatClient(const quic::QuicServerId& server_id, bool ignore_certificate,
+ std::unique_ptr<ChatUserInterface> interface,
+ quic::QuicEventLoop* event_loop = nullptr);
+
+ // Establish the MoQT session. Returns false if it fails.
+ bool Connect(absl::string_view path, absl::string_view username,
+ absl::string_view chat_id);
void OnTerminalLineInput(absl::string_view input_message);
- bool session_is_open() const { return session_is_open_; }
-
- // Returns true if the client is still doing initial sync: retrieving the
- // catalog, subscribing to all the users in it, and waiting for the server
- // to subscribe to the local track.
- bool is_syncing() const {
- return !catalog_group_.has_value() || subscribes_to_make_ > 0 ||
- (queue_ == nullptr || !queue_->HasSubscribers());
+ // Run the event loop until an input or output event is ready, or the
+ // session closes.
+ void IoLoop() {
+ while (session_is_open_) {
+ interface_->IoLoop();
+ }
}
- void RunEventLoop() {
- event_loop_->RunEventLoopOnce(quic::QuicTime::Delta::FromMilliseconds(500));
+ void WriteToOutput(absl::string_view user, absl::string_view message) {
+ if (interface_ != nullptr) {
+ interface_->WriteToOutput(user, message);
+ }
}
- bool has_output_file() { return !output_filename_.empty(); }
-
- void WriteToFile(absl::string_view user, absl::string_view message) {
- output_file_ << user << ": " << message << "\n\n";
- output_file_.flush();
- }
+ quic::QuicEventLoop* event_loop() { return event_loop_; }
class QUICHE_EXPORT RemoteTrackVisitor : public moqt::RemoteTrack::Visitor {
public:
- RemoteTrackVisitor(ChatClient* client) : client_(client) {
- cli_ = client->cli_.get();
- }
+ RemoteTrackVisitor(ChatClient* client) : client_(client) {}
void OnReply(const moqt::FullTrackName& full_track_name,
std::optional<absl::string_view> reason_phrase) override;
@@ -75,17 +97,26 @@
absl::string_view object,
bool end_of_message) override;
- void set_cli(quic::InteractiveCli* cli) { cli_ = cli; }
-
private:
ChatClient* client_;
- quic::InteractiveCli* cli_;
};
// Returns false on error.
bool AnnounceAndSubscribe();
+ bool session_is_open() const { return session_is_open_; }
+
+ // Returns true if the client is still doing initial sync: retrieving the
+ // catalog, subscribing to all the users in it, and waiting for the server
+ // to subscribe to the local track.
+ bool is_syncing() const {
+ return !catalog_group_.has_value() || subscribes_to_make_ > 0 ||
+ (queue_ == nullptr || !queue_->HasSubscribers());
+ }
+
private:
+ void RunEventLoop() { event_loop_->RunEventLoopOnce(kChatEventLoopDuration); }
+
// Objects from the same catalog group arrive on the same stream, and in
// object sequence order.
void ProcessCatalog(absl::string_view object,
@@ -100,11 +131,15 @@
};
// Basic session information
- const std::string username_;
- moqt::MoqChatStrings chat_strings_;
+ std::string username_;
+ std::optional<moqt::MoqChatStrings> chat_strings_;
// General state variables
- std::unique_ptr<quic::QuicEventLoop> event_loop_;
+ // The event loop to use for this client.
+ quic::QuicEventLoop* event_loop_;
+ // If the client created its own event loop, it will own it.
+ std::unique_ptr<quic::QuicEventLoop> local_event_loop_;
+ bool connect_failed_ = false;
bool session_is_open_ = false;
moqt::MoqtSession* session_ = nullptr;
moqt::MoqtKnownTrackPublisher publisher_;
@@ -123,13 +158,8 @@
// Handling outgoing messages
std::shared_ptr<moqt::MoqtOutgoingQueue> queue_;
- // Used when chat output goes to file.
- std::ofstream output_file_;
- std::string output_filename_;
-
- // Used when there is no output file, and both input and output are in the
- // terminal.
- std::unique_ptr<quic::InteractiveCli> cli_;
+ // User interface for input and output.
+ std::unique_ptr<ChatUserInterface> interface_;
};
} // namespace moqt
diff --git a/quiche/quic/moqt/tools/chat_client_bin.cc b/quiche/quic/moqt/tools/chat_client_bin.cc
index 867db4e..7617789 100644
--- a/quiche/quic/moqt/tools/chat_client_bin.cc
+++ b/quiche/quic/moqt/tools/chat_client_bin.cc
@@ -5,13 +5,20 @@
#include <poll.h>
#include <unistd.h>
+#include <fstream>
#include <iostream>
+#include <memory>
#include <string>
+#include <utility>
#include <vector>
+#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
#include "quiche/quic/core/quic_server_id.h"
#include "quiche/quic/moqt/tools/chat_client.h"
+#include "quiche/quic/platform/api/quic_bug_tracker.h"
+#include "quiche/quic/tools/interactive_cli.h"
#include "quiche/quic/tools/quic_name_lookup.h"
#include "quiche/quic/tools/quic_url.h"
#include "quiche/common/platform/api/quiche_command_line_flags.h"
@@ -24,6 +31,90 @@
std::string, output_file, "",
"chat messages will stream to a file instead of stdout");
+// Writes messages to a file, when directed from the command line.
+class FileOutput : public moqt::ChatUserInterface {
+ public:
+ explicit FileOutput(absl::string_view filename, absl::string_view username)
+ : username_(username) {
+ output_file_.open(filename);
+ output_file_ << "Chat transcript:\n";
+ output_file_.flush();
+ std::cout << "Fully connected. Messages are in the output file. Exit the "
+ << "session by entering /exit\n";
+ }
+
+ ~FileOutput() override { output_file_.close(); }
+
+ void Initialize(quic::InteractiveCli::LineCallback callback,
+ quic::QuicEventLoop* event_loop) override {
+ callback_ = std::move(callback);
+ event_loop_ = event_loop;
+ }
+
+ void WriteToOutput(absl::string_view user,
+ absl::string_view message) override {
+ if (message.empty()) {
+ return;
+ }
+ output_file_ << user << ": " << message << "\n\n";
+ output_file_.flush();
+ }
+
+ void IoLoop() override {
+ std::string message_to_send;
+ QUIC_BUG_IF(quic_bug_moq_chat_user_interface_unitialized,
+ event_loop_ == nullptr)
+ << "IoLoop called before Initialize";
+ while (poll(&poll_settings_, 1, 0) <= 0) {
+ event_loop_->RunEventLoopOnce(moqt::kChatEventLoopDuration);
+ }
+ std::getline(std::cin, message_to_send);
+ callback_(message_to_send);
+ WriteToOutput(username_, message_to_send);
+ }
+
+ private:
+ quic::QuicEventLoop* event_loop_;
+ quic::InteractiveCli::LineCallback callback_;
+ std::ofstream output_file_;
+ absl::string_view username_;
+ struct pollfd poll_settings_ = {
+ 0,
+ POLLIN,
+ POLLIN,
+ };
+};
+
+// Writes messages to the terminal, without messing up entry of new messages.
+class CliOutput : public moqt::ChatUserInterface {
+ public:
+ void Initialize(quic::InteractiveCli::LineCallback callback,
+ quic::QuicEventLoop* event_loop) override {
+ cli_ =
+ std::make_unique<quic::InteractiveCli>(event_loop, std::move(callback));
+ event_loop_ = event_loop;
+ cli_->PrintLine("Fully connected. Enter '/exit' to exit the chat.\n");
+ }
+
+ void WriteToOutput(absl::string_view user,
+ absl::string_view message) override {
+ QUIC_BUG_IF(quic_bug_moq_chat_user_interface_unitialized, cli_ == nullptr)
+ << "WriteToOutput called before Initialize";
+ cli_->PrintLine(absl::StrCat(user, ": ", message));
+ }
+
+ void IoLoop() override {
+ QUIC_BUG_IF(quic_bug_moq_chat_user_interface_unitialized,
+ event_loop_ == nullptr)
+ << "IoLoop called before Initialize";
+ event_loop_->RunEventLoopOnce(moqt::kChatEventLoopDuration);
+ }
+
+ private:
+ quic::QuicEventLoop* event_loop_;
+ std::unique_ptr<quic::InteractiveCli> cli_;
+};
+
// A client for MoQT over chat, used for interop testing. See
// https://afrind.github.io/draft-frindell-moq-chat/draft-frindell-moq-chat.html
int main(int argc, char* argv[]) {
@@ -39,46 +130,26 @@
std::string path = url.PathParamsQuery();
const std::string& username = args[1];
const std::string& chat_id = args[2];
- moqt::ChatClient client(
- server_id, path, username, chat_id,
- quiche::GetQuicheCommandLineFlag(FLAGS_disable_certificate_verification),
- quiche::GetQuicheCommandLineFlag(FLAGS_output_file));
+ std::string output_filename =
+ quiche::GetQuicheCommandLineFlag(FLAGS_output_file);
+ std::unique_ptr<moqt::ChatUserInterface> interface;
- while (!client.session_is_open()) {
- client.RunEventLoop();
+ if (!output_filename.empty()) {
+ interface = std::make_unique<FileOutput>(output_filename, username);
+ } else { // Use the CLI.
+ interface = std::make_unique<CliOutput>();
}
+ moqt::ChatClient client(
+ server_id,
+ quiche::GetQuicheCommandLineFlag(FLAGS_disable_certificate_verification),
+ std::move(interface));
+ if (!client.Connect(path, username, chat_id)) {
+ return 1;
+ }
if (!client.AnnounceAndSubscribe()) {
return 1;
}
- while (client.is_syncing()) {
- client.RunEventLoop();
- }
- if (!client.session_is_open()) {
- return 1; // Something went wrong in connecting.
- }
- if (!client.has_output_file()) {
- while (client.session_is_open()) {
- client.RunEventLoop();
- }
- return 0;
- }
- // There is an output file.
- std::cout << "Fully connected. Messages are in the output file. Exit the "
- << "session by entering /exit\n";
- struct pollfd poll_settings = {
- 0,
- POLLIN,
- POLLIN,
- };
- while (client.session_is_open()) {
- std::string message_to_send;
- while (poll(&poll_settings, 1, 0) <= 0) {
- client.RunEventLoop();
- }
- std::getline(std::cin, message_to_send);
- client.OnTerminalLineInput(message_to_send);
- client.WriteToFile(username, message_to_send);
- }
+ client.IoLoop();
return 0;
}
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc
index f984a71..c764894 100644
--- a/quiche/quic/moqt/tools/chat_server.cc
+++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -137,6 +137,12 @@
}
}
+ChatServer::~ChatServer() {
+ // Kill all sessions so that the callback doesn't fire when the server is
+ // destroyed.
+ server_.quic_server().Shutdown();
+}
+
void ChatServer::AddUser(absl::string_view username) {
std::string catalog_data = absl::StrCat("+", username);
catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy(
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h
index 813194e..fab6e2e 100644
--- a/quiche/quic/moqt/tools/chat_server.h
+++ b/quiche/quic/moqt/tools/chat_server.h
@@ -34,6 +34,7 @@
public:
ChatServer(std::unique_ptr<quic::ProofSource> proof_source,
absl::string_view chat_id, absl::string_view output_file);
+ ~ChatServer();
class RemoteTrackVisitor : public RemoteTrack::Visitor {
public:
@@ -104,6 +105,7 @@
[&](absl::string_view path) { return IncomingSessionHandler(path); };
MoqtServer server_;
+ std::list<ChatServerSessionHandler> sessions_;
MoqChatStrings strings_;
MoqtKnownTrackPublisher publisher_;
// Allocator for QuicheBuffer that contains catalog objects.
@@ -111,7 +113,6 @@
std::shared_ptr<MoqtOutgoingQueue> catalog_;
RemoteTrackVisitor remote_track_visitor_;
// indexed by username
- std::list<ChatServerSessionHandler> sessions_;
absl::flat_hash_map<std::string, std::shared_ptr<MoqtLiveRelayQueue>>
user_queues_;
std::string output_filename_;
diff --git a/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc b/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc
new file mode 100644
index 0000000..bfa5248
--- /dev/null
+++ b/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc
@@ -0,0 +1,112 @@
+// Copyright (c) 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "absl/strings/string_view.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/quic_server_id.h"
+#include "quiche/quic/moqt/tools/chat_client.h"
+#include "quiche/quic/moqt/tools/chat_server.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/quic/test_tools/crypto_test_utils.h"
+#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_callbacks.h"
+#include "quiche/common/quiche_ip_address.h"
+
+namespace moqt {
+
+namespace test {
+
+using ::testing::_;
+
+constexpr std::string kChatHostname = "127.0.0.1";
+
+class MockChatUserInterface : public ChatUserInterface {
+ public:
+ void Initialize(quiche::MultiUseCallback<void(absl::string_view)> callback,
+ quic::QuicEventLoop* event_loop) override {
+ callback_ = std::move(callback);
+ event_loop_ = event_loop;
+ }
+
+ void IoLoop() override {
+ event_loop_->RunEventLoopOnce(moqt::kChatEventLoopDuration);
+ }
+
+ MOCK_METHOD(void, WriteToOutput,
+ (absl::string_view user, absl::string_view message), (override));
+
+ void SendMessage(absl::string_view message) { callback_(message); }
+
+ private:
+ quiche::MultiUseCallback<void(absl::string_view)> callback_;
+ quic::QuicEventLoop* event_loop_;
+ std::string message_;
+};
+
+class MoqChatEndToEndTest : public quiche::test::QuicheTest {
+ public:
+ MoqChatEndToEndTest()
+ : server_(quic::test::crypto_test_utils::ProofSourceForTesting(),
+ "test_chat", "") {
+ quiche::QuicheIpAddress bind_address;
+ bind_address.FromString(kChatHostname);
+ EXPECT_TRUE(server_.moqt_server().quic_server().CreateUDPSocketAndListen(
+ quic::QuicSocketAddress(bind_address, 0)));
+ auto if1ptr = std::make_unique<MockChatUserInterface>();
+ auto if2ptr = std::make_unique<MockChatUserInterface>();
+ interface1_ = if1ptr.get();
+ interface2_ = if2ptr.get();
+ uint16_t port = server_.moqt_server().quic_server().port();
+ client1_ = std::make_unique<ChatClient>(
+ quic::QuicServerId(kChatHostname, port), true, std::move(if1ptr),
+ server_.moqt_server().quic_server().event_loop());
+ client2_ = std::make_unique<ChatClient>(
+ quic::QuicServerId(kChatHostname, port), true, std::move(if2ptr),
+ server_.moqt_server().quic_server().event_loop());
+ }
+
+ ChatServer server_;
+ MockChatUserInterface *interface1_, *interface2_;
+ std::unique_ptr<ChatClient> client1_, client2_;
+};
+
+TEST_F(MoqChatEndToEndTest, EndToEndTest) {
+ EXPECT_TRUE(client1_->Connect("/moq-chat", "client1", "test_chat"));
+ EXPECT_TRUE(client2_->Connect("/moq-chat", "client2", "test_chat"));
+ EXPECT_TRUE(client1_->AnnounceAndSubscribe());
+ EXPECT_TRUE(client2_->AnnounceAndSubscribe());
+ EXPECT_CALL(*interface2_, WriteToOutput("client1", "Hello")).Times(1);
+ interface1_->SendMessage("Hello");
+ server_.moqt_server().quic_server().WaitForEvents();
+ EXPECT_CALL(*interface1_, WriteToOutput("client2", "Hi")).Times(1);
+ interface2_->SendMessage("Hi");
+ server_.moqt_server().quic_server().WaitForEvents();
+ EXPECT_CALL(*interface2_, WriteToOutput("client1", "How are you?")).Times(1);
+ interface1_->SendMessage("How are you?");
+ server_.moqt_server().quic_server().WaitForEvents();
+ EXPECT_CALL(*interface1_, WriteToOutput("client2", "Good, and you?"))
+ .Times(1);
+ interface2_->SendMessage("Good, and you?");
+ server_.moqt_server().quic_server().WaitForEvents();
+ EXPECT_CALL(*interface2_, WriteToOutput("client1", "I'm fine")).Times(1);
+ interface1_->SendMessage("I'm fine");
+ server_.moqt_server().quic_server().WaitForEvents();
+ EXPECT_CALL(*interface1_, WriteToOutput("client2", "Goodbye")).Times(1);
+ interface2_->SendMessage("Goodbye");
+ server_.moqt_server().quic_server().WaitForEvents();
+ interface1_->SendMessage("/exit");
+ EXPECT_CALL(*interface2_, WriteToOutput(_, _)).Times(0);
+ server_.moqt_server().quic_server().WaitForEvents();
+}
+
+// TODO(martinduke): Add tests for users leaving the chat
+
+} // namespace test
+
+} // namespace moqt