Abstract out the MoQ Chat User interface and implement E2E chat test.

Separate the Constructor for ChatClient() and the Connect() call.

ChatClient now takes a generic ChatUserInterface to send output and collect input however the owning application specifies.

PiperOrigin-RevId: 665344318
diff --git a/build/source_list.bzl b/build/source_list.bzl
index df3070d..c1a8bd0 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1556,6 +1556,7 @@
     "quic/moqt/tools/chat_client_bin.cc",
     "quic/moqt/tools/chat_server.cc",
     "quic/moqt/tools/chat_server_bin.cc",
+    "quic/moqt/tools/moq_chat_end_to_end_test.cc",
     "quic/moqt/tools/moq_chat_test.cc",
     "quic/moqt/tools/moqt_client.cc",
     "quic/moqt/tools/moqt_end_to_end_test.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index a51045e..5f3ec02 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1560,6 +1560,7 @@
     "src/quiche/quic/moqt/tools/chat_client_bin.cc",
     "src/quiche/quic/moqt/tools/chat_server.cc",
     "src/quiche/quic/moqt/tools/chat_server_bin.cc",
+    "src/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc",
     "src/quiche/quic/moqt/tools/moq_chat_test.cc",
     "src/quiche/quic/moqt/tools/moqt_client.cc",
     "src/quiche/quic/moqt/tools/moqt_end_to_end_test.cc",
diff --git a/build/source_list.json b/build/source_list.json
index 079ba76..94ad682 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1559,6 +1559,7 @@
     "quiche/quic/moqt/tools/chat_client_bin.cc",
     "quiche/quic/moqt/tools/chat_server.cc",
     "quiche/quic/moqt/tools/chat_server_bin.cc",
+    "quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc",
     "quiche/quic/moqt/tools/moq_chat_test.cc",
     "quiche/quic/moqt/tools/moqt_client.cc",
     "quiche/quic/moqt/tools/moqt_end_to_end_test.cc",
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.h b/quiche/quic/moqt/moqt_live_relay_queue.h
index 2dda0db..8c3fbe3 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue.h
+++ b/quiche/quic/moqt/moqt_live_relay_queue.h
@@ -45,8 +45,6 @@
   MoqtLiveRelayQueue& operator=(const MoqtLiveRelayQueue&) = delete;
   MoqtLiveRelayQueue& operator=(MoqtLiveRelayQueue&&) = default;
 
-  // TODO: Add destructor that terminates all subscriptions.
-
   // Publish a received object. Returns false if the object is invalid, given
   // other non-normal objects indicate that the sequence number should not
   // occur. A false return value might result in a session error on the
diff --git a/quiche/quic/moqt/moqt_live_relay_queue_test.cc b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
index c40a69c..6d4e765 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue_test.cc
+++ b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
@@ -15,7 +15,8 @@
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/platform/api/quiche_test.h"
 
-namespace moqt {
+namespace moqt::test {
+
 namespace {
 
 class TestMoqtLiveRelayQueue : public MoqtLiveRelayQueue,
@@ -346,4 +347,4 @@
 
 }  // namespace
 
-}  // namespace moqt
+}  // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 5064ccf..3f12724 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -383,6 +383,9 @@
     // session does not get called after being destroyed.
     std::weak_ptr<void> session_liveness_;
   };
+
+  // Private members of MoqtSession.
+
   // QueuedOutgoingDataStream records an information necessary to create a
   // stream that was attempted to be created before but was blocked due to flow
   // control.
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc
index 760104b..067ea56 100644
--- a/quiche/quic/moqt/tools/chat_client.cc
+++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -8,7 +8,6 @@
 #include <unistd.h>
 
 #include <cstdint>
-#include <fstream>
 #include <iostream>
 #include <memory>
 #include <optional>
@@ -18,8 +17,6 @@
 #include <vector>
 
 #include "absl/container/flat_hash_map.h"
-#include "absl/functional/bind_front.h"
-#include "absl/strings/str_cat.h"
 #include "absl/strings/str_split.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/crypto/proof_verifier.h"
@@ -33,64 +30,71 @@
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/moqt_session.h"
 #include "quiche/quic/moqt/moqt_track.h"
-#include "quiche/quic/moqt/tools/moq_chat.h"
 #include "quiche/quic/moqt/tools/moqt_client.h"
 #include "quiche/quic/platform/api/quic_default_proof_providers.h"
 #include "quiche/quic/platform/api/quic_socket_address.h"
 #include "quiche/quic/tools/fake_proof_verifier.h"
