QUIC Chat Client for very basic Media-over-QUIC transport testing.

The current state is that the client connects, exchanges SETUP, sends ANNOUNCE and gets ANNOUNCE_OK with an interop server, then SUBSCRIBEs to the catalog track and gets SUBSCRIBE_OK. It receives the catalog OBJECT and will issue SUBSCRIBEs for the tracks there.

If the server subscribes to the track the client announces, it will reply to that subscribe.

There is no capability to send OBJECT, or process non-catalog objects.

PiperOrigin-RevId: 588109420
diff --git a/build/source_list.bzl b/build/source_list.bzl
index ade85e5..ef5bcf4 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1483,8 +1483,11 @@
     "quic/moqt/moqt_messages.h",
     "quic/moqt/moqt_parser.h",
     "quic/moqt/moqt_session.h",
+    "quic/moqt/moqt_subscribe_windows.h",
+    "quic/moqt/moqt_track.h",
     "quic/moqt/test_tools/moqt_test_message.h",
     "quic/moqt/tools/moqt_client.h",
+    "quic/moqt/tools/moqt_mock_visitor.h",
     "quic/moqt/tools/moqt_server.h",
 ]
 moqt_srcs = [
@@ -1495,6 +1498,10 @@
     "quic/moqt/moqt_parser.cc",
     "quic/moqt/moqt_parser_test.cc",
     "quic/moqt/moqt_session.cc",
+    "quic/moqt/moqt_session_test.cc",
+    "quic/moqt/moqt_subscribe_windows_test.cc",
+    "quic/moqt/moqt_track_test.cc",
+    "quic/moqt/tools/chat_client_bin.cc",
     "quic/moqt/tools/moqt_client.cc",
     "quic/moqt/tools/moqt_end_to_end_test.cc",
     "quic/moqt/tools/moqt_server.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index c514ab2..7695f80 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1487,8 +1487,11 @@
     "src/quiche/quic/moqt/moqt_messages.h",
     "src/quiche/quic/moqt/moqt_parser.h",
     "src/quiche/quic/moqt/moqt_session.h",
+    "src/quiche/quic/moqt/moqt_subscribe_windows.h",
+    "src/quiche/quic/moqt/moqt_track.h",
     "src/quiche/quic/moqt/test_tools/moqt_test_message.h",
     "src/quiche/quic/moqt/tools/moqt_client.h",
+    "src/quiche/quic/moqt/tools/moqt_mock_visitor.h",
     "src/quiche/quic/moqt/tools/moqt_server.h",
 ]
 moqt_srcs = [
@@ -1499,6 +1502,10 @@
     "src/quiche/quic/moqt/moqt_parser.cc",
     "src/quiche/quic/moqt/moqt_parser_test.cc",
     "src/quiche/quic/moqt/moqt_session.cc",
+    "src/quiche/quic/moqt/moqt_session_test.cc",
+    "src/quiche/quic/moqt/moqt_subscribe_windows_test.cc",
+    "src/quiche/quic/moqt/moqt_track_test.cc",
+    "src/quiche/quic/moqt/tools/chat_client_bin.cc",
     "src/quiche/quic/moqt/tools/moqt_client.cc",
     "src/quiche/quic/moqt/tools/moqt_end_to_end_test.cc",
     "src/quiche/quic/moqt/tools/moqt_server.cc",
diff --git a/build/source_list.json b/build/source_list.json
index 68779fa..baee204 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1486,8 +1486,11 @@
     "quiche/quic/moqt/moqt_messages.h",
     "quiche/quic/moqt/moqt_parser.h",
     "quiche/quic/moqt/moqt_session.h",
+    "quiche/quic/moqt/moqt_subscribe_windows.h",
+    "quiche/quic/moqt/moqt_track.h",
     "quiche/quic/moqt/test_tools/moqt_test_message.h",
     "quiche/quic/moqt/tools/moqt_client.h",
+    "quiche/quic/moqt/tools/moqt_mock_visitor.h",
     "quiche/quic/moqt/tools/moqt_server.h"
   ],
   "moqt_srcs": [
@@ -1498,6 +1501,10 @@
     "quiche/quic/moqt/moqt_parser.cc",
     "quiche/quic/moqt/moqt_parser_test.cc",
     "quiche/quic/moqt/moqt_session.cc",
+    "quiche/quic/moqt/moqt_session_test.cc",
+    "quiche/quic/moqt/moqt_subscribe_windows_test.cc",
+    "quiche/quic/moqt/moqt_track_test.cc",
+    "quiche/quic/moqt/tools/chat_client_bin.cc",
     "quiche/quic/moqt/tools/moqt_client.cc",
     "quiche/quic/moqt/tools/moqt_end_to_end_test.cc",
     "quiche/quic/moqt/tools/moqt_server.cc"
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 8efefec..c1bc980 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -2,7 +2,9 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
+#include <cstdint>
 #include <memory>
+#include <optional>
 #include <string>
 
 #include "absl/strings/string_view.h"
@@ -15,30 +17,21 @@
 #include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
 #include "quiche/quic/test_tools/crypto_test_utils.h"
 #include "quiche/quic/test_tools/simulator/simulator.h"
 #include "quiche/quic/test_tools/simulator/test_harness.h"
 #include "quiche/common/platform/api/quiche_test.h"
 
 namespace moqt::test {
+
 namespace {
 
 using ::quic::simulator::Simulator;
 using ::testing::_;
 using ::testing::Assign;
 
-struct MockSessionCallbacks {
-  testing::MockFunction<void()> session_established_callback;
-  testing::MockFunction<void(absl::string_view)> session_terminated_callback;
-  testing::MockFunction<void()> session_deleted_callback;
-
-  MoqtSessionCallbacks AsSessionCallbacks() {
-    return MoqtSessionCallbacks{session_established_callback.AsStdFunction(),
-                                session_terminated_callback.AsStdFunction(),
-                                session_deleted_callback.AsStdFunction()};
-  }
-};
-
 class ClientEndpoint : public quic::simulator::QuicEndpointWithConnection {
  public:
   ClientEndpoint(Simulator* simulator, const std::string& name,
@@ -55,7 +48,8 @@
             &quic_session_,
             MoqtSessionParameters{.version = version,
                                   .perspective = quic::Perspective::IS_CLIENT,
-                                  .using_webtrans = false},
+                                  .using_webtrans = false,
+                                  .deliver_partial_objects = false},
             callbacks_.AsSessionCallbacks()) {
     quic_session_.Initialize();
   }
@@ -97,7 +91,8 @@
             &quic_session_,
             MoqtSessionParameters{.version = version,
                                   .perspective = quic::Perspective::IS_SERVER,
-                                  .using_webtrans = false},
+                                  .using_webtrans = false,
+                                  .deliver_partial_objects = false},
             callbacks_.AsSessionCallbacks()) {
     quic_session_.Initialize();
   }
@@ -131,6 +126,22 @@
 
   void WireUpEndpoints() { test_harness_.WireUpEndpoints(); }
 
+  void EstablishSession() {
+    CreateDefaultEndpoints();
+    WireUpEndpoints();
+
+    client_->quic_session()->CryptoConnect();
+    bool client_established = false;
+    bool server_established = false;
+    EXPECT_CALL(client_->established_callback(), Call())
+        .WillOnce(Assign(&client_established, true));
+    EXPECT_CALL(server_->established_callback(), Call())
+        .WillOnce(Assign(&server_established, true));
+    bool success = test_harness_.RunUntilWithDefaultTimeout(
+        [&]() { return client_established && server_established; });
+    QUICHE_CHECK(success);
+  }
+
  protected:
   quic::simulator::TestHarness test_harness_;
 
@@ -178,5 +189,95 @@
   EXPECT_TRUE(success);
 }
 
+TEST_F(MoqtIntegrationTest, AnnounceExchange) {
+  EstablishSession();
+  testing::MockFunction<void(absl::string_view track_namespace,
+                             std::optional<absl::string_view> error_message)>
+      announce_callback;
+  client_->session()->Announce("foo", announce_callback.AsStdFunction());
+  bool matches = false;
+  EXPECT_CALL(announce_callback, Call(_, _))
+      .WillOnce([&](absl::string_view track_namespace,
+                    std::optional<absl::string_view> error_message) {
+        matches = true;
+        EXPECT_EQ(track_namespace, "foo");
+        EXPECT_FALSE(error_message.has_value());
+      });
+  bool success =
+      test_harness_.RunUntilWithDefaultTimeout([&]() { return matches; });
+  EXPECT_TRUE(success);
+}
+
+TEST_F(MoqtIntegrationTest, SubscribeAbsoluteOk) {
+  EstablishSession();
+  FullTrackName full_track_name("foo", "bar");
+  MockLocalTrackVisitor server_visitor;
+  MockRemoteTrackVisitor client_visitor;
+  server_->session()->AddLocalTrack(full_track_name, &server_visitor);
+  std::optional<absl::string_view> expected_reason = std::nullopt;
+  bool received_ok = false;
+  EXPECT_CALL(client_visitor, OnReply(full_track_name, expected_reason))
+      .WillOnce([&]() { received_ok = true; });
+  client_->session()->SubscribeAbsolute(full_track_name.track_namespace,
+                                        full_track_name.track_name, 0, 0,
+                                        &client_visitor);
+  bool success =
+      test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
+  EXPECT_TRUE(success);
+}
+
+TEST_F(MoqtIntegrationTest, SubscribeRelativeOk) {
+  EstablishSession();
+  FullTrackName full_track_name("foo", "bar");
+  MockLocalTrackVisitor server_visitor;
+  MockRemoteTrackVisitor client_visitor;
+  server_->session()->AddLocalTrack(full_track_name, &server_visitor);
+  std::optional<absl::string_view> expected_reason = std::nullopt;
+  bool received_ok = false;
+  EXPECT_CALL(client_visitor, OnReply(full_track_name, expected_reason))
+      .WillOnce([&]() { received_ok = true; });
+  client_->session()->SubscribeRelative(full_track_name.track_namespace,
+                                        full_track_name.track_name, 10, 10,
+                                        &client_visitor);
+  bool success =
+      test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
+  EXPECT_TRUE(success);
+}
+
+TEST_F(MoqtIntegrationTest, SubscribeCurrentGroupOk) {
+  EstablishSession();
+  FullTrackName full_track_name("foo", "bar");
+  MockLocalTrackVisitor server_visitor;
+  MockRemoteTrackVisitor client_visitor;
+  server_->session()->AddLocalTrack(full_track_name, &server_visitor);
+  std::optional<absl::string_view> expected_reason = std::nullopt;
+  bool received_ok = false;
+  EXPECT_CALL(client_visitor, OnReply(full_track_name, expected_reason))
+      .WillOnce([&]() { received_ok = true; });
+  client_->session()->SubscribeCurrentGroup(full_track_name.track_namespace,
+                                            full_track_name.track_name,
+                                            &client_visitor);
+  bool success =
+      test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
+  EXPECT_TRUE(success);
+}
+
+TEST_F(MoqtIntegrationTest, SubscribeError) {
+  EstablishSession();
+  FullTrackName full_track_name("foo", "bar");
+  MockRemoteTrackVisitor client_visitor;
+  std::optional<absl::string_view> expected_reason = "Track does not exist";
+  bool received_ok = false;
+  EXPECT_CALL(client_visitor, OnReply(full_track_name, expected_reason))
+      .WillOnce([&]() { received_ok = true; });
+  client_->session()->SubscribeRelative(full_track_name.track_namespace,
+                                        full_track_name.track_name, 10, 10,
+                                        &client_visitor);
+  bool success =
+      test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
+  EXPECT_TRUE(success);
+}
+
 }  // namespace