-#include "quiche/quic/tools/interactive_cli.h"
 #include "quiche/quic/tools/quic_name_lookup.h"
 #include "quiche/common/platform/api/quiche_mem_slice.h"
 #include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/simple_buffer_allocator.h"
 
-moqt::ChatClient::ChatClient(const quic::QuicServerId& server_id,
-                             absl::string_view path, absl::string_view username,
-                             absl::string_view chat_id, bool ignore_certificate,
-                             absl::string_view output_file)
-    : username_(username), chat_strings_(chat_id) {
-  quic::QuicDefaultClock* clock = quic::QuicDefaultClock::Get();
-  std::cout << "Connecting to host " << server_id.host() << " port "
-            << server_id.port() << " path " << path << "\n";
-  event_loop_ = quic::GetDefaultEventLoop()->Create(clock);
+namespace moqt {
+
+ChatClient::ChatClient(const quic::QuicServerId& server_id,
+                       bool ignore_certificate,
+                       std::unique_ptr<ChatUserInterface> interface,
+                       quic::QuicEventLoop* event_loop)
+    : event_loop_(event_loop), interface_(std::move(interface)) {
+  if (event_loop_ == nullptr) {
+    quic::QuicDefaultClock* clock = quic::QuicDefaultClock::Get();
+    local_event_loop_ = quic::GetDefaultEventLoop()->Create(clock);
+    event_loop_ = local_event_loop_.get();
+  }
+
   quic::QuicSocketAddress peer_address =
       quic::tools::LookupAddress(AF_UNSPEC, server_id);
   std::unique_ptr<quic::ProofVerifier> verifier;
-  output_filename_ = output_file;
-  if (!output_filename_.empty()) {
-    output_file_.open(output_filename_);
-    output_file_ << "Chat transcript:\n";
-    output_file_.flush();
-  }
   if (ignore_certificate) {
     verifier = std::make_unique<quic::FakeProofVerifier>();
   } else {
     verifier = quic::CreateDefaultProofVerifier(server_id.host());
   }
-  client_ = std::make_unique<moqt::MoqtClient>(
-      peer_address, server_id, std::move(verifier), event_loop_.get());
+
+  client_ = std::make_unique<MoqtClient>(peer_address, server_id,
+                                         std::move(verifier), event_loop_);
   session_callbacks_.session_established_callback = [this]() {
     std::cout << "Session established\n";
     session_is_open_ = true;
-    if (output_filename_.empty()) {  // Use the CLI.
-      cli_ = std::make_unique<quic::InteractiveCli>(
-          event_loop_.get(),
-          absl::bind_front(&ChatClient::OnTerminalLineInput, this));
-      cli_->PrintLine("Fully connected. Enter '/exit' to exit the chat.\n");
-    }
   };
   session_callbacks_.session_terminated_callback =
       [this](absl::string_view error_message) {
         std::cerr << "Closed session, reason = " << error_message << "\n";
         session_is_open_ = false;
+        connect_failed_ = true;
       };
   session_callbacks_.session_deleted_callback = [this]() {
     session_ = nullptr;
   };
-  client_->Connect(std::string(path), std::move(session_callbacks_));
+  interface_->Initialize(
+      [this](absl::string_view input_message) {
+        OnTerminalLineInput(input_message);
+      },
+      event_loop_);
 }
 
-void moqt::ChatClient::OnTerminalLineInput(absl::string_view input_message) {
+bool ChatClient::Connect(absl::string_view path, absl::string_view username,
+                         absl::string_view chat_id) {
+  username_ = username;
+  chat_strings_.emplace(chat_id);
+  client_->Connect(std::string(path), std::move(session_callbacks_));
+  while (!session_is_open_ && !connect_failed_) {
+    RunEventLoop();
+  }
+  return (!connect_failed_);
+}
+
+void ChatClient::OnTerminalLineInput(absl::string_view input_message) {
   if (input_message.empty()) {
     return;
   }
@@ -103,11 +107,11 @@
   queue_->AddObject(std::move(message_slice), /*key=*/true);
 }
 
-void moqt::ChatClient::RemoteTrackVisitor::OnReply(
-    const moqt::FullTrackName& full_track_name,
+void ChatClient::RemoteTrackVisitor::OnReply(
+    const FullTrackName& full_track_name,
     std::optional<absl::string_view> reason_phrase) {
   client_->subscribes_to_make_--;
-  if (full_track_name == client_->chat_strings_.GetCatalogName()) {
+  if (full_track_name == client_->chat_strings_->GetCatalogName()) {
     std::cout << "Subscription to catalog ";
   } else {
     std::cout << "Subscription to user " << full_track_name.track_namespace
@@ -120,17 +124,17 @@
   }
 }
 
-void moqt::ChatClient::RemoteTrackVisitor::OnObjectFragment(
-    const moqt::FullTrackName& full_track_name, uint64_t group_sequence,
-    uint64_t object_sequence, moqt::MoqtPriority /*publisher_priority*/,
-    moqt::MoqtObjectStatus /*status*/,
-    moqt::MoqtForwardingPreference /*forwarding_preference*/,
+void ChatClient::RemoteTrackVisitor::OnObjectFragment(
+    const FullTrackName& full_track_name, uint64_t group_sequence,
+    uint64_t object_sequence, MoqtPriority /*publisher_priority*/,
+    MoqtObjectStatus /*status*/,
+    MoqtForwardingPreference /*forwarding_preference*/,
     absl::string_view object, bool end_of_message) {
   if (!end_of_message) {
     std::cerr << "Error: received partial message despite requesting "
                  "buffering\n";
   }
-  if (full_track_name == client_->chat_strings_.GetCatalogName()) {
+  if (full_track_name == client_->chat_strings_->GetCatalogName()) {
     if (group_sequence < client_->catalog_group_) {
       std::cout << "Ignoring old catalog";
       return;
@@ -144,58 +148,61 @@
     std::cout << "Username " << username << "doesn't exist\n";
     return;
   }
-  if (client_->has_output_file()) {
-    client_->WriteToFile(username, object);
+  if (object.empty()) {
     return;
   }
-  if (cli_ != nullptr) {
-    std::string full_output = absl::StrCat(username, ": ", object);
-    cli_->PrintLine(full_output);
-  }
+  client_->WriteToOutput(username, object);
 }
 
-bool moqt::ChatClient::AnnounceAndSubscribe() {
+bool ChatClient::AnnounceAndSubscribe() {
   session_ = client_->session();
   if (session_ == nullptr) {
     std::cout << "Failed to connect.\n";
     return false;
   }
-  FullTrackName my_track_name =
-      chat_strings_.GetFullTrackNameFromUsername(username_);
-  queue_ = std::make_shared<moqt::MoqtOutgoingQueue>(
-      my_track_name, moqt::MoqtForwardingPreference::kObject);
-  publisher_.Add(queue_);
-  session_->set_publisher(&publisher_);
-  moqt::MoqtOutgoingAnnounceCallback announce_callback =
-      [this](absl::string_view track_namespace,
-             std::optional<moqt::MoqtAnnounceErrorReason> reason) {
-        if (reason.has_value()) {
-          std::cout << "ANNOUNCE rejected, " << reason->reason_phrase << "\n";
-          session_->Error(moqt::MoqtError::kInternalError,
-                          "Local ANNOUNCE rejected");
+  if (!username_.empty()) {
+    // A server log might choose to not provide a username, thus getting all
+    // the messages without adding itself to the catalog.
+    FullTrackName my_track_name =
+        chat_strings_->GetFullTrackNameFromUsername(username_);
+    queue_ = std::make_shared<MoqtOutgoingQueue>(
+        my_track_name, MoqtForwardingPreference::kObject);
+    publisher_.Add(queue_);
+    session_->set_publisher(&publisher_);
+    MoqtOutgoingAnnounceCallback announce_callback =
+        [this](absl::string_view track_namespace,
+               std::optional<MoqtAnnounceErrorReason> reason) {
+          if (reason.has_value()) {
+            std::cout << "ANNOUNCE rejected, " << reason->reason_phrase << "\n";
+            session_->Error(MoqtError::kInternalError,
+                            "Local ANNOUNCE rejected");
+            return;
+          }
+          std::cout << "ANNOUNCE for " << track_namespace << " accepted\n";
           return;
-        }
-        std::cout << "ANNOUNCE for " << track_namespace << " accepted\n";
-        return;
-      };
-  std::cout << "Announcing " << my_track_name.track_namespace << "\n";
-  session_->Announce(my_track_name.track_namespace,
-                     std::move(announce_callback));
+        };
+    std::cout << "Announcing " << my_track_name.track_namespace << "\n";
+    session_->Announce(my_track_name.track_namespace,
+                       std::move(announce_callback));
+  }
   remote_track_visitor_ = std::make_unique<RemoteTrackVisitor>(this);
-  FullTrackName catalog_name = chat_strings_.GetCatalogName();
+  FullTrackName catalog_name = chat_strings_->GetCatalogName();
   if (!session_->SubscribeCurrentGroup(
           catalog_name.track_namespace, catalog_name.track_name,
           remote_track_visitor_.get(), username_)) {
     std::cout << "Failed to get catalog\n";
     return false;
   }
-  return true;
+  while (session_is_open_ && is_syncing()) {
+    RunEventLoop();
+  }
+  return session_is_open_;
 }
 
-void moqt::ChatClient::ProcessCatalog(absl::string_view object,
-                                      moqt::RemoteTrack::Visitor* visitor,
-                                      uint64_t group_sequence,
-                                      uint64_t object_sequence) {
+void ChatClient::ProcessCatalog(absl::string_view object,
+                                RemoteTrack::Visitor* visitor,
+                                uint64_t group_sequence,
+                                uint64_t object_sequence) {
   std::string message(object);
   std::istringstream f(message);
   // std::string line;
@@ -209,7 +216,7 @@
   for (absl::string_view line : lines) {
     if (!got_version) {
       if (line != "version=1") {
-        session_->Error(moqt::MoqtError::kProtocolViolation,
+        session_->Error(MoqtError::kProtocolViolation,
                         "Catalog does not begin with version");
         return;
       }
@@ -250,8 +257,8 @@
     }
     auto it = other_users_.find(user);
     if (it == other_users_.end()) {
-      moqt::FullTrackName to_subscribe =
-          chat_strings_.GetFullTrackNameFromUsername(user);
+      FullTrackName to_subscribe =
+          chat_strings_->GetFullTrackNameFromUsername(user);
       auto new_user = other_users_.emplace(
           std::make_pair(user, ChatUser(to_subscribe, group_sequence)));
       ChatUser& user_record = new_user.first->second;
@@ -261,7 +268,7 @@
       subscribes_to_make_++;
     } else {
       if (it->second.from_group == group_sequence) {
-        session_->Error(moqt::MoqtError::kProtocolViolation,
+        session_->Error(MoqtError::kProtocolViolation,
                         "User listed twice in Catalog");
         return;
       }
@@ -275,3 +282,5 @@
   }
   catalog_group_ = group_sequence;
 }
+
+}  // namespace moqt
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h
index b5cb747..e76700e 100644
--- a/quiche/quic/moqt/tools/chat_client.h
+++ b/quiche/quic/moqt/tools/chat_client.h
@@ -6,7 +6,6 @@
 #define QUICHE_QUIC_MOQT_TOOLS_CHAT_CLIENT_H
 
 #include <cstdint>
-#include <fstream>
 #include <memory>
 #include <optional>
 #include <string>
@@ -24,45 +23,68 @@
 #include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/quic/moqt/tools/moq_chat.h"
 #include "quiche/quic/moqt/tools/moqt_client.h"
-#include "quiche/quic/tools/interactive_cli.h"
 #include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/quiche_callbacks.h"
 
 namespace moqt {
 
+constexpr quic::QuicTime::Delta kChatEventLoopDuration =
+    quic::QuicTime::Delta::FromMilliseconds(500);
+
+// Chat clients accept a ChatUserInterface that implements how user input is
+// captured, and peer messages are displayed.
+class ChatUserInterface {
+ public:
+  virtual ~ChatUserInterface() = default;
+
+  // ChatUserInterface cannot be used until initialized. This is separate from
+  // the constructor, because the constructor might create the event loop.
+  // |callback| is what ChatUserInterface will call when there is user input.
+  // |event_loop| is the event loop that the ChatUserInterface should use.
+  virtual void Initialize(
+      quiche::MultiUseCallback<void(absl::string_view)> callback,
+      quic::QuicEventLoop* event_loop) = 0;
+  // Write a peer message to the user output.
+  virtual void WriteToOutput(absl::string_view user,
+                             absl::string_view message) = 0;
+  // Run the event loop for a short interval and exit.
+  virtual void IoLoop() = 0;
+};
+
 class ChatClient {
  public:
-  ChatClient(const quic::QuicServerId& server_id, absl::string_view path,
-             absl::string_view username, absl::string_view chat_id,
-             bool ignore_certificate, absl::string_view output_file);
+  // If |event_loop| is nullptr, a new one will be created. If multiple
+  // endpoints are running on the same thread, as in tests, they should share
+  // an event loop.
+  ChatClient(const quic::QuicServerId& server_id, bool ignore_certificate,
+             std::unique_ptr<ChatUserInterface> interface,
+             quic::QuicEventLoop* event_loop = nullptr);
+
+  // Establish the MoQT session. Returns false if it fails.
+  bool Connect(absl::string_view path, absl::string_view username,
+               absl::string_view chat_id);
 
   void OnTerminalLineInput(absl::string_view input_message);
 
-  bool session_is_open() const { return session_is_open_; }
-
-  // Returns true if the client is still doing initial sync: retrieving the
-  // catalog, subscribing to all the users in it, and waiting for the server
-  // to subscribe to the local track.
-  bool is_syncing() const {
-    return !catalog_group_.has_value() || subscribes_to_make_ > 0 ||
-           (queue_ == nullptr || !queue_->HasSubscribers());
+  // Run the event loop until an input or output event is ready, or the
+  // session closes.
+  void IoLoop() {
+    while (session_is_open_) {
+      interface_->IoLoop();
+    }
   }
 
-  void RunEventLoop() {
-    event_loop_->RunEventLoopOnce(quic::QuicTime::Delta::FromMilliseconds(500));
+  void WriteToOutput(absl::string_view user, absl::string_view message) {
+    if (interface_ != nullptr) {
+      interface_->WriteToOutput(user, message);
+    }
   }
 
-  bool has_output_file() { return !output_filename_.empty(); }
-
-  void WriteToFile(absl::string_view user, absl::string_view message) {
-    output_file_ << user << ": " << message << "\n\n";
-    output_file_.flush();
-  }
+  quic::QuicEventLoop* event_loop() { return event_loop_; }
 
   class QUICHE_EXPORT RemoteTrackVisitor : public moqt::RemoteTrack::Visitor {
    public:
-    RemoteTrackVisitor(ChatClient* client) : client_(client) {
-      cli_ = client->cli_.get();
-    }
+    RemoteTrackVisitor(ChatClient* client) : client_(client) {}
 
     void OnReply(const moqt::FullTrackName& full_track_name,
                  std::optional<absl::string_view> reason_phrase) override;
@@ -75,17 +97,26 @@
                           absl::string_view object,
                           bool end_of_message) override;
 
-    void set_cli(quic::InteractiveCli* cli) { cli_ = cli; }
-
    private:
     ChatClient* client_;
-    quic::InteractiveCli* cli_;
   };
 
   // Returns false on error.
   bool AnnounceAndSubscribe();
 
+  bool session_is_open() const { return session_is_open_; }
+
+  // Returns true if the client is still doing initial sync: retrieving the
+  // catalog, subscribing to all the users in it, and waiting for the server
+  // to subscribe to the local track.
+  bool is_syncing() const {
+    return !catalog_group_.has_value() || subscribes_to_make_ > 0 ||
+           (queue_ == nullptr || !queue_->HasSubscribers());
+  }
+
  private:
+  void RunEventLoop() { event_loop_->RunEventLoopOnce(kChatEventLoopDuration); }
+
   // Objects from the same catalog group arrive on the same stream, and in
   // object sequence order.
   void ProcessCatalog(absl::string_view object,
@@ -100,11 +131,15 @@
   };
 
   // Basic session information
-  const std::string username_;
-  moqt::MoqChatStrings chat_strings_;
+  std::string username_;
+  std::optional<moqt::MoqChatStrings> chat_strings_;
 
   // General state variables
-  std::unique_ptr<quic::QuicEventLoop> event_loop_;
+  // The event loop to use for this client.
+  quic::QuicEventLoop* event_loop_;
+  // If the client created its own event loop, it will own it.
+  std::unique_ptr<quic::QuicEventLoop> local_event_loop_;
+  bool connect_failed_ = false;
   bool session_is_open_ = false;
   moqt::MoqtSession* session_ = nullptr;
   moqt::MoqtKnownTrackPublisher publisher_;
@@ -123,13 +158,8 @@
   // Handling outgoing messages
   std::shared_ptr<moqt::MoqtOutgoingQueue> queue_;
 
-  // Used when chat output goes to file.
-  std::ofstream output_file_;
-  std::string output_filename_;
-
-  // Used when there is no output file, and both input and output are in the
-  // terminal.
-  std::unique_ptr<quic::InteractiveCli> cli_;
+  // User interface for input and output.
+  std::unique_ptr<ChatUserInterface> interface_;
 };
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/tools/chat_client_bin.cc b/quiche/quic/moqt/tools/chat_client_bin.cc
index 867db4e..7617789 100644
--- a/quiche/quic/moqt/tools/chat_client_bin.cc
+++ b/quiche/quic/moqt/tools/chat_client_bin.cc
@@ -5,13 +5,20 @@
 #include <poll.h>
 #include <unistd.h>
 
+#include <fstream>
 #include <iostream>
+#include <memory>
 #include <string>
+#include <utility>
 #include <vector>
 
+#include "absl/strings/str_cat.h"
 #include "absl/strings/string_view.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
 #include "quiche/quic/core/quic_server_id.h"
 #include "quiche/quic/moqt/tools/chat_client.h"
+#include "quiche/quic/platform/api/quic_bug_tracker.h"
+#include "quiche/quic/tools/interactive_cli.h"
 #include "quiche/quic/tools/quic_name_lookup.h"
 #include "quiche/quic/tools/quic_url.h"
 #include "quiche/common/platform/api/quiche_command_line_flags.h"
@@ -24,6 +31,90 @@
     std::string, output_file, "",
     "chat messages will stream to a file instead of stdout");
 
+// Writes messages to a file, when directed from the command line.
+class FileOutput : public moqt::ChatUserInterface {
+ public:
+  explicit FileOutput(absl::string_view filename, absl::string_view username)
+      : username_(username) {
+    output_file_.open(filename);
+    output_file_ << "Chat transcript:\n";
+    output_file_.flush();
+    std::cout << "Fully connected. Messages are in the output file. Exit the "
+              << "session by entering /exit\n";
+  }
+
+  ~FileOutput() override { output_file_.close(); }
+
+  void Initialize(quic::InteractiveCli::LineCallback callback,
+                  quic::QuicEventLoop* event_loop) override {
+    callback_ = std::move(callback);
+    event_loop_ = event_loop;
+  }
+
+  void WriteToOutput(absl::string_view user,
+                     absl::string_view message) override {
+    if (message.empty()) {
+      return;
+    }
+    output_file_ << user << ": " << message << "\n\n";
+    output_file_.flush();
+  }
+
+  void IoLoop() override {
+    std::string message_to_send;
+    QUIC_BUG_IF(quic_bug_moq_chat_user_interface_unitialized,
+                event_loop_ == nullptr)
+        << "IoLoop called before Initialize";
+    while (poll(&poll_settings_, 1, 0) <= 0) {
+      event_loop_->RunEventLoopOnce(moqt::kChatEventLoopDuration);
+    }
+    std::getline(std::cin, message_to_send);
+    callback_(message_to_send);
+    WriteToOutput(username_, message_to_send);
+  }
+
+ private:
+  quic::QuicEventLoop* event_loop_;
+  quic::InteractiveCli::LineCallback callback_;
+  std::ofstream output_file_;
+  absl::string_view username_;
+  struct pollfd poll_settings_ = {
+      0,
+      POLLIN,
+      POLLIN,
+  };
+};
+
+// Writes messages to the terminal, without messing up entry of new messages.
+class CliOutput : public moqt::ChatUserInterface {
+ public:
+  void Initialize(quic::InteractiveCli::LineCallback callback,
+                  quic::QuicEventLoop* event_loop) override {
+    cli_ =
+        std::make_unique<quic::InteractiveCli>(event_loop, std::move(callback));
+    event_loop_ = event_loop;
+    cli_->PrintLine("Fully connected. Enter '/exit' to exit the chat.\n");
+  }
+
+  void WriteToOutput(absl::string_view user,
+                     absl::string_view message) override {
+    QUIC_BUG_IF(quic_bug_moq_chat_user_interface_unitialized, cli_ == nullptr)
+        << "WriteToOutput called before Initialize";
+    cli_->PrintLine(absl::StrCat(user, ": ", message));
+  }
+
+  void IoLoop() override {
+    QUIC_BUG_IF(quic_bug_moq_chat_user_interface_unitialized,
+                event_loop_ == nullptr)
+        << "IoLoop called before Initialize";
+    event_loop_->RunEventLoopOnce(moqt::kChatEventLoopDuration);
+  }
+
+ private:
+  quic::QuicEventLoop* event_loop_;
+  std::unique_ptr<quic::InteractiveCli> cli_;
+};
+
 // A client for MoQT over chat, used for interop testing. See
 // https://afrind.github.io/draft-frindell-moq-chat/draft-frindell-moq-chat.html
 int main(int argc, char* argv[]) {
@@ -39,46 +130,26 @@
   std::string path = url.PathParamsQuery();
   const std::string& username = args[1];
   const std::string& chat_id = args[2];
-  moqt::ChatClient client(
-      server_id, path, username, chat_id,
-      quiche::GetQuicheCommandLineFlag(FLAGS_disable_certificate_verification),
-      quiche::GetQuicheCommandLineFlag(FLAGS_output_file));
+  std::string output_filename =
+      quiche::GetQuicheCommandLineFlag(FLAGS_output_file);
+  std::unique_ptr<moqt::ChatUserInterface> interface;
 
-  while (!client.session_is_open()) {
-    client.RunEventLoop();
+  if (!output_filename.empty()) {
+    interface = std::make_unique<FileOutput>(output_filename, username);
+  } else {  // Use the CLI.
+    interface = std::make_unique<CliOutput>();
   }
+  moqt::ChatClient client(
+      server_id,
+      quiche::GetQuicheCommandLineFlag(FLAGS_disable_certificate_verification),
+      std::move(interface));
 
+  if (!client.Connect(path, username, chat_id)) {
+    return 1;
+  }
   if (!client.AnnounceAndSubscribe()) {
     return 1;
   }
-  while (client.is_syncing()) {
-    client.RunEventLoop();
-  }
-  if (!client.session_is_open()) {
-    return 1;  // Something went wrong in connecting.
-  }
-  if (!client.has_output_file()) {
-    while (client.session_is_open()) {
-      client.RunEventLoop();
-    }
-    return 0;
-  }
-  // There is an output file.
-  std::cout << "Fully connected. Messages are in the output file. Exit the "
-            << "session by entering /exit\n";
-  struct pollfd poll_settings = {
-      0,
-      POLLIN,
-      POLLIN,
-  };
-  while (client.session_is_open()) {
-    std::string message_to_send;
-    while (poll(&poll_settings, 1, 0) <= 0) {
-      client.RunEventLoop();
-    }
-    std::getline(std::cin, message_to_send);
-    client.OnTerminalLineInput(message_to_send);
-    client.WriteToFile(username, message_to_send);
-  }
+  client.IoLoop();
   return 0;
 }
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc
index f984a71..c764894 100644
--- a/quiche/quic/moqt/tools/chat_server.cc
+++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -137,6 +137,12 @@
   }
 }
 
+ChatServer::~ChatServer() {
+  // Kill all sessions so that the callback doesn't fire when the server is
+  // destroyed.
+  server_.quic_server().Shutdown();
+}
+
 void ChatServer::AddUser(absl::string_view username) {
   std::string catalog_data = absl::StrCat("+", username);
   catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy(
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h
index 813194e..fab6e2e 100644
--- a/quiche/quic/moqt/tools/chat_server.h
+++ b/quiche/quic/moqt/tools/chat_server.h
@@ -34,6 +34,7 @@
  public:
   ChatServer(std::unique_ptr<quic::ProofSource> proof_source,
              absl::string_view chat_id, absl::string_view output_file);
+  ~ChatServer();
 
   class RemoteTrackVisitor : public RemoteTrack::Visitor {
    public:
@@ -104,6 +105,7 @@
       [&](absl::string_view path) { return IncomingSessionHandler(path); };
 
   MoqtServer server_;
+  std::list<ChatServerSessionHandler> sessions_;
   MoqChatStrings strings_;
   MoqtKnownTrackPublisher publisher_;
   // Allocator for QuicheBuffer that contains catalog objects.
@@ -111,7 +113,6 @@
   std::shared_ptr<MoqtOutgoingQueue> catalog_;
   RemoteTrackVisitor remote_track_visitor_;
   // indexed by username
-  std::list<ChatServerSessionHandler> sessions_;
   absl::flat_hash_map<std::string, std::shared_ptr<MoqtLiveRelayQueue>>
       user_queues_;
   std::string output_filename_;
diff --git a/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc b/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc
new file mode 100644
index 0000000..bfa5248
--- /dev/null
+++ b/quiche/quic/moqt/tools/moq_chat_end_to_end_test.cc
@@ -0,0 +1,112 @@
+// Copyright (c) 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "absl/strings/string_view.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/quic_server_id.h"
+#include "quiche/quic/moqt/tools/chat_client.h"
+#include "quiche/quic/moqt/tools/chat_server.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/quic/test_tools/crypto_test_utils.h"
+#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_callbacks.h"
+#include "quiche/common/quiche_ip_address.h"
+
+namespace moqt {
+
+namespace test {
+
+using ::testing::_;
+
+constexpr std::string kChatHostname = "127.0.0.1";
+
+class MockChatUserInterface : public ChatUserInterface {
+ public:
+  void Initialize(quiche::MultiUseCallback<void(absl::string_view)> callback,
+                  quic::QuicEventLoop* event_loop) override {
+    callback_ = std::move(callback);
+    event_loop_ = event_loop;
+  }
+
+  void IoLoop() override {
+    event_loop_->RunEventLoopOnce(moqt::kChatEventLoopDuration);
+  }
+
+  MOCK_METHOD(void, WriteToOutput,
+              (absl::string_view user, absl::string_view message), (override));
+
+  void SendMessage(absl::string_view message) { callback_(message); }
+
+ private:
+  quiche::MultiUseCallback<void(absl::string_view)> callback_;
+  quic::QuicEventLoop* event_loop_;
+  std::string message_;
+};
+
+class MoqChatEndToEndTest : public quiche::test::QuicheTest {
+ public:
+  MoqChatEndToEndTest()
+      : server_(quic::test::crypto_test_utils::ProofSourceForTesting(),
+                "test_chat", "") {
+    quiche::QuicheIpAddress bind_address;
+    bind_address.FromString(kChatHostname);
+    EXPECT_TRUE(server_.moqt_server().quic_server().CreateUDPSocketAndListen(
+        quic::QuicSocketAddress(bind_address, 0)));
+    auto if1ptr = std::make_unique<MockChatUserInterface>();
+    auto if2ptr = std::make_unique<MockChatUserInterface>();
+    interface1_ = if1ptr.get();
+    interface2_ = if2ptr.get();
+    uint16_t port = server_.moqt_server().quic_server().port();
+    client1_ = std::make_unique<ChatClient>(
+        quic::QuicServerId(kChatHostname, port), true, std::move(if1ptr),
+        server_.moqt_server().quic_server().event_loop());
+    client2_ = std::make_unique<ChatClient>(
+        quic::QuicServerId(kChatHostname, port), true, std::move(if2ptr),
+        server_.moqt_server().quic_server().event_loop());
+  }
+
+  ChatServer server_;
+  MockChatUserInterface *interface1_, *interface2_;
+  std::unique_ptr<ChatClient> client1_, client2_;
+};
+
+TEST_F(MoqChatEndToEndTest, EndToEndTest) {
+  EXPECT_TRUE(client1_->Connect("/moq-chat", "client1", "test_chat"));
+  EXPECT_TRUE(client2_->Connect("/moq-chat", "client2", "test_chat"));
+  EXPECT_TRUE(client1_->AnnounceAndSubscribe());
+  EXPECT_TRUE(client2_->AnnounceAndSubscribe());
+  EXPECT_CALL(*interface2_, WriteToOutput("client1", "Hello")).Times(1);
+  interface1_->SendMessage("Hello");
+  server_.moqt_server().quic_server().WaitForEvents();
+  EXPECT_CALL(*interface1_, WriteToOutput("client2", "Hi")).Times(1);
+  interface2_->SendMessage("Hi");
+  server_.moqt_server().quic_server().WaitForEvents();
+  EXPECT_CALL(*interface2_, WriteToOutput("client1", "How are you?")).Times(1);
+  interface1_->SendMessage("How are you?");
+  server_.moqt_server().quic_server().WaitForEvents();
+  EXPECT_CALL(*interface1_, WriteToOutput("client2", "Good, and you?"))
+      .Times(1);
+  interface2_->SendMessage("Good, and you?");
+  server_.moqt_server().quic_server().WaitForEvents();
+  EXPECT_CALL(*interface2_, WriteToOutput("client1", "I'm fine")).Times(1);
+  interface1_->SendMessage("I'm fine");
+  server_.moqt_server().quic_server().WaitForEvents();
+  EXPECT_CALL(*interface1_, WriteToOutput("client2", "Goodbye")).Times(1);
+  interface2_->SendMessage("Goodbye");
+  server_.moqt_server().quic_server().WaitForEvents();
+  interface1_->SendMessage("/exit");
+  EXPECT_CALL(*interface2_, WriteToOutput(_, _)).Times(0);
+  server_.moqt_server().quic_server().WaitForEvents();
+}
+
+// TODO(martinduke): Add tests for users leaving the chat
+
+}  // namespace test
+
+}  // namespace moqt