+
 }  // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 3586a22..773907b 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -11,6 +11,7 @@
 #include <cstdint>
 #include <optional>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include "absl/strings/string_view.h"
@@ -36,6 +37,7 @@
   quic::Perspective perspective;
   bool using_webtrans;
   std::string path;
+  bool deliver_partial_objects;
 };
 
 // The maximum length of a message, excluding any OBJECT payload. This prevents
@@ -79,6 +81,47 @@
   kAuthorizationInfo = 0x2,
 };
 
+struct FullTrackName {
+  std::string track_namespace;
+  std::string track_name;
+  FullTrackName(std::string ns, std::string name)
+      : track_namespace(std::move(ns)), track_name(std::move(name)) {}
+  bool operator==(const FullTrackName& other) const {
+    return track_namespace == other.track_namespace &&
+           track_name == other.track_name;
+  }
+  bool operator<(const FullTrackName& other) const {
+    return track_namespace < other.track_namespace ||
+           (track_namespace == other.track_namespace &&
+            track_name < other.track_name);
+  }
+  FullTrackName& operator=(FullTrackName other) {
+    track_namespace = other.track_namespace;
+    track_name = other.track_name;
+    return *this;
+  }
+  template <typename H>
+  friend H AbslHashValue(H h, const FullTrackName& m);
+};
+
+// These are absolute sequence numbers.
+struct FullSequence {
+  uint64_t group = 0;
+  uint64_t object = 0;
+  bool operator==(const FullSequence& other) const {
+    return group == other.group && object == other.object;
+  }
+  bool operator<(const FullSequence& other) const {
+    return group < other.group ||
+           (group == other.group && object < other.object);
+  }
+};
+
+template <typename H>
+H AbslHashValue(H h, const FullTrackName& m) {
+  return H::combine(std::move(h), m.track_namespace, m.track_name);
+}
+
 struct QUICHE_EXPORT MoqtClientSetup {
   std::vector<MoqtVersion> supported_versions;
   std::optional<MoqtRole> role;
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 9530082..164a709 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -4,7 +4,10 @@
 
 #include "quiche/quic/moqt/moqt_session.h"
 
+#include <cstdint>
 #include <memory>
+#include <optional>
+#include <queue>
 #include <string>
 #include <utility>
 #include <vector>
@@ -14,6 +17,8 @@
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_stream.h"
@@ -26,6 +31,11 @@
 
 using ::quic::Perspective;
 
+constexpr uint64_t kMoqtErrorTrackDoesntExist = 1;
+constexpr uint64_t kMoqtErrorObjectDoesntExist = 2;
+
+constexpr int kMaxBufferedObjects = 1000;
+
 void MoqtSession::OnSessionReady() {
   QUICHE_DLOG(INFO) << ENDPOINT << "Underlying session ready";
   if (parameters_.perspective == Perspective::IS_SERVER) {
@@ -54,7 +64,19 @@
     Error("Failed to write client SETUP message");
     return;
   }
-  QUICHE_DLOG(INFO) << ENDPOINT << "Send the SETUP message";
+  QUIC_DLOG(INFO) << ENDPOINT << "Send the SETUP message";
+}
+
+void MoqtSession::OnSessionClosed(webtransport::SessionErrorCode,
+                                  const std::string& error_message) {
+  if (!error_.empty()) {
+    // Avoid erroring out twice.
+    return;
+  }
+  QUICHE_DLOG(INFO) << ENDPOINT << "Underlying session closed with message: "
+                    << error_message;
+  error_ = error_message;
+  std::move(session_terminated_callback_)(error_message);
 }
 
 void MoqtSession::OnIncomingBidirectionalStreamAvailable() {
@@ -72,18 +94,6 @@
   }
 }
 
-void MoqtSession::OnSessionClosed(webtransport::SessionErrorCode,
-                                  const std::string& error_message) {
-  if (!error_.empty()) {
-    // Avoid erroring out twice.
-    return;
-  }
-  QUICHE_DLOG(INFO) << ENDPOINT << "Underlying session closed with message: "
-                    << error_message;
-  error_ = error_message;
-  std::move(session_terminated_callback_)(error_message);
-}
-
 void MoqtSession::Error(absl::string_view error) {
   if (!error_.empty()) {
     // Avoid erroring out twice.
@@ -97,6 +107,138 @@
   std::move(session_terminated_callback_)(error);
 }
 
+void MoqtSession::AddLocalTrack(const FullTrackName& full_track_name,
+                                LocalTrack::Visitor* visitor) {
+  local_tracks_.try_emplace(full_track_name, full_track_name,
+                            next_track_alias_++, visitor);
+}
+
+// TODO: Create state that allows ANNOUNCE_OK/ERROR on spurious namespaces to
+// trigger session errors.
+void MoqtSession::Announce(absl::string_view track_namespace,
+                           MoqtAnnounceCallback announce_callback) {
+  if (pending_outgoing_announces_.contains(track_namespace)) {
+    std::move(announce_callback)(
+        track_namespace, "ANNOUNCE message already outstanding for namespace");
+    return;
+  }
+  MoqtAnnounce message;
+  message.track_namespace = track_namespace;
+  bool success = session_->GetStreamById(*control_stream_)
+                     ->Write(framer_.SerializeAnnounce(message).AsStringView());
+  if (!success) {
+    Error("Failed to write ANNOUNCE message");
+    return;
+  }
+  QUIC_DLOG(INFO) << ENDPOINT << "Sent ANNOUNCE message for "
+                  << message.track_namespace;
+  pending_outgoing_announces_[track_namespace] = std::move(announce_callback);
+}
+
+bool MoqtSession::HasSubscribers(const FullTrackName& full_track_name) const {
+  auto it = local_tracks_.find(full_track_name);
+  return (it != local_tracks_.end() && it->second.HasSubscriber());
+}
+
+bool MoqtSession::SubscribeAbsolute(absl::string_view track_namespace,
+                                    absl::string_view name,
+                                    uint64_t start_group, uint64_t start_object,
+                                    RemoteTrack::Visitor* visitor,
+                                    absl::string_view auth_info) {
+  MoqtSubscribeRequest message;
+  message.track_namespace = track_namespace;
+  message.track_name = name;
+  message.start_group = MoqtSubscribeLocation(true, start_group);
+  message.start_object = MoqtSubscribeLocation(true, start_object);
+  message.end_group = std::nullopt;
+  message.end_object = std::nullopt;
+  if (!auth_info.empty()) {
+    message.authorization_info = std::move(auth_info);
+  }
+  return Subscribe(message, visitor);
+}
+
+bool MoqtSession::SubscribeAbsolute(absl::string_view track_namespace,
+                                    absl::string_view name,
+                                    uint64_t start_group, uint64_t start_object,
+                                    uint64_t end_group, uint64_t end_object,
+                                    RemoteTrack::Visitor* visitor,
+                                    absl::string_view auth_info) {
+  if (end_group < start_group) {
+    QUIC_DLOG(ERROR) << "Subscription end is before beginning";
+    return false;
+  }
+  if (end_group == start_group && end_object < start_object) {
+    QUIC_DLOG(ERROR) << "Subscription end is before beginning";
+    return false;
+  }
+  MoqtSubscribeRequest message;
+  message.track_namespace = track_namespace;
+  message.track_name = name;
+  message.start_group = MoqtSubscribeLocation(true, start_group);
+  message.start_object = MoqtSubscribeLocation(true, start_object);
+  message.end_group = MoqtSubscribeLocation(true, end_group);
+  message.end_object = MoqtSubscribeLocation(true, end_object);
+  if (!auth_info.empty()) {
+    message.authorization_info = std::move(auth_info);
+  }
+  return Subscribe(message, visitor);
+}
+
+bool MoqtSession::SubscribeRelative(absl::string_view track_namespace,
+                                    absl::string_view name, int64_t start_group,
+                                    int64_t start_object,
+                                    RemoteTrack::Visitor* visitor,
+                                    absl::string_view auth_info) {
+  MoqtSubscribeRequest message;
+  message.track_namespace = track_namespace;
+  message.track_name = name;
+  message.start_group = MoqtSubscribeLocation(false, start_group);
+  message.start_object = MoqtSubscribeLocation(false, start_object);
+  message.end_group = std::nullopt;
+  message.end_object = std::nullopt;
+  if (!auth_info.empty()) {
+    message.authorization_info = std::move(auth_info);
+  }
+  return Subscribe(message, visitor);
+}
+
+bool MoqtSession::SubscribeCurrentGroup(absl::string_view track_namespace,
+                                        absl::string_view name,
+                                        RemoteTrack::Visitor* visitor,
+                                        absl::string_view auth_info) {
+  MoqtSubscribeRequest message;
+  message.track_namespace = track_namespace;
+  message.track_name = name;
+  // First object of current group.
+  message.start_group = MoqtSubscribeLocation(false, (uint64_t)0);
+  message.start_object = MoqtSubscribeLocation(true, (int64_t)0);
+  message.end_group = std::nullopt;
+  message.end_object = std::nullopt;
+  if (!auth_info.empty()) {
+    message.authorization_info = std::move(auth_info);
+  }
+  return Subscribe(message, visitor);
+}
+
+bool MoqtSession::Subscribe(const MoqtSubscribeRequest& message,
+                            RemoteTrack::Visitor* visitor) {
+  // TODO(martinduke): support authorization info
+  bool success =
+      session_->GetStreamById(*control_stream_)
+          ->Write(framer_.SerializeSubscribeRequest(message).AsStringView());
+  if (!success) {
+    Error("Failed to write SUBSCRIBE_REQUEST message");
+    return false;
+  }
+  QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE_REQUEST message for "
+                  << message.track_namespace << ":" << message.track_name;
+  FullTrackName ftn(std::string(message.track_namespace),
+                    std::string(message.track_name));
+  remote_tracks_.try_emplace(ftn, ftn, visitor);
+  return true;
+}
+
 void MoqtSession::Stream::OnCanRead() {
   bool fin =
       quiche::ProcessAllReadableRegions(*stream_, [&](absl::string_view chunk) {
@@ -122,6 +264,63 @@
   }
 }
 
+void MoqtSession::Stream::OnObjectMessage(const MoqtObject& message,
+                                          absl::string_view payload,
+                                          bool end_of_message) {
+  if (is_control_stream_ == true) {
+    session_->Error("Received OBJECT message on control stream");
+    return;
+  }
+  QUICHE_DLOG(INFO) << ENDPOINT << "Received OBJECT message on stream "
+                    << stream_->GetStreamId() << " for track alias "
+                    << message.track_id << " with sequence "
+                    << message.group_sequence << ":" << message.object_sequence
+                    << " length " << payload.size() << " explicit length "
+                    << (message.payload_length.has_value()
+                            ? (int)*message.payload_length
+                            : -1)
+                    << (end_of_message ? "F" : "");
+  if (!session_->parameters_.deliver_partial_objects) {
+    if (!end_of_message) {  // Buffer partial object.
+      absl::StrAppend(&partial_object_, payload);
+      return;
+    }
+    if (!partial_object_.empty()) {  // Completes the object
+      absl::StrAppend(&partial_object_, payload);
+      payload = absl::string_view(partial_object_);
+    }
+  }
+  auto it = session_->tracks_by_alias_.find(message.track_id);
+  if (it == session_->tracks_by_alias_.end()) {
+    // No SUBSCRIBE_OK received with this alias, buffer it.
+    auto it2 = session_->object_queue_.find(message.track_id);
+    std::vector<BufferedObject>* queue;
+    if (it2 == session_->object_queue_.end()) {
+      queue = &session_->object_queue_[message.track_id];
+    } else {
+      queue = &it2->second;
+    }
+    if (session_->num_buffered_objects_ >= kMaxBufferedObjects) {
+      session_->num_buffered_objects_++;
+      session_->Error("Too many buffered objects");
+      return;
+    }
+    queue->push_back(BufferedObject(stream_->GetStreamId(), message, payload,
+                                    end_of_message));
+    QUIC_DLOG(INFO) << ENDPOINT << "Buffering OBJECT for track alias "
+                    << message.track_id;
+    return;
+  }
+  RemoteTrack* subscription = it->second;
+  if (subscription->visitor() != nullptr) {
+    subscription->visitor()->OnObjectFragment(
+        subscription->full_track_name(), stream_->GetStreamId(),
+        message.group_sequence, message.object_sequence,
+        message.object_send_order, payload, end_of_message);
+  }
+  partial_object_.clear();
+}
+
 void MoqtSession::Stream::OnClientSetupMessage(const MoqtClientSetup& message) {
   if (is_control_stream_.has_value()) {
     if (!*is_control_stream_) {
@@ -152,7 +351,7 @@
       session_->Error("Failed to write server SETUP message");
       return;
     }
-    QUICHE_DLOG(INFO) << ENDPOINT << "Sent the SETUP message";
+    QUIC_DLOG(INFO) << ENDPOINT << "Sent the SETUP message";
   }
   // TODO: handle role and path.
   std::move(session_->session_established_callback_)();
@@ -176,13 +375,230 @@
                                  absl::Hex(session_->parameters_.version)));
     return;
   }
-  QUICHE_DLOG(INFO) << ENDPOINT << "Received the SETUP message";
+  QUIC_DLOG(INFO) << ENDPOINT << "Received the SETUP message";
   // TODO: handle role and path.
   std::move(session_->session_established_callback_)();
 }
 
+void MoqtSession::Stream::SendSubscribeError(
+    const MoqtSubscribeRequest& message, uint64_t error_code,
+    absl::string_view reason_phrase) {
+  MoqtSubscribeError subscribe_error;
+  subscribe_error.track_namespace = message.track_namespace;
+  subscribe_error.track_name = message.track_name;
+  subscribe_error.error_code = error_code;
+  subscribe_error.reason_phrase = reason_phrase;
+  bool success =
+      stream_->Write(session_->framer_.SerializeSubscribeError(subscribe_error)
+                         .AsStringView());
+  if (!success) {
+    session_->Error("Failed to write SUBSCRIBE_ERROR message");
+  }
+}
+
+void MoqtSession::Stream::OnSubscribeRequestMessage(
+    const MoqtSubscribeRequest& message) {
+  std::string reason_phrase = "";
+  if (!CheckIfIsControlStream()) {
+    return;
+  }
+  QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE_REQUEST for "
+                  << message.track_namespace << ":" << message.track_name;
+  auto it = session_->local_tracks_.find(FullTrackName(
+      std::string(message.track_namespace), std::string(message.track_name)));
+  if (it == session_->local_tracks_.end()) {
+    QUIC_DLOG(INFO) << ENDPOINT << "Rejected because "
+                    << message.track_namespace << ":" << message.track_name
+                    << " does not exist";
+    SendSubscribeError(message, kMoqtErrorTrackDoesntExist,
+                       "Track does not exist");
+    return;
+  }
+  LocalTrack& track = it->second;
+  std::optional<FullSequence> start = session_->LocationToAbsoluteNumber(
+      track, message.start_group, message.start_object);
+  QUICHE_DCHECK(start.has_value());  // Parser enforces this.
+  std::optional<FullSequence> end = session_->LocationToAbsoluteNumber(
+      track, message.end_group, message.end_object);
+  if (start < track.next_sequence() && track.visitor() != nullptr) {
+    SubscribeWindow window = end.has_value()
+                                 ? SubscribeWindow(start->group, start->object,
+                                                   end->group, end->object)
+                                 : SubscribeWindow(start->group, start->object);
+    std::optional<absl::string_view> past_objects_available =
+        track.visitor()->OnSubscribeRequestForPast(window);
+    if (!past_objects_available.has_value()) {
+      SendSubscribeError(message, kMoqtErrorObjectDoesntExist,
+                         "Object does not exist");
+      return;
+    }
+  }
+  MoqtSubscribeOk subscribe_ok;
+  subscribe_ok.track_namespace = message.track_namespace;
+  subscribe_ok.track_name = message.track_name;
+  subscribe_ok.track_id = track.track_alias();
+  bool success = stream_->Write(
+      session_->framer_.SerializeSubscribeOk(subscribe_ok).AsStringView());
+  if (!success) {
+    session_->Error("Failed to write SUBSCRIBE_OK message");
+    return;
+  }
+  QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for "
+                  << message.track_namespace << ":" << message.track_name;
+  if (!end.has_value()) {
+    track.AddWindow(SubscribeWindow(start->group, start->object));
+    return;
+  }
+  track.AddWindow(
+      SubscribeWindow(start->group, start->object, end->group, end->object));
+}
+
+void MoqtSession::Stream::OnSubscribeOkMessage(const MoqtSubscribeOk& message) {
+  if (!CheckIfIsControlStream()) {
+    return;
+  }
+  if (session_->tracks_by_alias_.contains(message.track_id)) {
+    session_->Error("Received duplicate track_alias");
+    return;
+  }
+  auto it = session_->remote_tracks_.find(FullTrackName(
+      std::string(message.track_namespace), std::string(message.track_name)));
+  if (it == session_->remote_tracks_.end()) {
+    session_->Error("Received SUBSCRIBE_OK for nonexistent subscribe");
+    return;
+  }
+  // Note that if there are multiple SUBSCRIBE_OK for the same track,
+  // RemoteTrack.track_alias() will be the last alias received, but
+  // tracks_by_alias_ will have an entry for every track_alias received.
+  // TODO: revise this data structure to make it easier to clean up
+  // RemoteTracks, unless draft changes make it irrelevant.
+  QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for "
+                  << message.track_namespace << ":" << message.track_name
+                  << ", track_alias = " << message.track_id;
+  RemoteTrack& track = it->second;
+  track.set_track_alias(message.track_id);
+  session_->tracks_by_alias_[message.track_id] = &track;
+  // TODO: handle expires.
+  if (track.visitor() != nullptr) {
+    track.visitor()->OnReply(track.full_track_name(), std::nullopt);
+  }
+  // Clear the buffer for this track alias.
+  auto it2 = session_->object_queue_.find(message.track_id);
+  if (it2 == session_->object_queue_.end() || track.visitor() == nullptr) {
+    // Nothing is buffered, or the app hasn't registered a visitor anyway.
+    return;
+  }
+  QUIC_DLOG(INFO) << ENDPOINT << "Processing buffered OBJECTs for track_alias "
+                  << message.track_id;
+  std::vector<BufferedObject>& queue = it2->second;
+  for (BufferedObject& to_deliver : queue) {
+    track.visitor()->OnObjectFragment(
+        track.full_track_name(), to_deliver.stream_id,
+        to_deliver.message.group_sequence, to_deliver.message.object_sequence,
+        to_deliver.message.object_send_order, to_deliver.payload,
+        to_deliver.eom);
+    session_->num_buffered_objects_--;
+  }
+  session_->object_queue_.erase(it2);
+}
+
+void MoqtSession::Stream::OnSubscribeErrorMessage(
+    const MoqtSubscribeError& message) {
+  if (!CheckIfIsControlStream()) {
+    return;
+  }
+  auto it = session_->remote_tracks_.find(FullTrackName(
+      std::string(message.track_namespace), std::string(message.track_name)));
+  if (it == session_->remote_tracks_.end()) {
+    session_->Error("Received SUBSCRIBE_ERROR for nonexistent subscribe");
+    return;
+  }
+  QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_ERROR for "
+                  << message.track_namespace << ":" << message.track_name
+                  << ", error = " << message.reason_phrase;
+  if (it->second.visitor() != nullptr) {
+    it->second.visitor()->OnReply(it->second.full_track_name(),
+                                  message.reason_phrase);
+  }
+}
+
+void MoqtSession::Stream::OnAnnounceMessage(const MoqtAnnounce& message) {
+  if (!CheckIfIsControlStream()) {
+    return;
+  }
+  MoqtAnnounceOk ok;
+  ok.track_namespace = message.track_namespace;
+  bool success =
+      stream_->Write(session_->framer_.SerializeAnnounceOk(ok).AsStringView());
+  if (!success) {
+    session_->Error("Failed to write ANNOUNCE_OK message");
+    return;
+  }
+}
+
+void MoqtSession::Stream::OnAnnounceOkMessage(const MoqtAnnounceOk& message) {
+  if (!CheckIfIsControlStream()) {
+    return;
+  }
+  auto it = session_->pending_outgoing_announces_.find(message.track_namespace);
+  if (it == session_->pending_outgoing_announces_.end()) {
+    session_->Error("Received ANNOUNCE_OK for nonexistent announce");
+    return;
+  }
+  std::move(it->second)(message.track_namespace, std::nullopt);
+  session_->pending_outgoing_announces_.erase(it);
+}
+
+void MoqtSession::Stream::OnAnnounceErrorMessage(
+    const MoqtAnnounceError& message) {
+  if (!CheckIfIsControlStream()) {
+    return;
+  }
+  auto it = session_->pending_outgoing_announces_.find(message.track_namespace);
+  if (it == session_->pending_outgoing_announces_.end()) {
+    session_->Error("Received ANNOUNCE_ERROR for nonexistent announce");
+    return;
+  }
+  std::move(it->second)(message.track_namespace, message.reason_phrase);
+  session_->pending_outgoing_announces_.erase(it);
+}
+
 void MoqtSession::Stream::OnParsingError(absl::string_view reason) {
   session_->Error(absl::StrCat("Parse error: ", reason));
 }
 
+bool MoqtSession::Stream::CheckIfIsControlStream() {
+  if (!is_control_stream_.has_value()) {
+    session_->Error("Received SUBSCRIBE_REQUEST as first message");
+    return false;
+  }
+  if (!*is_control_stream_) {
+    session_->Error("Received SUBSCRIBE_REQUEST on non-control stream");
+    return false;
+  }
+  return true;
+}
+
+std::optional<FullSequence> MoqtSession::LocationToAbsoluteNumber(
+    const LocalTrack& track, const std::optional<MoqtSubscribeLocation>& group,
+    const std::optional<MoqtSubscribeLocation>& object) {
+  FullSequence sequence;
+  if (!group.has_value() || !object.has_value()) {
+    return std::nullopt;
+  }
+  if (group->absolute) {
+    sequence.group = group->absolute_value;
+  } else {
+    sequence.group = track.next_sequence().group + group->relative_value;
+  }
+  if (object->absolute) {
+    sequence.object = object->absolute_value;
+  } else {
+    // Subtract 1 because the relative value is computed from the largest sent
+    // sequence number, not the next one.
+    sequence.object = track.next_sequence().object + object->relative_value - 1;
+  }
+  return sequence;
+}
+
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index a13c14d..89b3def 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -5,15 +5,21 @@
 #ifndef QUICHE_QUIC_MOQT_MOQT_SESSION_H_
 #define QUICHE_QUIC_MOQT_MOQT_SESSION_H_
 
+#include <cstdint>
 #include <optional>
 #include <string>
 #include <utility>
+#include <vector>
 
+#include "absl/container/flat_hash_map.h"
+#include "absl/container/node_hash_map.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/moqt/moqt_framer.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_parser.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/common/platform/api/quiche_export.h"
 #include "quiche/common/quiche_callbacks.h"
 #include "quiche/common/simple_buffer_allocator.h"
@@ -21,10 +27,18 @@
 
 namespace moqt {
 
+namespace test {
+class MoqtSessionPeer;
+}
+
 using MoqtSessionEstablishedCallback = quiche::SingleUseCallback<void()>;
 using MoqtSessionTerminatedCallback =
     quiche::SingleUseCallback<void(absl::string_view error_message)>;
 using MoqtSessionDeletedCallback = quiche::SingleUseCallback<void()>;
+// If |error_message| is nullopt, the ANNOUNCE was successful.
+using MoqtAnnounceCallback = quiche::SingleUseCallback<void(
+    absl::string_view track_namespace,
+    std::optional<absl::string_view> error_message)>;
 
 // Callbacks for session-level events.
 struct MoqtSessionCallbacks {
@@ -56,7 +70,7 @@
                        const std::string&) override;
   void OnIncomingBidirectionalStreamAvailable() override;
   void OnIncomingUnidirectionalStreamAvailable() override;
-  void OnDatagramReceived(absl::string_view datagram) override {}
+  void OnDatagramReceived(absl::string_view /*datagram*/) override {}
   void OnCanCreateNewOutgoingBidirectionalStream() override {}
   void OnCanCreateNewOutgoingUnidirectionalStream() override {}
 
@@ -64,7 +78,42 @@
 
   quic::Perspective perspective() const { return parameters_.perspective; }
 
+  // Add to the list of tracks that can be subscribed to. Call this before
+  // Announce() so that subscriptions can be processed correctly. If |visitor|
+  // is nullptr, then incoming SUBSCRIBE_REQUEST for objects in the path will
+  // receive SUBSCRIBE_OK, but never actually get the objects.
+  void AddLocalTrack(const FullTrackName& full_track_name,
+                     LocalTrack::Visitor* visitor);
+  // Send an ANNOUNCE message for |track_namespace|, and call
+  // |announce_callback| when the response arrives. Will fail immediately if
+  // there is already an unresolved ANNOUNCE for that namespace.
+  void Announce(absl::string_view track_namespace,
+                MoqtAnnounceCallback announce_callback);
+  bool HasSubscribers(const FullTrackName& full_track_name) const;
+
+  // Returns true if SUBSCRIBE_REQUEST was sent. If there is already a
+  // subscription to the track, the message will still be sent. However, the
+  // visitor will be ignored.
+  bool SubscribeAbsolute(absl::string_view track_namespace,
+                         absl::string_view name, uint64_t start_group,
+                         uint64_t start_object, RemoteTrack::Visitor* visitor,
+                         absl::string_view auth_info = "");
+  bool SubscribeAbsolute(absl::string_view track_namespace,
+                         absl::string_view name, uint64_t start_group,
+                         uint64_t start_object, uint64_t end_group,
+                         uint64_t end_object, RemoteTrack::Visitor* visitor,
+                         absl::string_view auth_info = "");
+  bool SubscribeRelative(absl::string_view track_namespace,
+                         absl::string_view name, int64_t start_group,
+                         int64_t start_object, RemoteTrack::Visitor* visitor,
+                         absl::string_view auth_info = "");
+  bool SubscribeCurrentGroup(absl::string_view track_namespace,
+                             absl::string_view name,
+                             RemoteTrack::Visitor* visitor,
+                             absl::string_view auth_info = "");
+
  private:
+  friend class test::MoqtSessionPeer;
   class QUICHE_EXPORT Stream : public webtransport::StreamVisitor,
                                public MoqtParserVisitor {
    public:
@@ -88,21 +137,21 @@
 
     // MoqtParserVisitor implementation.
     void OnObjectMessage(const MoqtObject& message, absl::string_view payload,
-                         bool end_of_message) override {}
+                         bool end_of_message) override;
     void OnClientSetupMessage(const MoqtClientSetup& message) override;
     void OnServerSetupMessage(const MoqtServerSetup& message) override;
     void OnSubscribeRequestMessage(
-        const MoqtSubscribeRequest& message) override {}
-    void OnSubscribeOkMessage(const MoqtSubscribeOk& message) override {}
-    void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override {}
-    void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override {}
-    void OnSubscribeFinMessage(const MoqtSubscribeFin& message) override {}
-    void OnSubscribeRstMessage(const MoqtSubscribeRst& message) override {}
-    void OnAnnounceMessage(const MoqtAnnounce& message) override {}
-    void OnAnnounceOkMessage(const MoqtAnnounceOk& message) override {}
-    void OnAnnounceErrorMessage(const MoqtAnnounceError& message) override {}
-    void OnUnannounceMessage(const MoqtUnannounce& message) override {}
-    void OnGoAwayMessage(const MoqtGoAway& message) override {}
+        const MoqtSubscribeRequest& message) override;
+    void OnSubscribeOkMessage(const MoqtSubscribeOk& message) override;
+    void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override;
+    void OnUnsubscribeMessage(const MoqtUnsubscribe& /*message*/) override {}
+    void OnSubscribeFinMessage(const MoqtSubscribeFin& /*message*/) override {}
+    void OnSubscribeRstMessage(const MoqtSubscribeRst& /*message*/) override {}
+    void OnAnnounceMessage(const MoqtAnnounce& message) override;
+    void OnAnnounceOkMessage(const MoqtAnnounceOk& message) override;
+    void OnAnnounceErrorMessage(const MoqtAnnounceError& message) override;
+    void OnUnannounceMessage(const MoqtUnannounce& /*message*/) override {}
+    void OnGoAwayMessage(const MoqtGoAway& /*message*/) override {}
     void OnParsingError(absl::string_view reason) override;
 
     quic::Perspective perspective() const {
@@ -110,14 +159,46 @@
     }
 
    private:
+    friend class test::MoqtSessionPeer;
+    void SendSubscribeError(const MoqtSubscribeRequest& message,
+                            uint64_t error_code,
+                            absl::string_view reason_phrase);
+    bool CheckIfIsControlStream();
+
     MoqtSession* session_;
     webtransport::Stream* stream_;
     MoqtParser parser_;
     // nullopt means "incoming stream, and we don't know if it's the control
     // stream or a data stream yet".
     std::optional<bool> is_control_stream_;
+    std::string partial_object_;
   };
 
+  // If parameters_.deliver_partial_objects is false, then the session buffers
+  // these objects until they arrive in their entirety. This stores the
+  // relevant information to later deliver this object via OnObject().
+  struct BufferedObject {
+    uint32_t stream_id;
+    MoqtObject message;
+    std::string payload;
+    bool eom;
+    BufferedObject(uint32_t id, const MoqtObject& header,
+                   absl::string_view body, bool end_of_message)
+        : stream_id(id),
+          message(header),
+          payload(std::string(body)),
+          eom(end_of_message) {}
+  };
+
+  // Returns false if the SUBSCRIBE_REQUEST isn't sent.
+  bool Subscribe(const MoqtSubscribeRequest& message,
+                 RemoteTrack::Visitor* visitor);
+  // converts two MoqtLocations into absolute sequences.
+  std::optional<FullSequence> LocationToAbsoluteNumber(
+      const LocalTrack& track,
+      const std::optional<MoqtSubscribeLocation>& group,
+      const std::optional<MoqtSubscribeLocation>& object);
+
   webtransport::Session* session_;
   MoqtSessionParameters parameters_;
   MoqtSessionEstablishedCallback session_established_callback_;
@@ -127,6 +208,23 @@
 
   std::optional<webtransport::StreamId> control_stream_;
   std::string error_;
+
+  // All the tracks the session is subscribed to. Multiple subscribes to the
+  // same track are recorded in a single subscription.
+  absl::node_hash_map<FullTrackName, RemoteTrack> remote_tracks_;
+  // All the tracks the peer can subscribe to.
+  absl::flat_hash_map<FullTrackName, LocalTrack> local_tracks_;
+
+  // Remote tracks indexed by TrackId. Must be active.
+  absl::flat_hash_map<uint64_t, RemoteTrack*> tracks_by_alias_;
+  uint64_t next_track_alias_ = 0;
+  // Buffer for OBJECTs that arrive with an unknown track alias.
+  absl::flat_hash_map<uint64_t, std::vector<BufferedObject>> object_queue_;
+  int num_buffered_objects_ = 0;
+
+  // Indexed by track namespace.
+  absl::flat_hash_map<std::string, MoqtAnnounceCallback>
+      pending_outgoing_announces_;
 };
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
new file mode 100644
index 0000000..12f9c73
--- /dev/null
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -0,0 +1,588 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_session.h"
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "quiche/quic/core/quic_data_reader.h"
+#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_parser.h"
+#include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
+#include "quiche/quic/platform/api/quic_test.h"
+#include "quiche/common/quiche_stream.h"
+#include "quiche/web_transport/test_tools/mock_web_transport.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace moqt {
+
+namespace test {
+
+namespace {
+
+using ::testing::_;
+using ::testing::Return;
+using ::testing::StrictMock;
+
+constexpr webtransport::StreamId kControlStreamId = 4;
+constexpr webtransport::StreamId kIncomingUniStreamId = 15;
+
+constexpr MoqtSessionParameters default_parameters = {
+    /*version=*/MoqtVersion::kDraft01,
+    /*perspective=*/quic::Perspective::IS_CLIENT,
+    /*using_webtrans=*/true,
+    /*path=*/std::string(),
+    /*deliver_partial_objects=*/false,
+};
+
+// Returns nullopt if there is not enough in |message| to extract a type
+static std::optional<MoqtMessageType> ExtractMessageType(
+    const absl::string_view message) {
+  quic::QuicDataReader reader(message);
+  uint64_t value;
+  if (!reader.ReadVarInt62(&value)) {
+    return std::nullopt;
+  }
+  return static_cast<MoqtMessageType>(value);
+}
+
+}  // namespace
+
+class MoqtSessionPeer {
+ public:
+  static std::unique_ptr<MoqtParserVisitor> CreateControlStream(
+      MoqtSession* session, webtransport::Stream* stream) {
+    auto new_stream = std::make_unique<MoqtSession::Stream>(
+        session, stream, /*is_control_stream=*/true);
+    session->control_stream_ = kControlStreamId;
+    return new_stream;
+  }
+
+  static std::unique_ptr<MoqtParserVisitor> CreateUniStream(
+      MoqtSession* session, webtransport::Stream* stream) {
+    auto new_stream = std::make_unique<MoqtSession::Stream>(
+        session, stream, /*is_control_stream=*/false);
+    return new_stream;
+  }
+
+  // In the test OnSessionReady, the session creates a stream and then passes
+  // its unique_ptr to the mock webtransport stream. This function casts
+  // that unique_ptr into a MoqtSession::Stream*, which is a private class of
+  // MoqtSession, and then casts again into MoqtParserVisitor so that the test
+  // can inject packets into that stream.
+  // This function is useful for any test that wants to inject packets on a
+  // stream created by the MoqtSession.
+  static MoqtParserVisitor* FetchParserVisitorFromWebtransportStreamVisitor(
+      MoqtSession* session, webtransport::StreamVisitor* visitor) {
+    return (MoqtSession::Stream*)visitor;
+  }
+
+  static void CreateRemoteTrack(MoqtSession* session, FullTrackName& name,
+                                RemoteTrack::Visitor* visitor) {
+    session->remote_tracks_.try_emplace(name, name, visitor);
+  }
+
+  static void CreateRemoteTrackWithAlias(MoqtSession* session,
+                                         FullTrackName& name,
+                                         RemoteTrack::Visitor* visitor,
+                                         uint64_t track_alias) {
+    auto it = session->remote_tracks_.try_emplace(name, name, visitor);
+    RemoteTrack& track = it.first->second;
+    track.set_track_alias(track_alias);
+    session->tracks_by_alias_.emplace(std::make_pair(track_alias, &track));
+  }
+};
+
+class MoqtSessionTest : public quic::test::QuicTest {
+ public:
+  MoqtSessionTest()
+      : session_(&mock_session_, default_parameters,
+                 session_callbacks_.AsSessionCallbacks()) {}
+
+  MockSessionCallbacks session_callbacks_;
+  StrictMock<webtransport::test::MockSession> mock_session_;
+  MoqtSession session_;
+};
+
+TEST_F(MoqtSessionTest, Queries) {
+  EXPECT_EQ(session_.perspective(), quic::Perspective::IS_CLIENT);
+}
+
+// Verify the session sends CLIENT_SETUP on the control stream.
+TEST_F(MoqtSessionTest, OnSessionReady) {
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  EXPECT_CALL(mock_session_, OpenOutgoingBidirectionalStream())
+      .WillOnce(Return(&mock_stream));
+  std::unique_ptr<webtransport::StreamVisitor> visitor;
+  // Save a reference to MoqtSession::Stream
+  EXPECT_CALL(mock_stream, SetVisitor(_))
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> new_visitor) {
+        visitor = std::move(new_visitor);
+      });
+  EXPECT_CALL(mock_stream, GetStreamId())
+      .WillOnce(Return(webtransport::StreamId(4)));
+  bool correct_message = false;
+  EXPECT_CALL(mock_stream, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        correct_message = true;
+        EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kClientSetup);
+        return absl::OkStatus();
+      });
+  session_.OnSessionReady();
+  EXPECT_TRUE(correct_message);
+
+  // Receive SERVER_SETUP
+  MoqtParserVisitor* stream_input =
+      MoqtSessionPeer::FetchParserVisitorFromWebtransportStreamVisitor(
+          &session_, visitor.get());
+  // Handle the server setup
+  MoqtServerSetup setup = {
+      MoqtVersion::kDraft01,
+      MoqtRole::kBoth,
+  };
+  EXPECT_CALL(session_callbacks_.session_established_callback, Call()).Times(1);
+  stream_input->OnServerSetupMessage(setup);
+}
+
+TEST_F(MoqtSessionTest, OnClientSetup) {
+  MoqtSessionParameters server_parameters = {
+      /*version=*/MoqtVersion::kDraft01,
+      /*perspective=*/quic::Perspective::IS_SERVER,
+      /*using_webtrans=*/true,
+      /*path=*/"",
+      /*deliver_partial_objects=*/false,
+  };
+  MoqtSession server_session(&mock_session_, server_parameters,
+                             session_callbacks_.AsSessionCallbacks());
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&server_session, &mock_stream);
+  MoqtClientSetup setup = {
+      /*supported_versions*/ {MoqtVersion::kDraft01},
+      /*role=*/MoqtRole::kBoth,
+      /*path=*/std::nullopt,
+  };
+  bool correct_message = false;
+  EXPECT_CALL(mock_stream, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        correct_message = true;
+        EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kServerSetup);
+        return absl::OkStatus();
+      });
+  EXPECT_CALL(session_callbacks_.session_established_callback, Call()).Times(1);
+  stream_input->OnClientSetupMessage(setup);
+}
+
+TEST_F(MoqtSessionTest, OnSessionClosed) {
+  bool reported_error = false;
+  EXPECT_CALL(session_callbacks_.session_terminated_callback, Call(_))
+      .WillOnce([&](absl::string_view error_message) {
+        reported_error = true;
+        EXPECT_EQ(error_message, "foo");
+      });
+  session_.OnSessionClosed(webtransport::SessionErrorCode(1), "foo");
+  EXPECT_TRUE(reported_error);
+}
+
+TEST_F(MoqtSessionTest, OnIncomingBidirectionalStream) {
+  ::testing::InSequence seq;
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  StrictMock<webtransport::test::MockStreamVisitor> mock_stream_visitor;
+  EXPECT_CALL(mock_session_, AcceptIncomingBidirectionalStream())
+      .WillOnce(Return(&mock_stream));
+  EXPECT_CALL(mock_stream, SetVisitor(_)).Times(1);
+  EXPECT_CALL(mock_stream, visitor()).WillOnce(Return(&mock_stream_visitor));
+  EXPECT_CALL(mock_stream_visitor, OnCanRead()).Times(1);
+  EXPECT_CALL(mock_session_, AcceptIncomingBidirectionalStream())
+      .WillOnce(Return(nullptr));
+  session_.OnIncomingBidirectionalStreamAvailable();
+}
+
+TEST_F(MoqtSessionTest, OnIncomingUnidirectionalStream) {
+  ::testing::InSequence seq;
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  StrictMock<webtransport::test::MockStreamVisitor> mock_stream_visitor;
+  EXPECT_CALL(mock_session_, AcceptIncomingUnidirectionalStream())
+      .WillOnce(Return(&mock_stream));
+  EXPECT_CALL(mock_stream, SetVisitor(_)).Times(1);
+  EXPECT_CALL(mock_stream, visitor()).WillOnce(Return(&mock_stream_visitor));
+  EXPECT_CALL(mock_stream_visitor, OnCanRead()).Times(1);
+  EXPECT_CALL(mock_session_, AcceptIncomingUnidirectionalStream())
+      .WillOnce(Return(nullptr));
+  session_.OnIncomingUnidirectionalStreamAvailable();
+}
+
+TEST_F(MoqtSessionTest, Error) {
+  bool reported_error = false;
+  EXPECT_CALL(mock_session_, CloseSession(1, "foo")).Times(1);
+  EXPECT_CALL(session_callbacks_.session_terminated_callback, Call(_))
+      .WillOnce([&](absl::string_view error_message) {
+        reported_error = (error_message == "foo");
+      });
+  session_.Error("foo");
+  EXPECT_TRUE(reported_error);
+}
+
+TEST_F(MoqtSessionTest, AddLocalTrack) {
+  MoqtSubscribeRequest request = {
+      /*track_namespace=*/"foo",
+      /*track_name=*/"bar",
+      /*start_group=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
+      /*start_object=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
+      /*end_group=*/std::nullopt,
+      /*end_object=*/std::nullopt,
+      /*authorization_info=*/std::nullopt,
+  };
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+  // Request for track returns SUBSCRIBE_ERROR.
+  bool correct_message = false;
+  EXPECT_CALL(mock_stream, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        correct_message = true;
+        EXPECT_EQ(*ExtractMessageType(data[0]),
+                  MoqtMessageType::kSubscribeError);
+        return absl::OkStatus();
+      });
+  stream_input->OnSubscribeRequestMessage(request);
+  EXPECT_TRUE(correct_message);
+
+  // Add the track. Now Subscribe should succeed.
+  MockLocalTrackVisitor local_track_visitor;
+  session_.AddLocalTrack(FullTrackName("foo", "bar"), &local_track_visitor);
+  correct_message = true;
+  EXPECT_CALL(mock_stream, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        correct_message = true;
+        EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kSubscribeOk);
+        return absl::OkStatus();
+      });
+  stream_input->OnSubscribeRequestMessage(request);
+  EXPECT_TRUE(correct_message);
+}
+
+TEST_F(MoqtSessionTest, AnnounceWithOk) {
+  testing::MockFunction<void(absl::string_view track_namespace,
+                             std::optional<absl::string_view> error_message)>
+      announce_resolved_callback;
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream));
+  bool correct_message = true;
+  EXPECT_CALL(mock_stream, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        correct_message = true;
+        EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kAnnounce);
+        return absl::OkStatus();
+      });
+  session_.Announce("foo", announce_resolved_callback.AsStdFunction());
+  EXPECT_TRUE(correct_message);
+
+  MoqtAnnounceOk ok = {
+      /*track_namespace=*/"foo",
+  };
+  correct_message = false;
+  EXPECT_CALL(announce_resolved_callback, Call(_, _))
+      .WillOnce([&](absl::string_view track_namespace,
+                    std::optional<absl::string_view> error_message) {
+        correct_message = true;
+        EXPECT_EQ(track_namespace, "foo");
+        EXPECT_FALSE(error_message.has_value());
+      });
+  stream_input->OnAnnounceOkMessage(ok);
+  EXPECT_TRUE(correct_message);
+}
+
+TEST_F(MoqtSessionTest, AnnounceWithError) {
+  testing::MockFunction<void(absl::string_view track_namespace,
+                             std::optional<absl::string_view> error_message)>
+      announce_resolved_callback;
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream));
+  bool correct_message = true;
+  EXPECT_CALL(mock_stream, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        correct_message = true;
+        EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kAnnounce);
+        return absl::OkStatus();
+      });
+  session_.Announce("foo", announce_resolved_callback.AsStdFunction());
+  EXPECT_TRUE(correct_message);
+
+  MoqtAnnounceError error = {
+      /*track_namespace=*/"foo",
+  };
+  correct_message = false;
+  EXPECT_CALL(announce_resolved_callback, Call(_, _))
+      .WillOnce([&](absl::string_view track_namespace,
+                    std::optional<absl::string_view> error_message) {
+        correct_message = true;
+        EXPECT_EQ(track_namespace, "foo");
+        EXPECT_TRUE(error_message.has_value());
+      });
+  stream_input->OnAnnounceErrorMessage(error);
+  EXPECT_TRUE(correct_message);
+}
+
+TEST_F(MoqtSessionTest, HasSubscribers) {
+  MockLocalTrackVisitor local_track_visitor;
+  FullTrackName ftn("foo", "bar");
+  EXPECT_FALSE(session_.HasSubscribers(ftn));
+  session_.AddLocalTrack(ftn, &local_track_visitor);
+  EXPECT_FALSE(session_.HasSubscribers(ftn));
+
+  // Peer subscribes.
+  MoqtSubscribeRequest request = {
+      /*track_namespace=*/"foo",
+      /*track_name=*/"bar",
+      /*start_group=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
+      /*start_object=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
+      /*end_group=*/std::nullopt,
+      /*end_object=*/std::nullopt,
+      /*authorization_info=*/std::nullopt,
+  };
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+  bool correct_message = true;
+  EXPECT_CALL(mock_stream, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        correct_message = true;
+        EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kSubscribeOk);
+        return absl::OkStatus();
+      });
+  stream_input->OnSubscribeRequestMessage(request);
+  EXPECT_TRUE(correct_message);
+  EXPECT_TRUE(session_.HasSubscribers(ftn));
+}
+
+TEST_F(MoqtSessionTest, SubscribeWithOk) {
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+  MockRemoteTrackVisitor remote_track_visitor;
+  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream));
+  bool correct_message = true;
+  EXPECT_CALL(mock_stream, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        correct_message = true;
+        EXPECT_EQ(*ExtractMessageType(data[0]),
+                  MoqtMessageType::kSubscribeRequest);
+        return absl::OkStatus();
+      });
+  session_.SubscribeCurrentGroup("foo", "bar", &remote_track_visitor, "");
+
+  MoqtSubscribeOk ok = {
+      /*track_namespace=*/"foo",
+      /*track_name=*/"bar",
+      /*track_id=*/0,
+      /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
+  };
+  correct_message = false;
+  EXPECT_CALL(remote_track_visitor, OnReply(_, _))
+      .WillOnce([&](const FullTrackName& ftn,
+                    std::optional<absl::string_view> error_message) {
+        correct_message = true;
+        EXPECT_EQ(ftn, FullTrackName("foo", "bar"));
+        EXPECT_FALSE(error_message.has_value());
+      });
+  stream_input->OnSubscribeOkMessage(ok);
+  EXPECT_TRUE(correct_message);
+}
+
+TEST_F(MoqtSessionTest, SubscribeWithError) {
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+  MockRemoteTrackVisitor remote_track_visitor;
+  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream));
+  bool correct_message = true;
+  EXPECT_CALL(mock_stream, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        correct_message = true;
+        EXPECT_EQ(*ExtractMessageType(data[0]),
+                  MoqtMessageType::kSubscribeRequest);
+        return absl::OkStatus();
+      });
+  session_.SubscribeCurrentGroup("foo", "bar", &remote_track_visitor, "");
+
+  MoqtSubscribeError error = {
+      /*track_namespace=*/"foo",
+      /*track_name=*/"bar",
+      /*error_code=*/1,
+      /*reason_phrase=*/"deadbeef",
+  };
+  correct_message = false;
+  EXPECT_CALL(remote_track_visitor, OnReply(_, _))
+      .WillOnce([&](const FullTrackName& ftn,
+                    std::optional<absl::string_view> error_message) {
+        correct_message = true;
+        EXPECT_EQ(ftn, FullTrackName("foo", "bar"));
+        EXPECT_EQ(*error_message, "deadbeef");
+      });
+  stream_input->OnSubscribeErrorMessage(error);
+  EXPECT_TRUE(correct_message);
+}
+
+TEST_F(MoqtSessionTest, ReplyToAnnounce) {
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+  MoqtAnnounce announce = {
+      /*track_namespace=*/"foo",
+  };
+  bool correct_message = false;
+  EXPECT_CALL(mock_stream, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        correct_message = true;
+        EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kAnnounceOk);
+        return absl::OkStatus();
+      });
+  stream_input->OnAnnounceMessage(announce);
+  EXPECT_TRUE(correct_message);
+}
+
+TEST_F(MoqtSessionTest, IncomingObject) {
+  MockRemoteTrackVisitor visitor_;
+  FullTrackName ftn("foo", "bar");
+  std::string payload = "deadbeef";
+  MoqtSessionPeer::CreateRemoteTrackWithAlias(&session_, ftn, &visitor_, 0);
+  MoqtObject object = {
+      /*track_id=*/0,
+      /*group_sequence=*/0,
+      /*object_sequence=*/0,
+      /*object_send_order=*/0,
+      /*payload_length=*/8,
+  };
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> object_stream =
+      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+
+  EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _)).Times(1);
+  EXPECT_CALL(mock_stream, GetStreamId())
+      .WillRepeatedly(Return(kIncomingUniStreamId));
+  object_stream->OnObjectMessage(object, payload, true);
+}
+
+TEST_F(MoqtSessionTest, IncomingPartialObject) {
+  MockRemoteTrackVisitor visitor_;
+  FullTrackName ftn("foo", "bar");
+  std::string payload = "deadbeef";
+  MoqtSessionPeer::CreateRemoteTrackWithAlias(&session_, ftn, &visitor_, 0);
+  MoqtObject object = {
+      /*track_id=*/0,
+      /*group_sequence=*/0,
+      /*object_sequence=*/0,
+      /*object_send_order=*/0,
+      /*payload_length=*/16,
+  };
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> object_stream =
+      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+
+  EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _)).Times(1);
+  EXPECT_CALL(mock_stream, GetStreamId())
+      .WillRepeatedly(Return(kIncomingUniStreamId));
+  object_stream->OnObjectMessage(object, payload, false);
+  object_stream->OnObjectMessage(object, payload, true);  // complete the object
+}
+
+TEST_F(MoqtSessionTest, IncomingPartialObjectNoBuffer) {
+  MoqtSessionParameters parameters = {
+      /*version=*/MoqtVersion::kDraft01,
+      /*perspective=*/quic::Perspective::IS_CLIENT,
+      /*using_webtrans=*/true,
+      /*path=*/"",
+      /*deliver_partial_objects=*/true,
+  };
+  MoqtSession session(&mock_session_, parameters,
+                      session_callbacks_.AsSessionCallbacks());
+  MockRemoteTrackVisitor visitor_;
+  FullTrackName ftn("foo", "bar");
+  std::string payload = "deadbeef";
+  MoqtSessionPeer::CreateRemoteTrackWithAlias(&session, ftn, &visitor_, 0);
+  MoqtObject object = {
+      /*track_id=*/0,
+      /*group_sequence=*/0,
+      /*object_sequence=*/0,
+      /*object_send_order=*/0,
+      /*payload_length=*/16,
+  };
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> object_stream =
+      MoqtSessionPeer::CreateUniStream(&session, &mock_stream);
+
+  EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _)).Times(2);
+  EXPECT_CALL(mock_stream, GetStreamId())
+      .WillRepeatedly(Return(kIncomingUniStreamId));
+  object_stream->OnObjectMessage(object, payload, false);
+  object_stream->OnObjectMessage(object, payload, true);  // complete the object
+}
+
+TEST_F(MoqtSessionTest, IncomingObjectUnknownTrackId) {
+  MockRemoteTrackVisitor visitor_;
+  FullTrackName ftn("foo", "bar");
+  std::string payload = "deadbeef";
+  MoqtSessionPeer::CreateRemoteTrack(&session_, ftn, &visitor_);
+  MoqtObject object = {
+      /*track_id=*/0,
+      /*group_sequence=*/0,
+      /*object_sequence=*/0,
+      /*object_send_order=*/0,
+      /*payload_length=*/8,
+  };
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> object_stream =
+      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
+
+  EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _)).Times(0);
+  EXPECT_CALL(mock_stream, GetStreamId())
+      .WillRepeatedly(Return(kIncomingUniStreamId));
+  object_stream->OnObjectMessage(object, payload, true);
+  // Packet should be buffered.
+
+  // SUBSCRIBE_OK arrives
+  MoqtSubscribeOk ok = {
+      /*track_namespace=*/ftn.track_namespace,
+      /*track_name=*/ftn.track_name,
+      /*track_id=*/0,
+      /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
+  };
+  StrictMock<webtransport::test::MockStream> mock_control_stream;
+  std::unique_ptr<MoqtParserVisitor> control_stream =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_control_stream);
+  EXPECT_CALL(visitor_, OnReply(_, _)).Times(1);
+  EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _)).Times(1);
+  control_stream->OnSubscribeOkMessage(ok);
+}
+
+// TODO: Cover the error cases in the above
+
+}  // namespace test
+
+}  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h
new file mode 100644
index 0000000..4ed173c
--- /dev/null
+++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -0,0 +1,69 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_SUBSCRIBE_WINDOWS_H
+#define QUICHE_QUIC_MOQT_SUBSCRIBE_WINDOWS_H
+
+#include <cstdint>
+#include <list>
+#include <optional>
+
+#include "quiche/quic/moqt/moqt_messages.h"
+
+namespace moqt {
+
+struct SubscribeWindow {
+  FullSequence start;
+  std::optional<FullSequence> end;
+  // Creates a half-open window.
+  SubscribeWindow(uint64_t start_group, uint64_t start_object) {
+    start = {start_group, start_object};
+    end = std::nullopt;
+  }
+  // Creates a closed window.
+  SubscribeWindow(uint64_t start_group, uint64_t start_object,
+                  uint64_t end_group, uint64_t end_object) {
+    start = {start_group, start_object};
+    end = {end_group, end_object};
+  }
+  bool InWindow(const FullSequence& seq) const {
+    if (seq < start) {
+      return false;
+    }
+    if (!end.has_value() || seq < end.value()) {
+      return true;
+    }
+    return false;
+  }
+};
+
+// Class to keep track of the sequence number blocks to which a peer is
+// subscribed.
+class MoqtSubscribeWindows {
+ public:
+  MoqtSubscribeWindows() {}
+
+  bool SequenceIsSubscribed(uint64_t group, uint64_t object) const {
+    FullSequence seq(group, object);
+    for (auto it : windows) {
+      if (it.InWindow(seq)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  // |window| has already been converted into absolute sequence numbers. An
+  // optimization could consolidate overlapping subscribe windows.
+  void AddWindow(SubscribeWindow window) { windows.push_front(window); }
+
+  bool IsEmpty() const { return windows.empty(); }
+
+ private:
+  std::list<SubscribeWindow> windows;
+};
+
+}  // namespace moqt
+
+#endif  // QUICHE_QUIC_MOQT_SUBSCRIBE_WINDOWS_H
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
new file mode 100644
index 0000000..b1b1bd0
--- /dev/null
+++ b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
@@ -0,0 +1,40 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+
+#include "quiche/quic/platform/api/quic_test.h"
+
+namespace moqt {
+
+namespace test {
+
+class MoqtSubscribeWindowsTest : public quic::test::QuicTest {
+ public:
+  MoqtSubscribeWindows windows_;
+};
+
+TEST_F(MoqtSubscribeWindowsTest, IsEmpty) {
+  EXPECT_TRUE(windows_.IsEmpty());
+  windows_.AddWindow(SubscribeWindow(1, 3));
+  EXPECT_FALSE(windows_.IsEmpty());
+}
+
+TEST_F(MoqtSubscribeWindowsTest, IsSubscribed) {
+  EXPECT_TRUE(windows_.IsEmpty());
+  // The first two windows overlap; the third is open-ended.
+  windows_.AddWindow(SubscribeWindow(1, 0, 3, 9));
+  windows_.AddWindow(SubscribeWindow(2, 4, 4, 3));
+  windows_.AddWindow(SubscribeWindow(10, 0));
+  EXPECT_FALSE(windows_.IsEmpty());
+  EXPECT_FALSE(windows_.SequenceIsSubscribed(0, 8));
+  EXPECT_TRUE(windows_.SequenceIsSubscribed(1, 0));
+  EXPECT_FALSE(windows_.SequenceIsSubscribed(4, 4));
+  EXPECT_FALSE(windows_.SequenceIsSubscribed(8, 3));
+  EXPECT_TRUE(windows_.SequenceIsSubscribed(100, 7));
+}
+
+}  // namespace test
+
+}  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
new file mode 100644
index 0000000..48ba7bc
--- /dev/null
+++ b/quiche/quic/moqt/moqt_track.h
@@ -0,0 +1,116 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_SUBSCRIPTION_H_
+#define QUICHE_QUIC_MOQT_MOQT_SUBSCRIPTION_H_
+
+#include <cstdint>
+#include <optional>
+
+#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+
+namespace moqt {
+
+// A track to which the peer might subscribe.
+class LocalTrack {
+ public:
+  class Visitor {
+   public:
+    virtual ~Visitor() = default;
+
+    // Requests that application re-publish objects from {start_group,
+    // start_object} to the latest object. If the return value is nullopt, the
+    // subscribe is valid and the application will deliver the object and
+    // the session will send SUBSCRIBE_OK. If the return has a value, the value
+    // is the error message (the session will send SUBSCRIBE_ERROR). Via this
+    // API, the application decides if a partially fulfillable
+    // SUBSCRIBE_REQUEST results in an error or not.
+    virtual std::optional<absl::string_view> OnSubscribeRequestForPast(
+        const SubscribeWindow& window) = 0;
+  };
+  // |visitor| must not be nullptr.
+  LocalTrack(const FullTrackName& full_track_name, uint64_t track_alias,
+             Visitor* visitor)
+      : full_track_name_(full_track_name),
+        track_alias_(track_alias),
+        visitor_(visitor) {}
+  // Creates a LocalTrack that does not start at sequence (0,0)
+  LocalTrack(const FullTrackName& full_track_name, uint64_t track_alias,
+             Visitor* visitor, FullSequence next_sequence)
+      : full_track_name_(full_track_name),
+        track_alias_(track_alias),
+        next_sequence_(next_sequence),
+        visitor_(visitor) {}
+
+  const FullTrackName& full_track_name() const { return full_track_name_; }
+
+  uint64_t track_alias() const { return track_alias_; }
+
+  Visitor* visitor() { return visitor_; }
+
+  bool ShouldSend(uint64_t group, uint64_t object) const {
+    return windows_.SequenceIsSubscribed(group, object);
+  }
+
+  void AddWindow(SubscribeWindow window) { windows_.AddWindow(window); }
+
+  // Returns the largest observed sequence, but increments the object sequence
+  // by one.
+  const FullSequence& next_sequence() const { return next_sequence_; }
+
+  bool HasSubscriber() const { return !windows_.IsEmpty(); }
+
+ private:
+  // This only needs to track subscriptions to current and future objects;
+  // requests for objects in the past are forwarded to the application.
+  const FullTrackName full_track_name_;
+  const uint64_t track_alias_;
+  // The sequence numbers from this track to which the peer is subscribed.
+  MoqtSubscribeWindows windows_;
+  FullSequence next_sequence_ = {0, 0};
+  Visitor* visitor_;
+};
+
+// A track on the peer to which the session has subscribed.
+class RemoteTrack {
+ public:
+  class Visitor {
+   public:
+    virtual ~Visitor() = default;
+    // Called when the session receives a response to the SUBSCRIBE_REQUEST.
+    virtual void OnReply(
+        const FullTrackName& full_track_name,
+        std::optional<absl::string_view> error_reason_phrase) = 0;
+    virtual void OnObjectFragment(const FullTrackName& full_track_name,
+                                  uint32_t stream_id, uint64_t group_sequence,
+                                  uint64_t object_sequence,
+                                  uint64_t object_send_order,
+                                  absl::string_view object,
+                                  bool end_of_message) = 0;
+    // TODO(martinduke): Add final sequence numbers
+  };
+  RemoteTrack(const FullTrackName& full_track_name, Visitor* visitor)
+      : full_track_name_(full_track_name), visitor_(visitor) {}
+
+  const FullTrackName& full_track_name() { return full_track_name_; }
+
+  std::optional<uint64_t> track_alias() const { return track_alias_; }
+
+  Visitor* visitor() { return visitor_; }
+
+  void set_track_alias(uint64_t track_alias) { track_alias_ = track_alias; }
+
+ private:
+  // TODO: There is no accounting for the number of outstanding subscribes,
+  // because we can't match track names to individual subscribes.
+  FullTrackName full_track_name_;
+  std::optional<uint64_t> track_alias_;
+  Visitor* visitor_;
+};
+
+}  // namespace moqt
+
+#endif  // QUICHE_QUIC_MOQT_MOQT_SUBSCRIPTION_H_
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
new file mode 100644
index 0000000..f9fb0b4
--- /dev/null
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -0,0 +1,66 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_track.h"
+
+#include <cstdint>
+#include <optional>
+
+#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
+#include "quiche/quic/platform/api/quic_test.h"
+
+namespace moqt {
+
+namespace test {
+
+class LocalTrackTest : public quic::test::QuicTest {
+ public:
+  LocalTrackTest()
+      : track_(FullTrackName("foo", "bar"), /*track_alias=*/5, &visitor_,
+               FullSequence(4, 1)) {}
+  LocalTrack track_;
+  MockLocalTrackVisitor visitor_;
+};
+
+TEST_F(LocalTrackTest, Queries) {
+  EXPECT_EQ(track_.full_track_name(), FullTrackName("foo", "bar"));
+  EXPECT_EQ(track_.track_alias(), 5);
+  EXPECT_EQ(track_.visitor(), &visitor_);
+  EXPECT_EQ(track_.next_sequence(), FullSequence(4, 1));
+  EXPECT_FALSE(track_.HasSubscriber());
+}
+
+TEST_F(LocalTrackTest, AfterSubscribe) {
+  track_.AddWindow(SubscribeWindow(4, 1));
+  EXPECT_TRUE(track_.HasSubscriber());
+  EXPECT_FALSE(track_.ShouldSend(3, 12));
+  EXPECT_FALSE(track_.ShouldSend(4, 0));
+  EXPECT_TRUE(track_.ShouldSend(4, 1));
+  EXPECT_TRUE(track_.ShouldSend(12, 0));
+}
+
+class RemoteTrackTest : public quic::test::QuicTest {
+ public:
+  RemoteTrackTest() : track_(FullTrackName("foo", "bar"), &visitor_) {}
+  RemoteTrack track_;
+  MockRemoteTrackVisitor visitor_;
+};
+
+TEST_F(RemoteTrackTest, Queries) {
+  EXPECT_EQ(track_.full_track_name(), FullTrackName("foo", "bar"));
+  EXPECT_EQ(track_.track_alias(), std::nullopt);
+  EXPECT_EQ(track_.visitor(), &visitor_);
+}
+
+TEST_F(RemoteTrackTest, SetAlias) {
+  track_.set_track_alias(5);
+  EXPECT_EQ(track_.track_alias(), 5);
+}
+
+}  // namespace test
+
+}  // namespace moqt
diff --git a/quiche/quic/moqt/tools/chat_client_bin.cc b/quiche/quic/moqt/tools/chat_client_bin.cc
new file mode 100644
index 0000000..87b9af9
--- /dev/null
+++ b/quiche/quic/moqt/tools/chat_client_bin.cc
@@ -0,0 +1,366 @@
+// Copyright (c) 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <cstdint>
+#include <iostream>
+#include <memory>
+#include <optional>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "absl/container/flat_hash_map.h"
+#include "absl/strings/str_cat.h"
+#include "absl/strings/string_view.h"
+#include "quiche/quic/core/crypto/proof_verifier.h"
+#include "quiche/quic/core/io/quic_default_event_loop.h"
+#include "quiche/quic/core/io/quic_event_loop.h"
+#include "quiche/quic/core/quic_default_clock.h"
+#include "quiche/quic/core/quic_server_id.h"
+#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/quic/moqt/tools/moqt_client.h"
+#include "quiche/quic/platform/api/quic_default_proof_providers.h"
+#include "quiche/quic/platform/api/quic_socket_address.h"
+#include "quiche/quic/platform/api/quic_thread.h"
+#include "quiche/quic/tools/fake_proof_verifier.h"
+#include "quiche/quic/tools/quic_name_lookup.h"
+#include "quiche/quic/tools/quic_url.h"
+#include "quiche/common/platform/api/quiche_command_line_flags.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/quiche_circular_deque.h"
+
+DEFINE_QUICHE_COMMAND_LINE_FLAG(
+    bool, disable_certificate_verification, false,
+    "If true, don't verify the server certificate.");
+
+class ChatClient {
+ public:
+  ChatClient(quic::QuicServerId& server_id, std::string path,
+             std::string username, std::string chat_id)
+      : chat_id_(chat_id),
+        username_(username),
+        my_track_name_(UsernameToTrackName(username)),
+        catalog_name_("moq-chat/" + chat_id, "/catalog") {
+    quic::QuicDefaultClock* clock = quic::QuicDefaultClock::Get();
+    std::cout << "Connecting to host " << server_id.host() << " port "
+              << server_id.port() << " path " << path << "\n";
+    event_loop_ = quic::GetDefaultEventLoop()->Create(clock);
+    quic::QuicSocketAddress peer_address =
+        quic::tools::LookupAddress(AF_UNSPEC, server_id);
+    std::unique_ptr<quic::ProofVerifier> verifier;
+    const bool ignore_certificate = quiche::GetQuicheCommandLineFlag(
+        FLAGS_disable_certificate_verification);
+    if (ignore_certificate) {
+      verifier = std::make_unique<quic::FakeProofVerifier>();
+    } else {
+      verifier = quic::CreateDefaultProofVerifier(server_id.host());
+    }
+    client_ = std::make_unique<moqt::MoqtClient>(
+        peer_address, server_id, std::move(verifier), event_loop_.get());
+    session_callbacks_.session_established_callback = [this]() {
+      std::cout << "Session established\n";
+      session_is_open_ = true;
+    };
+    session_callbacks_.session_terminated_callback =
+        [this](absl::string_view error_message) {
+          std::cerr << "Closed session, reason = " << error_message << "\n";
+          session_is_open_ = false;
+        };
+    session_callbacks_.session_deleted_callback = [this]() {
+      session_ = nullptr;
+    };
+    client_->Connect(path, std::move(session_callbacks_));
+  }
+
+  bool session_is_open() const { return session_is_open_; }
+  bool is_syncing() const {
+    return catalog_group_.has_value() || subscribes_to_make_ > 0 ||
+           !session_->HasSubscribers(my_track_name_);
+  }
+
+  void RunEventLoop() {
+    event_loop_->RunEventLoopOnce(quic::QuicTime::Delta::FromSeconds(5));
+  }
+
+  class QUICHE_EXPORT RemoteTrackVisitor : public moqt::RemoteTrack::Visitor {
+   public:
+    RemoteTrackVisitor(ChatClient* client) : client_(client) {}
+
+    void OnReply(const moqt::FullTrackName& full_track_name,
+                 std::optional<absl::string_view> reason_phrase) override {
+      client_->subscribes_to_make_--;
+      if (full_track_name == client_->catalog_name_) {
+        std::cout << "Subscription to catalog ";
+      } else {
+        std::cout << "Subscription to user " << full_track_name.track_namespace
+                  << " ";
+      }
+      if (reason_phrase.has_value()) {
+        std::cout << "REJECTED, reason = " << reason_phrase.value() << "\n";
+      } else {
+        std::cout << "ACCEPTED\n";
+      }
+    }
+
+    void OnObjectFragment(const moqt::FullTrackName& full_track_name,
+                          uint32_t /*stream_id*/, uint64_t group_sequence,
+                          uint64_t object_sequence,
+                          uint64_t /*object_send_order*/,
+                          absl::string_view object,
+                          bool end_of_message) override {
+      if (!end_of_message) {
+        std::cerr << "Error: received partial message despite requesting "
+                     "buffering\n";
+      }
+      if (full_track_name == client_->catalog_name_) {
+        if (group_sequence < client_->catalog_group_) {
+          std::cout << "Ignoring old catalog";
+          return;
+        }
+        client_->ProcessCatalog(object, this, group_sequence, object_sequence);
+        return;
+      }
+      // TODO: Message is from another chat participant
+    }
+
+   private:
+    ChatClient* client_;
+  };
+
+  // returns false on error
+  bool AnnounceAndSubscribe() {
+    session_ = client_->session();
+    if (session_ == nullptr) {
+      std::cout << "Failed to connect.\n";
+      return false;
+    }
+    // By not sending a visitor, the application will not fulfill subscriptions
+    // to previous objects.
+    session_->AddLocalTrack(my_track_name_, nullptr);
+    moqt::MoqtAnnounceCallback announce_callback =
+        [&](absl::string_view track_namespace,
+            std::optional<absl::string_view> message) {
+          if (message.has_value()) {
+            std::cout << "ANNOUNCE rejected, " << message.value() << "\n";
+            session_->Error("Local ANNOUNCE rejected");
+            return;
+          }
+          std::cout << "ANNOUNCE for " << track_namespace << " accepted\n";
+          return;
+        };
+    std::cout << "Announcing " << my_track_name_.track_namespace << "\n";
+    session_->Announce(my_track_name_.track_namespace,
+                       std::move(announce_callback));
+    remote_track_visitor_ = std::make_unique<RemoteTrackVisitor>(this);
+    if (!session_->SubscribeCurrentGroup(
+            catalog_name_.track_namespace, catalog_name_.track_name,
+            remote_track_visitor_.get(), username_)) {
+      std::cout << "Failed to get catalog for " << chat_id_ << "\n";
+      return false;
+    }
+    return true;
+  }
+
+  class InputHandler : quic::QuicThread {
+   public:
+    explicit InputHandler(ChatClient* client)
+        : quic::QuicThread("InputThread"), client_(client) {}
+
+    void Run() final {
+      while (client_->session_is_open_) {
+        std::string message_to_send;
+        std::cin >> message_to_send;  // Waiting to start input
+        client_->entering_data_ = true;
+        std::cout << "> ";
+        std::cin >> message_to_send;
+        client_->entering_data_ = false;
+        while (!client_->incoming_messages_.empty()) {
+          std::cout << client_->incoming_messages_.front() << "\n";
+          client_->incoming_messages_.pop_front();
+        }
+        if (message_to_send.empty()) {
+          continue;
+        }
+        if (message_to_send == ":exit") {
+          std::cout << "Exiting the app.\n";
+          // TODO: Close the session.
+          client_->session_is_open_ = false;
+          break;
+        }
+        // TODO: Send the message
+        std::cout << client_->username_ << ": " << message_to_send << "\n";
+      }
+    }
+
+   private:
+    ChatClient* client_;
+  };
+
+ private:
+  moqt::FullTrackName UsernameToTrackName(absl::string_view username) {
+    return moqt::FullTrackName(
+        absl::StrCat("moq-chat/", chat_id_, "/participant/", username), "");
+  }
+
+  // Objects from the same catalog group arrive on the same stream, and in
+  // object sequence order.
+  void ProcessCatalog(absl::string_view object,
+                      moqt::RemoteTrack::Visitor* visitor,
+                      uint64_t group_sequence, uint64_t object_sequence) {
+    std::string message(object);
+    std::istringstream f(message);
+    std::string line;
+    bool got_version = true;
+    if (object_sequence == 0) {
+      std::cout << "Received new Catalog. Users:\n";
+      got_version = false;
+    }
+    while (std::getline(f, line)) {
+      if (!got_version) {
+        // Chat server currently does not send version
+        if (line != "version=1") {
+          session_->Error("Catalog does not begin with version");
+          return;
+        }
+        got_version = true;
+        continue;
+      }
+      if (line.empty()) {
+        continue;
+      }
+      std::string user;
+      bool add = true;
+      if (object_sequence > 0) {
+        switch (line[0]) {
+          case '-':
+            add = false;
+            break;
+          case '+':
+            break;
+          default:
+            std::cerr << "Catalog update with neither + nor -\n";
+            return;
+        }
+        user = line.substr(1, line.size() - 1);
+      } else {
+        user = line;
+      }
+      if (username_ == user) {
+        std::cout << user << "\n";
+        continue;
+      }
+      if (!add) {
+        // TODO: Unsubscribe from the user that's leaving
+        std::cout << user << "left the chat\n";
+        other_users_.erase(user);
+        continue;
+      }
+      if (object_sequence == 0) {
+        std::cout << user << "\n";
+      } else {
+        std::cout << user << "joined the chat\n";
+      }
+      auto it = other_users_.find(user);
+      if (it == other_users_.end()) {
+        moqt::FullTrackName to_subscribe = UsernameToTrackName(user);
+        auto new_user = other_users_.emplace(
+            std::make_pair(user, ChatUser(to_subscribe, group_sequence)));
+        ChatUser& user_record = new_user.first->second;
+        session_->SubscribeRelative(user_record.full_track_name.track_namespace,
+                                    user_record.full_track_name.track_name, 0,
+                                    0, visitor);
+        subscribes_to_make_++;
+      } else {
+        if (it->second.from_group == group_sequence) {
+          session_->Error("User listed twice in Catalog");
+          return;
+        }
+        it->second.from_group = group_sequence;
+      }
+    }
+    if (object_sequence == 0) {  // Eliminate users that are no longer present
+      for (const auto& it : other_users_) {
+        if (it.second.from_group != group_sequence) {
+          other_users_.erase(it.first);
+        }
+      }
+    }
+    catalog_group_ = group_sequence;
+  }
+
+  struct ChatUser {
+    moqt::FullTrackName full_track_name;
+    uint64_t from_group;
+    ChatUser(moqt::FullTrackName& ftn, uint64_t group)
+        : full_track_name(ftn), from_group(group) {}
+  };
+
+  // Basic session information
+  std::string chat_id_;
+  std::string username_;
+  moqt::FullTrackName my_track_name_;
+
+  // General state variables
+  std::unique_ptr<quic::QuicEventLoop> event_loop_;
+  bool session_is_open_ = false;
+  moqt::MoqtSession* session_ = nullptr;
+  std::unique_ptr<moqt::MoqtClient> client_;
+  moqt::MoqtSessionCallbacks session_callbacks_;
+
+  // Related to syncing.
+  std::optional<uint64_t> catalog_group_;
+  moqt::FullTrackName catalog_name_;
+  absl::flat_hash_map<std::string, ChatUser> other_users_;
+  int subscribes_to_make_ = 1;
+
+  // Related to subscriptions/announces
+  // TODO: One for each subscribe
+  std::unique_ptr<RemoteTrackVisitor> remote_track_visitor_;
+
+  // Handling incoming and outgoing messages
+  quiche::QuicheCircularDeque<std::string> incoming_messages_;
+  bool entering_data_ = false;
+};
+
+// A client for MoQT over chat, used for interop testing. See
+// https://afrind.github.io/draft-frindell-moq-chat/draft-frindell-moq-chat.html
+int main(int argc, char* argv[]) {
+  const char* usage = "Usage: chat_client [options] <url> <username> <chat-id>";
+  std::vector<std::string> args =
+      quiche::QuicheParseCommandLineFlags(usage, argc, argv);
+  if (args.size() != 3) {
+    quiche::QuichePrintCommandLineFlagHelp(usage);
+    return 1;
+  }
+  quic::QuicUrl url(args[0], "https");
+  quic::QuicServerId server_id(url.host(), url.port());
+  std::string path = url.PathParamsQuery();
+  std::string username = args[1];
+  std::string chat_id = args[2];
+  ChatClient client(server_id, path, username, chat_id);
+
+  while (!client.session_is_open()) {
+    client.RunEventLoop();
+  }
+
+  if (!client.AnnounceAndSubscribe()) {
+    return 1;
+  }
+  while (client.is_syncing()) {
+    client.RunEventLoop();
+  }
+  if (client.session_is_open()) {
+    std::cout << "Fully connected. Press ENTER to begin input of message, "
+              << "ENTER when done.\n";
+  }
+  ChatClient::InputHandler input_thread(&client);
+  while (client.session_is_open()) {
+    client.RunEventLoop();
+  }
+  return 0;
+}
diff --git a/quiche/quic/moqt/tools/moqt_client.cc b/quiche/quic/moqt/tools/moqt_client.cc
index 5cbc578..07c088e 100644
--- a/quiche/quic/moqt/tools/moqt_client.cc
+++ b/quiche/quic/moqt/tools/moqt_client.cc
@@ -93,6 +93,7 @@
   parameters.perspective = quic::Perspective::IS_CLIENT,
   parameters.using_webtrans = true;
   parameters.path = "";
+  parameters.deliver_partial_objects = false;
 
   // Ensure that we never have a dangling pointer to the session.
   MoqtSessionDeletedCallback deleted_callback =
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h
new file mode 100644
index 0000000..9248114
--- /dev/null
+++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -0,0 +1,54 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_TOOLS_MOQT_MOCK_VISITOR_H_
+#define QUICHE_QUIC_MOQT_TOOLS_MOQT_MOCK_VISITOR_H_
+
+#include <cstdint>
+#include <optional>
+
+#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/quic/platform/api/quic_test.h"
+
+namespace moqt::test {
+
+struct MockSessionCallbacks {
+  testing::MockFunction<void()> session_established_callback;
+  testing::MockFunction<void(absl::string_view)> session_terminated_callback;
+  testing::MockFunction<void()> session_deleted_callback;
+
+  MoqtSessionCallbacks AsSessionCallbacks() {
+    return MoqtSessionCallbacks{session_established_callback.AsStdFunction(),
+                                session_terminated_callback.AsStdFunction(),
+                                session_deleted_callback.AsStdFunction()};
+  }
+};
+
+class MockLocalTrackVisitor : public LocalTrack::Visitor {
+ public:
+  MOCK_METHOD(std::optional<absl::string_view>, OnSubscribeRequestForPast,
+              (const SubscribeWindow& window), (override));
+};
+
+class MockRemoteTrackVisitor : public RemoteTrack::Visitor {
+ public:
+  MOCK_METHOD(void, OnReply,
+              (const FullTrackName& full_track_name,
+               std::optional<absl::string_view> error_reason_phrase),
+              (override));
+  MOCK_METHOD(void, OnObjectFragment,
+              (const FullTrackName& full_track_name, uint32_t stream_id,
+               uint64_t group_sequence, uint64_t object_sequence,
+               uint64_t object_send_order, absl::string_view object,
+               bool end_of_message),
+              (override));
+};
+
+}  // namespace moqt::test
+
+#endif  // QUICHE_QUIC_MOQT_TOOLS_MOQT_MOCK_VISITOR_H_
diff --git a/quiche/quic/moqt/tools/moqt_server.cc b/quiche/quic/moqt/tools/moqt_server.cc
index c2ec5fb..e811c6d 100644
--- a/quiche/quic/moqt/tools/moqt_server.cc
+++ b/quiche/quic/moqt/tools/moqt_server.cc
@@ -34,6 +34,7 @@
     parameters.path = path;
     parameters.using_webtrans = true;
     parameters.version = MoqtVersion::kDraft01;
+    parameters.deliver_partial_objects = false;
     return std::make_unique<MoqtSession>(session, parameters,
                                          *std::move(callbacks));
   };