Refactor MoQT sending logic to pull objects from the application, instead of pushing them onto individual MoQT sessions.

This CL introduces a new interface, a MoqtTrackPublisher.  A track publisher knows everything necessary to publish a track, caches recent objects and may be shared across multiple MoQT sessions.  Objects are always pulled from the publisher.

This CL also restructures the sending logic around every object being published within a single subscription (there has been a lot of code from the era where we deduplicated objects; that is now gone).  The publisher notifies the subscription, which in turn notifies the relevant stream within the session.

There is also a lot of other minor refactors as necessary.

The current version of this CL does not support publishing objects far in the past, largely due to design ambiguities surrounding how fetch is supposed to work (I am writing a slide deck for Vancouver on that).  Instead, we merely backfill the recent objects that are still in the cache; that covers most of the use cases we care about for now.

PiperOrigin-RevId: 653833971
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 34b592f..386d907 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1508,11 +1508,14 @@
     "quic/load_balancer/load_balancer_server_id_test.cc",
 ]
 moqt_hdrs = [
+    "quic/moqt/moqt_cached_object.h",
     "quic/moqt/moqt_framer.h",
+    "quic/moqt/moqt_known_track_publisher.h",
     "quic/moqt/moqt_messages.h",
     "quic/moqt/moqt_outgoing_queue.h",
     "quic/moqt/moqt_parser.h",
     "quic/moqt/moqt_priority.h",
+    "quic/moqt/moqt_publisher.h",
     "quic/moqt/moqt_session.h",
     "quic/moqt/moqt_subscribe_windows.h",
     "quic/moqt/moqt_track.h",
@@ -1523,9 +1526,11 @@
     "quic/moqt/tools/moqt_server.h",
 ]
 moqt_srcs = [
+    "quic/moqt/moqt_cached_object.cc",
     "quic/moqt/moqt_framer.cc",
     "quic/moqt/moqt_framer_test.cc",
     "quic/moqt/moqt_integration_test.cc",
+    "quic/moqt/moqt_known_track_publisher.cc",
     "quic/moqt/moqt_messages.cc",
     "quic/moqt/moqt_outgoing_queue.cc",
     "quic/moqt/moqt_outgoing_queue_test.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index 23437ab..2004269 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1512,11 +1512,14 @@
     "src/quiche/quic/load_balancer/load_balancer_server_id_test.cc",
 ]
 moqt_hdrs = [
+    "src/quiche/quic/moqt/moqt_cached_object.h",
     "src/quiche/quic/moqt/moqt_framer.h",
+    "src/quiche/quic/moqt/moqt_known_track_publisher.h",
     "src/quiche/quic/moqt/moqt_messages.h",
     "src/quiche/quic/moqt/moqt_outgoing_queue.h",
     "src/quiche/quic/moqt/moqt_parser.h",
     "src/quiche/quic/moqt/moqt_priority.h",
+    "src/quiche/quic/moqt/moqt_publisher.h",
     "src/quiche/quic/moqt/moqt_session.h",
     "src/quiche/quic/moqt/moqt_subscribe_windows.h",
     "src/quiche/quic/moqt/moqt_track.h",
@@ -1527,9 +1530,11 @@
     "src/quiche/quic/moqt/tools/moqt_server.h",
 ]
 moqt_srcs = [
+    "src/quiche/quic/moqt/moqt_cached_object.cc",
     "src/quiche/quic/moqt/moqt_framer.cc",
     "src/quiche/quic/moqt/moqt_framer_test.cc",
     "src/quiche/quic/moqt/moqt_integration_test.cc",
+    "src/quiche/quic/moqt/moqt_known_track_publisher.cc",
     "src/quiche/quic/moqt/moqt_messages.cc",
     "src/quiche/quic/moqt/moqt_outgoing_queue.cc",
     "src/quiche/quic/moqt/moqt_outgoing_queue_test.cc",
diff --git a/build/source_list.json b/build/source_list.json
index 1b9a333..7e6cd21 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1511,11 +1511,14 @@
     "quiche/quic/load_balancer/load_balancer_server_id_test.cc"
   ],
   "moqt_hdrs": [
+    "quiche/quic/moqt/moqt_cached_object.h",
     "quiche/quic/moqt/moqt_framer.h",
+    "quiche/quic/moqt/moqt_known_track_publisher.h",
     "quiche/quic/moqt/moqt_messages.h",
     "quiche/quic/moqt/moqt_outgoing_queue.h",
     "quiche/quic/moqt/moqt_parser.h",
     "quiche/quic/moqt/moqt_priority.h",
+    "quiche/quic/moqt/moqt_publisher.h",
     "quiche/quic/moqt/moqt_session.h",
     "quiche/quic/moqt/moqt_subscribe_windows.h",
     "quiche/quic/moqt/moqt_track.h",
@@ -1526,9 +1529,11 @@
     "quiche/quic/moqt/tools/moqt_server.h"
   ],
   "moqt_srcs": [
+    "quiche/quic/moqt/moqt_cached_object.cc",
     "quiche/quic/moqt/moqt_framer.cc",
     "quiche/quic/moqt/moqt_framer_test.cc",
     "quiche/quic/moqt/moqt_integration_test.cc",
+    "quiche/quic/moqt/moqt_known_track_publisher.cc",
     "quiche/quic/moqt/moqt_messages.cc",
     "quiche/quic/moqt/moqt_outgoing_queue.cc",
     "quiche/quic/moqt/moqt_outgoing_queue_test.cc",
diff --git a/quiche/quic/moqt/moqt_cached_object.cc b/quiche/quic/moqt/moqt_cached_object.cc
new file mode 100644
index 0000000..395875b
--- /dev/null
+++ b/quiche/quic/moqt/moqt_cached_object.cc
@@ -0,0 +1,25 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_cached_object.h"
+
+#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+
+namespace moqt {
+
+moqt::PublishedObject CachedObjectToPublishedObject(
+    const CachedObject& object) {
+  PublishedObject result;
+  result.sequence = object.sequence;
+  result.status = object.status;
+  if (object.payload != nullptr && !object.payload->empty()) {
+    result.payload = quiche::QuicheMemSlice(
+        object.payload->data(), object.payload->length(),
+        [retained_pointer = object.payload](const char*) {});
+  }
+  return result;
+}
+
+}  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_cached_object.h b/quiche/quic/moqt/moqt_cached_object.h
new file mode 100644
index 0000000..c04b31e
--- /dev/null
+++ b/quiche/quic/moqt/moqt_cached_object.h
@@ -0,0 +1,29 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_CACHED_OBJECT_H_
+#define QUICHE_QUIC_MOQT_MOQT_CACHED_OBJECT_H_
+
+#include <memory>
+
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+
+namespace moqt {
+
+// CachedObject is a version of PublishedObject with a reference counted
+// payload.
+struct CachedObject {
+  FullSequence sequence;
+  MoqtObjectStatus status;
+  std::shared_ptr<quiche::QuicheMemSlice> payload;
+};
+
+// Transforms a CachedObject into a PublishedObject.
+PublishedObject CachedObjectToPublishedObject(const CachedObject& object);
+
+}  // namespace moqt
+
+#endif  // QUICHE_QUIC_MOQT_MOQT_CACHED_OBJECT_H_
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 14b7dc2..66df451 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -7,8 +7,10 @@
 #include <optional>
 #include <string>
 
+#include "absl/strings/str_cat.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/quic_generic_session.h"
+#include "quiche/quic/moqt/moqt_known_track_publisher.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_outgoing_queue.h"
 #include "quiche/quic/moqt/moqt_session.h"
@@ -23,7 +25,6 @@
 
 namespace {
 
-using ::quic::simulator::Simulator;
 using ::quic::test::MemSliceFromString;
 using ::testing::_;
 using ::testing::Assign;
@@ -176,10 +177,13 @@
         return std::optional<MoqtAnnounceErrorReason>();
       });
 
-  MoqtOutgoingQueue queue(client_->session(), FullTrackName{"test", "data"});
-  client_->session()->AddLocalTrack(FullTrackName{"test", "data"},
-                                    MoqtForwardingPreference::kGroup, &queue);
-  queue.AddObject(MemSliceFromString("object data"), /*key=*/true);
+  auto queue = std::make_shared<MoqtOutgoingQueue>(
+      client_->session(), FullTrackName{"test", "data"},
+      MoqtForwardingPreference::kGroup);
+  MoqtKnownTrackPublisher known_track_publisher;
+  known_track_publisher.Add(queue);
+  client_->session()->set_publisher(&known_track_publisher);
+  queue->AddObject(MemSliceFromString("object data"), /*key=*/true);
   bool received_subscribe_ok = false;
   EXPECT_CALL(server_visitor, OnReply(_, _)).WillOnce([&]() {
     received_subscribe_ok = true;
@@ -210,6 +214,119 @@
   EXPECT_TRUE(success);
 }
 
+TEST_F(MoqtIntegrationTest, SendMultipleGroups) {
+  EstablishSession();
+  MoqtKnownTrackPublisher publisher;
+  server_->session()->set_publisher(&publisher);
+
+  for (MoqtForwardingPreference forwarding_preference :
+       {MoqtForwardingPreference::kTrack, MoqtForwardingPreference::kGroup,
+        MoqtForwardingPreference::kObject,
+        MoqtForwardingPreference::kDatagram}) {
+    SCOPED_TRACE(MoqtForwardingPreferenceToString(forwarding_preference));
+    MockRemoteTrackVisitor client_visitor;
+    std::string name =
+        absl::StrCat("pref_", static_cast<int>(forwarding_preference));
+    auto queue = std::make_shared<MoqtOutgoingQueue>(
+        client_->session(), FullTrackName{"test", name},
+        MoqtForwardingPreference::kObject);
+    publisher.Add(queue);
+    queue->AddObject(MemSliceFromString("object 1"), /*key=*/true);
+    queue->AddObject(MemSliceFromString("object 2"), /*key=*/false);
+    queue->AddObject(MemSliceFromString("object 3"), /*key=*/false);
+    queue->AddObject(MemSliceFromString("object 4"), /*key=*/true);
+    queue->AddObject(MemSliceFromString("object 5"), /*key=*/false);
+
+    client_->session()->SubscribeCurrentGroup("test", name, &client_visitor);
+    int received = 0;
+    EXPECT_CALL(client_visitor,
+                OnObjectFragment(_, 1, 0, _, MoqtObjectStatus::kNormal, _,
+                                 "object 4", true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor,
+                OnObjectFragment(_, 1, 1, _, MoqtObjectStatus::kNormal, _,
+                                 "object 5", true))
+        .WillOnce([&] { ++received; });
+    bool success = test_harness_.RunUntilWithDefaultTimeout(
+        [&]() { return received >= 2; });
+    EXPECT_TRUE(success);
+
+    queue->AddObject(MemSliceFromString("object 6"), /*key=*/false);
+    queue->AddObject(MemSliceFromString("object 7"), /*key=*/true);
+    queue->AddObject(MemSliceFromString("object 8"), /*key=*/false);
+    EXPECT_CALL(client_visitor,
+                OnObjectFragment(_, 1, 2, _, MoqtObjectStatus::kNormal, _,
+                                 "object 6", true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor,
+                OnObjectFragment(_, 1, 3, _, MoqtObjectStatus::kEndOfGroup, _,
+                                 "", true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor,
+                OnObjectFragment(_, 2, 0, _, MoqtObjectStatus::kNormal, _,
+                                 "object 7", true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor,
+                OnObjectFragment(_, 2, 1, _, MoqtObjectStatus::kNormal, _,
+                                 "object 8", true))
+        .WillOnce([&] { ++received; });
+    success = test_harness_.RunUntilWithDefaultTimeout(
+        [&]() { return received >= 6; });
+    EXPECT_TRUE(success);
+  }
+}
+
+TEST_F(MoqtIntegrationTest, FetchItemsFromPast) {
+  EstablishSession();
+  MoqtKnownTrackPublisher publisher;
+  server_->session()->set_publisher(&publisher);
+
+  for (MoqtForwardingPreference forwarding_preference :
+       {MoqtForwardingPreference::kTrack, MoqtForwardingPreference::kGroup,
+        MoqtForwardingPreference::kObject,
+        MoqtForwardingPreference::kDatagram}) {
+    SCOPED_TRACE(MoqtForwardingPreferenceToString(forwarding_preference));
+    MockRemoteTrackVisitor client_visitor;
+    std::string name =
+        absl::StrCat("pref_", static_cast<int>(forwarding_preference));
+    auto queue = std::make_shared<MoqtOutgoingQueue>(
+        client_->session(), FullTrackName{"test", name},
+        MoqtForwardingPreference::kObject);
+    publisher.Add(queue);
+    for (int i = 0; i < 100; ++i) {
+      queue->AddObject(MemSliceFromString("object"), /*key=*/true);
+    }
+
+    client_->session()->SubscribeAbsolute("test", name, 0, 0, &client_visitor);
+    int received = 0;
+    // Those won't arrive since they have expired.
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 0, 0, _, _, _, _, true))
+        .Times(0);
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 0, 0, _, _, _, _, true))
+        .Times(0);
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 96, 0, _, _, _, _, true))
+        .Times(0);
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 96, 0, _, _, _, _, true))
+        .Times(0);
+    // Those are within the "last three groups" window.
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 97, 0, _, _, _, _, true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 97, 1, _, _, _, _, true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 98, 0, _, _, _, _, true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 98, 1, _, _, _, _, true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 99, 0, _, _, _, _, true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 99, 1, _, _, _, _, true))
+        .Times(0);  // The current group should not be closed yet.
+    bool success = test_harness_.RunUntilWithDefaultTimeout(
+        [&]() { return received >= 5; });
+    EXPECT_TRUE(success);
+  }
+}
+
 TEST_F(MoqtIntegrationTest, AnnounceFailure) {
   EstablishSession();
   testing::MockFunction<void(
@@ -235,10 +352,13 @@
 TEST_F(MoqtIntegrationTest, SubscribeAbsoluteOk) {
   EstablishSession();
   FullTrackName full_track_name("foo", "bar");
-  MockLocalTrackVisitor server_visitor;
+
+  MoqtKnownTrackPublisher publisher;
+  server_->session()->set_publisher(&publisher);
+  auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
+  publisher.Add(track_publisher);
+
   MockRemoteTrackVisitor client_visitor;
-  server_->session()->AddLocalTrack(
-      full_track_name, MoqtForwardingPreference::kObject, &server_visitor);
   std::optional<absl::string_view> expected_reason = std::nullopt;
   bool received_ok = false;
   EXPECT_CALL(client_visitor, OnReply(full_track_name, expected_reason))
@@ -254,10 +374,13 @@
 TEST_F(MoqtIntegrationTest, SubscribeCurrentObjectOk) {
   EstablishSession();
   FullTrackName full_track_name("foo", "bar");
-  MockLocalTrackVisitor server_visitor;
+
+  MoqtKnownTrackPublisher publisher;
+  server_->session()->set_publisher(&publisher);
+  auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
+  publisher.Add(track_publisher);
+
   MockRemoteTrackVisitor client_visitor;
-  server_->session()->AddLocalTrack(
-      full_track_name, MoqtForwardingPreference::kObject, &server_visitor);
   std::optional<absl::string_view> expected_reason = std::nullopt;
   bool received_ok = false;
   EXPECT_CALL(client_visitor, OnReply(full_track_name, expected_reason))
@@ -273,10 +396,13 @@
 TEST_F(MoqtIntegrationTest, SubscribeCurrentGroupOk) {
   EstablishSession();
   FullTrackName full_track_name("foo", "bar");
-  MockLocalTrackVisitor server_visitor;
+
+  MoqtKnownTrackPublisher publisher;
+  server_->session()->set_publisher(&publisher);
+  auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
+  publisher.Add(track_publisher);
+
   MockRemoteTrackVisitor client_visitor;
-  server_->session()->AddLocalTrack(
-      full_track_name, MoqtForwardingPreference::kObject, &server_visitor);
   std::optional<absl::string_view> expected_reason = std::nullopt;
   bool received_ok = false;
   EXPECT_CALL(client_visitor, OnReply(full_track_name, expected_reason))
@@ -293,7 +419,7 @@
   EstablishSession();
   FullTrackName full_track_name("foo", "bar");
   MockRemoteTrackVisitor client_visitor;
-  std::optional<absl::string_view> expected_reason = "Track does not exist";
+  std::optional<absl::string_view> expected_reason = "No tracks published";
   bool received_ok = false;
   EXPECT_CALL(client_visitor, OnReply(full_track_name, expected_reason))
       .WillOnce([&]() { received_ok = true; });
diff --git a/quiche/quic/moqt/moqt_known_track_publisher.cc b/quiche/quic/moqt/moqt_known_track_publisher.cc
new file mode 100644
index 0000000..2fa4daf
--- /dev/null
+++ b/quiche/quic/moqt/moqt_known_track_publisher.cc
@@ -0,0 +1,34 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_known_track_publisher.h"
+
+#include <memory>
+
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/common/platform/api/quiche_bug_tracker.h"
+
+namespace moqt {
+
+absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>>
+MoqtKnownTrackPublisher::GetTrack(const FullTrackName& track_name) {
+  auto it = tracks_.find(track_name);
+  if (it == tracks_.end()) {
+    return absl::NotFoundError("Requested track not found");
+  }
+  return it->second;
+}
+
+void MoqtKnownTrackPublisher::Add(
+    std::shared_ptr<MoqtTrackPublisher> track_publisher) {
+  const FullTrackName& track_name = track_publisher->GetTrackName();
+  auto [it, success] = tracks_.emplace(track_name, track_publisher);
+  QUICHE_BUG_IF(MoqtKnownTrackPublisher_duplicate, !success)
+      << "Trying to add a duplicate track into a KnownTrackPublisher";
+}
+
+}  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_known_track_publisher.h b/quiche/quic/moqt/moqt_known_track_publisher.h
new file mode 100644
index 0000000..dd9316f
--- /dev/null
+++ b/quiche/quic/moqt/moqt_known_track_publisher.h
@@ -0,0 +1,38 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_KNOWN_TRACK_PUBLISHER_H_
+#define QUICHE_QUIC_MOQT_MOQT_KNOWN_TRACK_PUBLISHER_H_
+
+#include <memory>
+
+#include "absl/container/flat_hash_map.h"
+#include "absl/status/statusor.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
+
+namespace moqt {
+
+// MoqtKnownTrackPublisher is a publisher that supports publishing a set of
+// well-known predefined tracks.
+class MoqtKnownTrackPublisher : public MoqtPublisher {
+ public:
+  MoqtKnownTrackPublisher() = default;
+  MoqtKnownTrackPublisher(const MoqtKnownTrackPublisher&) = delete;
+  MoqtKnownTrackPublisher(MoqtKnownTrackPublisher&&) = delete;
+  MoqtKnownTrackPublisher& operator=(const MoqtKnownTrackPublisher&) = delete;
+  MoqtKnownTrackPublisher& operator=(MoqtKnownTrackPublisher&&) = delete;
+
+  absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> GetTrack(
+      const FullTrackName& track_name) override;
+  void Add(std::shared_ptr<MoqtTrackPublisher> track_publisher);
+
+ private:
+  absl::flat_hash_map<FullTrackName, std::shared_ptr<MoqtTrackPublisher>>
+      tracks_;
+};
+
+}  // namespace moqt
+
+#endif  // QUICHE_QUIC_MOQT_MOQT_KNOWN_TRACK_PUBLISHER_H_
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index c2a22cd..61015be 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -80,6 +80,13 @@
   kGoawayTimeout = 0x10,
 };
 
+// Error codes used by MoQT to reset streams.
+// TODO: update with spec-defined error codes once those are available, see
+// <https://github.com/moq-wg/moq-transport/issues/481>.
+inline constexpr uint64_t kResetCodeUnknown = 0x00;
+inline constexpr uint64_t kResetCodeSubscriptionGone = 0x01;
+inline constexpr uint64_t kResetCodeTimedOut = 0x02;
+
 enum class QUICHE_EXPORT MoqtRole : uint64_t {
   kPublisher = 0x1,
   kSubscriber = 0x2,
@@ -129,6 +136,12 @@
   }
   template <typename H>
   friend H AbslHashValue(H h, const FullTrackName& m);
+
+  template <typename Sink>
+  friend void AbslStringify(Sink& sink, const FullTrackName& track_name) {
+    absl::Format(&sink, "(%s; %s)", track_name.track_namespace,
+                 track_name.track_name);
+  }
 };
 
 template <typename H>
@@ -151,6 +164,7 @@
     return (group < other.group ||
             (group == other.group && object <= other.object));
   }
+  bool operator>(const FullSequence& other) const { return !(*this <= other); }
   FullSequence& operator=(FullSequence other) {
     group = other.group;
     object = other.object;
@@ -326,6 +340,19 @@
   kStatusNotAvailable = 0x4,
 };
 
+inline bool DoesTrackStatusImplyHavingData(MoqtTrackStatusCode code) {
+  switch (code) {
+    case MoqtTrackStatusCode::kInProgress:
+    case MoqtTrackStatusCode::kFinished:
+      return true;
+    case MoqtTrackStatusCode::kDoesNotExist:
+    case MoqtTrackStatusCode::kNotYetBegun:
+    case MoqtTrackStatusCode::kStatusNotAvailable:
+      return false;
+  }
+  return false;
+}
+
 struct QUICHE_EXPORT MoqtTrackStatus {
   std::string track_namespace;
   std::string track_name;
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc
index 9a97da0..35d1f9d 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -4,15 +4,15 @@
 
 #include "quiche/quic/moqt/moqt_outgoing_queue.h"
 
-#include <cstddef>
-#include <cstdint>
+#include <memory>
 #include <optional>
 #include <utility>
+#include <vector>
 
-#include "absl/status/status.h"
 #include "absl/status/statusor.h"
-#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_cached_object.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/platform/api/quiche_mem_slice.h"
@@ -29,7 +29,7 @@
 
   if (key) {
     if (!queue_.empty()) {
-      CloseStreamForGroup(current_group_id_);
+      AddRawObject(MoqtObjectStatus::kEndOfGroup, quiche::QuicheMemSlice());
     }
 
     if (queue_.size() == kMaxQueuedGroups) {
@@ -39,53 +39,72 @@
     ++current_group_id_;
   }
 
-  absl::string_view payload_view = payload.AsStringView();
-  uint64_t object_id = queue_.back().size();
-  queue_.back().push_back(std::move(payload));
-  PublishObject(current_group_id_, object_id, payload_view);
+  AddRawObject(MoqtObjectStatus::kNormal, std::move(payload));
 }
 
-absl::StatusOr<MoqtOutgoingQueue::PublishPastObjectsCallback>
-MoqtOutgoingQueue::OnSubscribeForPast(const SubscribeWindow& window) {
-  QUICHE_BUG_IF(
-      MoqtOutgoingQueue_requires_kGroup,
-      window.forwarding_preference() != MoqtForwardingPreference::kGroup)
-      << "MoqtOutgoingQueue currently only supports kGroup.";
-  return [this, &window]() {
-    for (size_t i = 0; i < queue_.size(); ++i) {
-      const uint64_t group_id = first_group_in_queue() + i;
-      const Group& group = queue_[i];
-      const bool is_last_group =
-          ((i == queue_.size() - 1) ||
-           !window.InWindow(FullSequence{group_id + 1, 0}));
-      for (size_t j = 0; j < group.size(); ++j) {
-        const FullSequence sequence{group_id, j};
-        if (!window.InWindow(sequence)) {
-          continue;
-        }
-        const bool is_last_object = (j == group.size() - 1);
-        PublishObject(group_id, j, group[j].AsStringView());
-        if (!is_last_group && is_last_object) {
-          // Close the group
-          CloseStreamForGroup(group_id);
-        }
+void MoqtOutgoingQueue::AddRawObject(MoqtObjectStatus status,
+                                     quiche::QuicheMemSlice payload) {
+  FullSequence sequence{current_group_id_, queue_.back().size()};
+  queue_.back().push_back(CachedObject{
+      sequence, status,
+      std::make_shared<quiche::QuicheMemSlice>(std::move(payload))});
+  for (MoqtObjectListener* listener : listeners_) {
+    listener->OnNewObjectAvailable(sequence);
+  }
+}
+
+std::optional<PublishedObject> MoqtOutgoingQueue::GetCachedObject(
+    FullSequence sequence) const {
+  if (sequence.group < first_group_in_queue()) {
+    return PublishedObject{FullSequence{sequence.group, sequence.object},
+                           MoqtObjectStatus::kGroupDoesNotExist,
+                           quiche::QuicheMemSlice()};
+  }
+  if (sequence.group > current_group_id_) {
+    return std::nullopt;
+  }
+  const std::vector<CachedObject>& group =
+      queue_[sequence.group - first_group_in_queue()];
+  if (sequence.object >= group.size()) {
+    if (sequence.group == current_group_id_) {
+      return std::nullopt;
+    }
+    return PublishedObject{FullSequence{sequence.group, sequence.object},
+                           MoqtObjectStatus::kObjectDoesNotExist,
+                           quiche::QuicheMemSlice()};
+  }
+  QUICHE_DCHECK(sequence == group[sequence.object].sequence);
+  return CachedObjectToPublishedObject(group[sequence.object]);
+}
+
+std::vector<FullSequence> MoqtOutgoingQueue::GetCachedObjectsInRange(
+    FullSequence start, FullSequence end) const {
+  std::vector<FullSequence> sequences;
+  SubscribeWindow window(start, end);
+  for (const Group& group : queue_) {
+    for (const CachedObject& object : group) {
+      if (window.InWindow(object.sequence)) {
+        sequences.push_back(object.sequence);
       }
     }
-  };
+  }
+  return sequences;
 }
 
-void MoqtOutgoingQueue::CloseStreamForGroup(uint64_t group_id) {
-  session_->PublishObject(track_, group_id, queue_[group_id].size(),
-                          /*object_send_order=*/group_id,
-                          MoqtObjectStatus::kEndOfGroup, "");
-  session_->CloseObjectStream(track_, group_id);
+absl::StatusOr<MoqtTrackStatusCode> MoqtOutgoingQueue::GetTrackStatus() const {
+  if (queue_.empty()) {
+    return MoqtTrackStatusCode::kNotYetBegun;
+  }
+  return MoqtTrackStatusCode::kInProgress;
 }
 
-void MoqtOutgoingQueue::PublishObject(uint64_t group_id, uint64_t object_id,
-                                      absl::string_view payload) {
-  session_->PublishObject(track_, group_id, object_id,
-                          /*object_send_order=*/group_id,
-                          MoqtObjectStatus::kNormal, payload);
+FullSequence MoqtOutgoingQueue::GetLargestSequence() const {
+  if (queue_.empty()) {
+    QUICHE_BUG(MoqtOutgoingQueue_GetLargestSequence_not_begun)
+        << "Calling GetLargestSequence() on a track that hasn't begun";
+    return FullSequence{0, 0};
+  }
+  return FullSequence{current_group_id_, queue_.back().size() - 1};
 }
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h
index d738e43..4daa389 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.h
+++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -11,13 +11,13 @@
 #include <utility>
 #include <vector>
 
+#include "absl/container/flat_hash_set.h"
 #include "absl/container/inlined_vector.h"
 #include "absl/status/statusor.h"
-#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_cached_object.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_session.h"
-#include "quiche/quic/moqt/moqt_subscribe_windows.h"
-#include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/common/platform/api/quiche_mem_slice.h"
 
 namespace moqt {
@@ -27,15 +27,15 @@
 // groups, and maintain a buffer of three most recent groups that will be
 // provided to subscribers automatically.
 //
-// This class is primarily meant to be used by publishers to buffer the frames
-// that they produce. Limitations of this class:
-// - It currently only works with the forwarding preference of kGroup.
-// - It only supports a single session.
-// - Everything is sent in order that it is queued.
-class MoqtOutgoingQueue : public LocalTrack::Visitor {
+// This class is primarily meant to be used by original publishers to buffer the
+// frames that they produce.
+class MoqtOutgoingQueue : public MoqtTrackPublisher {
  public:
-  explicit MoqtOutgoingQueue(MoqtSession* session, FullTrackName track)
-      : session_(session), track_(std::move(track)) {}
+  explicit MoqtOutgoingQueue(MoqtSession* session, FullTrackName track,
+                             MoqtForwardingPreference forwarding_preference)
+      : session_(session),
+        track_(std::move(track)),
+        forwarding_preference_(forwarding_preference) {}
 
   MoqtOutgoingQueue(const MoqtOutgoingQueue&) = delete;
   MoqtOutgoingQueue(MoqtOutgoingQueue&&) = default;
@@ -46,32 +46,45 @@
   // group is closed. The first object ever sent MUST have `key` set to true.
   void AddObject(quiche::QuicheMemSlice payload, bool key);
 
-  // LocalTrack::Visitor implementation.
-  absl::StatusOr<PublishPastObjectsCallback> OnSubscribeForPast(
-      const SubscribeWindow& window) override;
+  // MoqtTrackPublisher implementation.
+  const FullTrackName& GetTrackName() const override { return track_; }
+  std::optional<PublishedObject> GetCachedObject(
+      FullSequence sequence) const override;
+  std::vector<FullSequence> GetCachedObjectsInRange(
+      FullSequence start, FullSequence end) const override;
+  void AddObjectListener(MoqtObjectListener* listener) override {
+    listeners_.insert(listener);
+  }
+  void RemoveObjectListener(MoqtObjectListener* listener) override {
+    listeners_.erase(listener);
+  }
+  absl::StatusOr<MoqtTrackStatusCode> GetTrackStatus() const override;
+  FullSequence GetLargestSequence() const override;
+  MoqtForwardingPreference GetForwardingPreference() const override {
+    return forwarding_preference_;
+  }
 
- protected:
-  // Interface to MoqtSession; can be mocked out for tests.
-  virtual void CloseStreamForGroup(uint64_t group_id);
-  virtual void PublishObject(uint64_t group_id, uint64_t object_id,
-                             absl::string_view payload);
+  bool HasSubscribers() const { return !listeners_.empty(); }
 
  private:
   // The number of recent groups to keep around for newly joined subscribers.
   static constexpr size_t kMaxQueuedGroups = 3;
 
-  using Object = quiche::QuicheMemSlice;
-  using Group = std::vector<Object>;
+  using Group = std::vector<CachedObject>;
+
+  void AddRawObject(MoqtObjectStatus status, quiche::QuicheMemSlice payload);
 
   // The number of the oldest group available.
-  uint64_t first_group_in_queue() {
+  uint64_t first_group_in_queue() const {
     return current_group_id_ - queue_.size() + 1;
   }
 
   MoqtSession* session_;  // Not owned.
   FullTrackName track_;
+  MoqtForwardingPreference forwarding_preference_;
   absl::InlinedVector<Group, kMaxQueuedGroups> queue_;
   uint64_t current_group_id_ = -1;
+  absl::flat_hash_set<MoqtObjectListener*> listeners_;
 };
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
index ac22b64..925e369 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -5,11 +5,12 @@
 #include "quiche/quic/moqt/moqt_outgoing_queue.h"
 
 #include <cstdint>
-#include <utility>
+#include <optional>
+#include <vector>
 
-#include "absl/status/statusor.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
 #include "quiche/quic/test_tools/quic_test_utils.h"
 #include "quiche/common/platform/api/quiche_expect_bug.h"
@@ -20,24 +21,45 @@
 namespace {
 
 using ::quic::test::MemSliceFromString;
+using ::testing::AnyOf;
 
-class TestMoqtOutgoingQueue : public MoqtOutgoingQueue {
+class TestMoqtOutgoingQueue : public MoqtOutgoingQueue,
+                              public MoqtObjectListener {
  public:
   TestMoqtOutgoingQueue()
-      : MoqtOutgoingQueue(nullptr, FullTrackName{"test", "track"}) {}
-
-  void CallSubscribeForPast(const SubscribeWindow& window) {
-    absl::StatusOr<PublishPastObjectsCallback> callback =
-        OnSubscribeForPast(window);
-    QUICHE_CHECK_OK(callback.status());
-    (*std::move(callback))();
+      : MoqtOutgoingQueue(nullptr, FullTrackName{"test", "track"},
+                          MoqtForwardingPreference::kGroup) {
+    AddObjectListener(this);
   }
 
-  MOCK_METHOD(void, CloseStreamForGroup, (uint64_t group_id), (override));
+  void OnNewObjectAvailable(FullSequence sequence) override {
+    std::optional<PublishedObject> object = GetCachedObject(sequence);
+    QUICHE_CHECK(object.has_value());
+    ASSERT_THAT(object->status, AnyOf(MoqtObjectStatus::kNormal,
+                                      MoqtObjectStatus::kEndOfGroup));
+    if (object->status == MoqtObjectStatus::kNormal) {
+      PublishObject(object->sequence.group, object->sequence.object,
+                    object->payload.AsStringView());
+    } else {
+      CloseStreamForGroup(object->sequence.group);
+    }
+  }
+
+  void CallSubscribeForPast(const SubscribeWindow& window) {
+    std::vector<FullSequence> objects =
+        GetCachedObjectsInRange(FullSequence(0, 0), GetLargestSequence());
+    for (FullSequence object : objects) {
+      if (window.InWindow(object)) {
+        OnNewObjectAvailable(object);
+      }
+    }
+  }
+
+  MOCK_METHOD(void, CloseStreamForGroup, (uint64_t group_id), ());
   MOCK_METHOD(void, PublishObject,
               (uint64_t group_id, uint64_t object_id,
                absl::string_view payload),
-              (override));
+              ());
 };
 
 TEST(MoqtOutgoingQueue, FirstObjectNotKeyframe) {
@@ -74,8 +96,7 @@
   queue.AddObject(MemSliceFromString("a"), true);
   queue.AddObject(MemSliceFromString("b"), false);
   queue.AddObject(MemSliceFromString("c"), false);
-  queue.CallSubscribeForPast(SubscribeWindow(
-      0, MoqtForwardingPreference::kGroup, FullSequence(0, 3), 0, 0));
+  queue.CallSubscribeForPast(SubscribeWindow(0, 0));
 }
 
 TEST(MoqtOutgoingQueue, SingleGroupPastSubscribeFromMidGroup) {
@@ -92,8 +113,7 @@
   queue.AddObject(MemSliceFromString("a"), true);
   queue.AddObject(MemSliceFromString("b"), false);
   queue.AddObject(MemSliceFromString("c"), false);
-  queue.CallSubscribeForPast(SubscribeWindow(
-      0, MoqtForwardingPreference::kGroup, FullSequence(0, 3), 0, 1));
+  queue.CallSubscribeForPast(SubscribeWindow(0, 1));
 }
 
 TEST(MoqtOutgoingQueue, TwoGroups) {
@@ -141,8 +161,7 @@
   queue.AddObject(MemSliceFromString("d"), true);
   queue.AddObject(MemSliceFromString("e"), false);
   queue.AddObject(MemSliceFromString("f"), false);
-  queue.CallSubscribeForPast(SubscribeWindow(
-      0, MoqtForwardingPreference::kGroup, FullSequence(1, 3), 0, 1));
+  queue.CallSubscribeForPast(SubscribeWindow(0, 1));
 }
 
 TEST(MoqtOutgoingQueue, FiveGroups) {
@@ -215,8 +234,7 @@
   queue.AddObject(MemSliceFromString("h"), false);
   queue.AddObject(MemSliceFromString("i"), true);
   queue.AddObject(MemSliceFromString("j"), false);
-  queue.CallSubscribeForPast(SubscribeWindow(
-      0, MoqtForwardingPreference::kGroup, FullSequence(4, 2), 0, 0));
+  queue.CallSubscribeForPast(SubscribeWindow(0, 0));
 }
 
 }  // namespace
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h
new file mode 100644
index 0000000..ab163d5
--- /dev/null
+++ b/quiche/quic/moqt/moqt_publisher.h
@@ -0,0 +1,107 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_PUBLISHER_H_
+#define QUICHE_QUIC_MOQT_MOQT_PUBLISHER_H_
+
+#include <memory>
+#include <optional>
+#include <vector>
+
+#include "absl/status/statusor.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+
+namespace moqt {
+
+// PublishedObject is a description of an object that is sufficient to publish
+// it on a given track.
+struct PublishedObject {
+  FullSequence sequence;
+  MoqtObjectStatus status;
+  quiche::QuicheMemSlice payload;
+};
+
+// MoqtObjectListener is an interface for any entity that is listening for
+// incoming objects for a given track.
+class MoqtObjectListener {
+ public:
+  virtual ~MoqtObjectListener() = default;
+
+  // Notifies that an object with the given sequence number has become
+  // available.  The object payload itself may be retrieved via GetCachedObject
+  // method of the associated track publisher.
+  virtual void OnNewObjectAvailable(FullSequence sequence) = 0;
+};
+
+// MoqtTrackPublisher is an application-side API for an MoQT publisher
+// of a single individual track.
+class MoqtTrackPublisher {
+ public:
+  virtual ~MoqtTrackPublisher() = default;
+
+  // Returns the full name of the associated track.
+  virtual const FullTrackName& GetTrackName() const = 0;
+
+  // GetCachedObject lets the MoQT stack access the objects that are available
+  // in the track's built-in local cache.
+  //
+  // This implementation of MoQT does not store any objects within the MoQT
+  // stack itself, at least until the object is fully serialized and passed to
+  // the QUIC stack. Instead, it relies on individual tracks having a shared
+  // cache for recent objects, and objects are always pulled from that cache
+  // whenever they are sent.  Once an object is not available via the cache, it
+  // can no longer be sent; this ensures that objects are not buffered forever.
+  //
+  // This method returns nullopt if the object is not currently available, but
+  // might become available in the future.  If the object is gone forever,
+  // kGroupDoesNotExist/kObjectDoesNotExist has to be returned instead;
+  // otherwise, the corresponding QUIC streams will be stuck waiting for objects
+  // that will never arrive.
+  virtual std::optional<PublishedObject> GetCachedObject(
+      FullSequence sequence) const = 0;
+
+  // Returns a full list of objects available in the cache, to be used for
+  // SUBSCRIBEs with a backfill.
+  virtual std::vector<FullSequence> GetCachedObjectsInRange(
+      FullSequence start, FullSequence end) const = 0;
+
+  // TODO: add an API to fetch past objects that are out of cache and might
+  // require an upstream request to fill the relevant cache again. This is
+  // currently done since the specification does not clearly describe how this
+  // is supposed to be done, especially with respect to such things as
+  // backpressure.
+
+  // Registers a listener with the track.  The listener will be notified of all
+  // newly arriving objects. The pointer to the listener must be valid until
+  // removed.
+  virtual void AddObjectListener(MoqtObjectListener* listener) = 0;
+  virtual void RemoveObjectListener(MoqtObjectListener* listener) = 0;
+
+  virtual absl::StatusOr<MoqtTrackStatusCode> GetTrackStatus() const = 0;
+
+  // Returns the largest sequence pair that has been published so far.
+  // This method may only be called if
+  // DoesTrackStatusImplyHavingData(GetTrackStatus()) is true.
+  virtual FullSequence GetLargestSequence() const = 0;
+
+  // Returns the forwarding preference of the track.
+  // This method may only be called if
+  // DoesTrackStatusImplyHavingData(GetTrackStatus()) is true.
+  virtual MoqtForwardingPreference GetForwardingPreference() const = 0;
+};
+
+// MoqtPublisher is an interface to a publisher that allows it to publish
+// multiple tracks.
+class MoqtPublisher {
+ public:
+  virtual ~MoqtPublisher() = default;
+
+  virtual absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> GetTrack(
+      const FullTrackName& track_name) = 0;
+};
+
+}  // namespace moqt
+
+#endif  // QUICHE_QUIC_MOQT_MOQT_PUBLISHER_H_
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 023bdcc..154c8ba 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -16,6 +16,7 @@
 
 #include "absl/algorithm/container.h"
 #include "absl/container/flat_hash_map.h"
+#include "absl/container/flat_hash_set.h"
 #include "absl/container/node_hash_map.h"
 #include "absl/status/status.h"
 #include "absl/status/statusor.h"
@@ -23,15 +24,18 @@
 #include "absl/strings/string_view.h"
 #include "absl/types/span.h"
 #include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_framer.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_parser.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
 #include "quiche/quic/moqt/moqt_track.h"
-#include "quiche/quic/platform/api/quic_bug_tracker.h"
+#include "quiche/quic/platform/api/quic_logging.h"
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_stream.h"
+#include "quiche/common/simple_buffer_allocator.h"
 #include "quiche/web_transport/web_transport.h"
 
 #define ENDPOINT \
@@ -39,8 +43,60 @@
 
 namespace moqt {
 
+namespace {
+
 using ::quic::Perspective;
 
+bool PublisherHasData(const MoqtTrackPublisher& publisher) {
+  absl::StatusOr<MoqtTrackStatusCode> status = publisher.GetTrackStatus();
+  return status.ok() && DoesTrackStatusImplyHavingData(*status);
+}
+
+SubscribeWindow SubscribeMessageToWindow(const MoqtSubscribe& subscribe,
+                                         MoqtTrackPublisher& publisher) {
+  const FullSequence sequence = PublisherHasData(publisher)
+                                    ? publisher.GetLargestSequence()
+                                    : FullSequence{0, 0};
+  switch (GetFilterType(subscribe)) {
+    case MoqtFilterType::kLatestGroup:
+      return SubscribeWindow(sequence.group, 0);
+    case MoqtFilterType::kLatestObject:
+      return SubscribeWindow(sequence.group, sequence.object);
+    case MoqtFilterType::kAbsoluteStart:
+      return SubscribeWindow(*subscribe.start_group, *subscribe.start_object);
+    case MoqtFilterType::kAbsoluteRange:
+      return SubscribeWindow(*subscribe.start_group, *subscribe.start_object,
+                             *subscribe.end_group, *subscribe.end_object);
+    case MoqtFilterType::kNone:
+      QUICHE_BUG(MoqtSession_Subscription_invalid_filter_passed);
+      return SubscribeWindow(0, 0);
+  }
+}
+
+class DefaultPublisher : public MoqtPublisher {
+ public:
+  static DefaultPublisher* GetInstance() {
+    static DefaultPublisher* instance = new DefaultPublisher();
+    return instance;
+  }
+
+  absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> GetTrack(
+      const FullTrackName& track_name) override {
+    return absl::NotFoundError("No tracks published");
+  }
+};
+}  // namespace
+
+MoqtSession::MoqtSession(webtransport::Session* session,
+                         MoqtSessionParameters parameters,
+                         MoqtSessionCallbacks callbacks)
+    : session_(session),
+      parameters_(parameters),
+      callbacks_(std::move(callbacks)),
+      framer_(quiche::SimpleBufferAllocator::Get(), parameters.using_webtrans),
+      publisher_(DefaultPublisher::GetInstance()),
+      liveness_token_(std::make_shared<Empty>()) {}
+
 MoqtSession::ControlStream* MoqtSession::GetControlStream() {
   if (!control_stream_.has_value()) {
     return nullptr;
@@ -154,13 +210,6 @@
   std::move(callbacks_.session_terminated_callback)(error);
 }
 
-void MoqtSession::AddLocalTrack(const FullTrackName& full_track_name,
-                                MoqtForwardingPreference forwarding_preference,
-                                LocalTrack::Visitor* visitor) {
-  local_tracks_.try_emplace(full_track_name, full_track_name,
-                            forwarding_preference, visitor);
-}
-
 // TODO: Create state that allows ANNOUNCE_OK/ERROR on spurious namespaces to
 // trigger session errors.
 void MoqtSession::Announce(absl::string_view track_namespace,
@@ -188,23 +237,6 @@
   pending_outgoing_announces_[track_namespace] = std::move(announce_callback);
 }
 
-bool MoqtSession::HasSubscribers(const FullTrackName& full_track_name) const {
-  auto it = local_tracks_.find(full_track_name);
-  return (it != local_tracks_.end() && it->second.HasSubscriber());
-}
-
-void MoqtSession::CancelAnnounce(absl::string_view track_namespace) {
-  for (auto it = local_tracks_.begin(); it != local_tracks_.end(); ++it) {
-    if (it->first.track_namespace == track_namespace) {
-      it->second.set_announce_cancel();
-    }
-  }
-  absl::erase_if(local_tracks_, [&](const auto& it) {
-    return it.first.track_namespace == track_namespace &&
-           !it.second.HasSubscriber();
-  });
-}
-
 bool MoqtSession::SubscribeAbsolute(absl::string_view track_namespace,
                                     absl::string_view name,
                                     uint64_t start_group, uint64_t start_object,
@@ -310,33 +342,31 @@
 
 bool MoqtSession::SubscribeIsDone(uint64_t subscribe_id, SubscribeDoneCode code,
                                   absl::string_view reason_phrase) {
-  // Search all the tracks to find the subscribe ID.
-  auto name_it = local_track_by_subscribe_id_.find(subscribe_id);
-  if (name_it == local_track_by_subscribe_id_.end()) {
+  auto it = published_subscriptions_.find(subscribe_id);
+  if (it == published_subscriptions_.end()) {
     return false;
   }
-  auto track_it = local_tracks_.find(name_it->second);
-  if (track_it == local_tracks_.end()) {
-    return false;
-  }
-  LocalTrack& track = track_it->second;
+
+  PublishedSubscription& subscription = *it->second;
+  std::vector<webtransport::StreamId> streams_to_reset =
+      subscription.GetAllStreams();
+
   MoqtSubscribeDone subscribe_done;
   subscribe_done.subscribe_id = subscribe_id;
   subscribe_done.status_code = code;
   subscribe_done.reason_phrase = reason_phrase;
-  SubscribeWindow* window = track.GetWindow(subscribe_id);
-  if (window == nullptr) {
-    return false;
-  }
-  subscribe_done.final_id = window->largest_delivered();
+  subscribe_done.final_id = subscription.largest_sent();
   SendControlMessage(framer_.SerializeSubscribeDone(subscribe_done));
   QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE_DONE message for "
                   << subscribe_id;
   // Clean up the subscription
-  track.DeleteWindow(subscribe_id);
-  local_track_by_subscribe_id_.erase(name_it);
-  if (track.canceled() && !track.HasSubscriber()) {
-    local_tracks_.erase(track_it);
+  published_subscriptions_.erase(it);
+  for (webtransport::StreamId stream_id : streams_to_reset) {
+    webtransport::Stream* stream = session_->GetStreamById(stream_id);
+    if (stream == nullptr) {
+      continue;
+    }
+    stream->ResetWithUserCode(kResetCodeSubscriptionGone);
   }
   return true;
 }
@@ -367,18 +397,51 @@
   return true;
 }
 
-std::optional<webtransport::StreamId> MoqtSession::OpenUnidirectionalStream() {
+webtransport::Stream* MoqtSession::OpenOrQueueDataStream(
+    uint64_t subscription_id, FullSequence first_object) {
   if (!session_->CanOpenNextOutgoingUnidirectionalStream()) {
-    return std::nullopt;
+    queued_outgoing_data_streams_.push_back(
+        QueuedOutgoingDataStream{subscription_id, first_object});
+    // TODO: limit the number of streams in the queue.
+    return nullptr;
   }
+  return OpenDataStream(subscription_id, first_object);
+}
+
+webtransport::Stream* MoqtSession::OpenDataStream(uint64_t subscription_id,
+                                                  FullSequence first_object) {
+  auto it = published_subscriptions_.find(subscription_id);
+  if (it == published_subscriptions_.end()) {
+    // It is possible that the subscription has been discarded while the stream
+    // was in the queue; discard those streams.
+    return nullptr;
+  }
+  PublishedSubscription& subscription = *it->second;
+
   webtransport::Stream* new_stream =
       session_->OpenOutgoingUnidirectionalStream();
   if (new_stream == nullptr) {
-    return std::nullopt;
+    QUICHE_BUG(MoqtSession_OpenDataStream_blocked)
+        << "OpenDataStream called when creation of new streams is blocked.";
+    return nullptr;
   }
-  new_stream->SetVisitor(
-      std::make_unique<OutgoingDataStream>(this, new_stream));
-  return new_stream->GetStreamId();
+  new_stream->SetVisitor(std::make_unique<OutgoingDataStream>(
+      this, new_stream, subscription_id, first_object));
+  subscription.OnDataStreamCreated(new_stream->GetStreamId(), first_object);
+  return new_stream;
+}
+
+void MoqtSession::OnCanCreateNewOutgoingUnidirectionalStream() {
+  while (!queued_outgoing_data_streams_.empty() &&
+         session_->CanOpenNextOutgoingUnidirectionalStream()) {
+    QueuedOutgoingDataStream next = queued_outgoing_data_streams_.front();
+    queued_outgoing_data_streams_.pop_front();
+    webtransport::Stream* stream =
+        OpenDataStream(next.subscription_id, next.first_object);
+    if (stream != nullptr) {
+      stream->visitor()->OnCanWrite();
+    }
+  }
 }
 
 std::pair<FullTrackName, RemoteTrack::Visitor*>
@@ -422,157 +485,6 @@
        track.visitor()});
 }
 
-// TODO(martinduke): Throw errors if the object status is inconsistent with
-// sequence numbers we have already observed on the track.
-bool MoqtSession::PublishObject(const FullTrackName& full_track_name,
-                                uint64_t group_id, uint64_t object_id,
-                                uint64_t object_send_order,
-                                MoqtObjectStatus status,
-                                absl::string_view payload) {
-  auto track_it = local_tracks_.find(full_track_name);
-  if (track_it == local_tracks_.end()) {
-    QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT for nonexistent track";
-    return false;
-  }
-  // TODO(martinduke): Write a test for this QUIC_BUG.
-  QUIC_BUG_IF(moqt_publish_abnormal_with_payload,
-              status != MoqtObjectStatus::kNormal && !payload.empty());
-  LocalTrack& track = track_it->second;
-  bool end_of_stream = false;
-  MoqtForwardingPreference forwarding_preference =
-      track.forwarding_preference();
-  switch (forwarding_preference) {
-    case MoqtForwardingPreference::kTrack:
-      end_of_stream = (status == MoqtObjectStatus::kEndOfTrack);
-      break;
-    case MoqtForwardingPreference::kObject:
-    case MoqtForwardingPreference::kDatagram:
-      end_of_stream = true;
-      break;
-    case MoqtForwardingPreference::kGroup:
-      end_of_stream = (status == MoqtObjectStatus::kEndOfGroup ||
-                       status == MoqtObjectStatus::kGroupDoesNotExist ||
-                       status == MoqtObjectStatus::kEndOfTrack);
-      break;
-  }
-  FullSequence sequence{group_id, object_id};
-  track.SentSequence(sequence, status);
-  std::vector<SubscribeWindow*> subscriptions =
-      track.ShouldSend({group_id, object_id});
-  if (subscriptions.empty()) {
-    return true;
-  }
-  MoqtObject object;
-  QUICHE_DCHECK(track.track_alias().has_value());
-  object.track_alias = *track.track_alias();
-  object.group_id = group_id;
-  object.object_id = object_id;
-  object.object_send_order = object_send_order;
-  object.object_status = status;
-  object.forwarding_preference = forwarding_preference;
-  object.payload_length = payload.size();
-  int failures = 0;
-  quiche::StreamWriteOptions write_options;
-  write_options.set_send_fin(end_of_stream);
-  absl::flat_hash_set<uint64_t> subscribes_to_close;
-  for (auto subscription : subscriptions) {
-    if (subscription->OnObjectSent(sequence, status)) {
-      subscribes_to_close.insert(subscription->subscribe_id());
-    }
-    if (forwarding_preference == MoqtForwardingPreference::kDatagram) {
-      object.subscribe_id = subscription->subscribe_id();
-      quiche::QuicheBuffer datagram =
-          framer_.SerializeObjectDatagram(object, payload);
-      // TODO(martinduke): It's OK to just silently fail, but better to notify
-      // the app on errors.
-      session_->SendOrQueueDatagram(datagram.AsStringView());
-      continue;
-    }
-    bool new_stream = false;
-    std::optional<webtransport::StreamId> stream_id =
-        subscription->GetStreamForSequence(sequence);
-    if (!stream_id.has_value()) {
-      new_stream = true;
-      stream_id = OpenUnidirectionalStream();
-      if (!stream_id.has_value()) {
-        QUICHE_DLOG(ERROR) << ENDPOINT
-                           << "Sending OBJECT to nonexistent stream";
-        ++failures;
-        continue;
-      }
-      if (!end_of_stream) {
-        subscription->AddStream(group_id, object_id, *stream_id);
-      }
-    }
-    webtransport::Stream* stream = session_->GetStreamById(*stream_id);
-    if (stream == nullptr) {
-      QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT to nonexistent stream "
-                         << *stream_id;
-      ++failures;
-      continue;
-    }
-    object.subscribe_id = subscription->subscribe_id();
-    quiche::QuicheBuffer header =
-        framer_.SerializeObjectHeader(object, new_stream);
-    std::array<absl::string_view, 2> views = {header.AsStringView(), payload};
-    if (!stream->Writev(views, write_options).ok()) {
-      QUICHE_DLOG(ERROR) << ENDPOINT << "Failed to write OBJECT message";
-      ++failures;
-      continue;
-    }
-    QUICHE_DVLOG(1) << ENDPOINT << "Sending object length " << payload.length()
-                    << " for " << full_track_name.track_namespace << ":"
-                    << full_track_name.track_name << " with sequence "
-                    << object.group_id << ":" << object.object_id
-                    << " on stream " << *stream_id;
-    if (end_of_stream && !new_stream) {
-      subscription->RemoveStream(group_id, object_id);
-    }
-  }
-  for (uint64_t subscribe_id : subscribes_to_close) {
-    SubscribeIsDone(subscribe_id, SubscribeDoneCode::kSubscriptionEnded, "");
-  }
-  return (failures == 0);
-}
-
-void MoqtSession::CloseObjectStream(const FullTrackName& full_track_name,
-                                    uint64_t group_id) {
-  auto track_it = local_tracks_.find(full_track_name);
-  if (track_it == local_tracks_.end()) {
-    QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT for nonexistent track";
-    return;
-  }
-  LocalTrack& track = track_it->second;
-
-  MoqtForwardingPreference forwarding_preference =
-      track.forwarding_preference();
-  if (forwarding_preference == MoqtForwardingPreference::kObject ||
-      forwarding_preference == MoqtForwardingPreference::kDatagram) {
-    QUIC_BUG(MoqtSession_CloseStreamObject_wrong_type)
-        << "Forwarding preferences of Object or Datagram require stream to be "
-           "immediately closed, and thus are not valid CloseObjectStream() "
-           "targets";
-    return;
-  }
-
-  std::vector<SubscribeWindow*> subscriptions =
-      track.ShouldSend({group_id, /*object=*/0});
-  for (SubscribeWindow* subscription : subscriptions) {
-    std::optional<webtransport::StreamId> stream_id =
-        subscription->GetStreamForSequence(
-            FullSequence(group_id, /*object=*/0));
-    if (!stream_id.has_value()) {
-      continue;
-    }
-    webtransport::Stream* stream = session_->GetStreamById(*stream_id);
-    if (stream == nullptr) {
-      continue;
-    }
-    bool success = stream->SendFin();
-    QUICHE_BUG_IF(MoqtSession_CloseObjectStream_fin_failed, !success);
-  }
-}
-
 static void ForwardStreamDataToParser(webtransport::Stream& stream,
                                       MoqtParser& parser) {
   bool fin =
@@ -673,7 +585,6 @@
 
 void MoqtSession::ControlStream::OnSubscribeMessage(
     const MoqtSubscribe& message) {
-  std::string reason_phrase = "";
   if (session_->peer_role_ == MoqtRole::kPublisher) {
     QUIC_DLOG(INFO) << ENDPOINT << "Publisher peer sent SUBSCRIBE";
     session_->Error(MoqtError::kProtocolViolation,
@@ -682,99 +593,40 @@
   }
   QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE for "
                   << message.track_namespace << ":" << message.track_name;
-  auto it = session_->local_tracks_.find(FullTrackName(
-      std::string(message.track_namespace), std::string(message.track_name)));
-  if (it == session_->local_tracks_.end()) {
-    QUIC_DLOG(INFO) << ENDPOINT << "Rejected because "
-                    << message.track_namespace << ":" << message.track_name
-                    << " does not exist";
+
+  FullTrackName track_name =
+      FullTrackName{message.track_namespace, message.track_name};
+  absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> track_publisher =
+      session_->publisher_->GetTrack(track_name);
+  if (!track_publisher.ok()) {
+    QUIC_DLOG(INFO) << ENDPOINT << "SUBSCRIBE for " << track_name
+                    << " rejected by the application: "
+                    << track_publisher.status();
     SendSubscribeError(message, SubscribeErrorCode::kInternalError,
-                       "Track does not exist", message.track_alias);
+                       track_publisher.status().message(), message.track_alias);
     return;
   }
-  LocalTrack& track = it->second;
-  if (it->second.canceled()) {
-    // Note that if the track has already been deleted, there will not be a
-    // protocol violation, which the spec says there SHOULD be. It's not worth
-    // keeping state on deleted tracks.
-    session_->Error(MoqtError::kProtocolViolation,
-                    "Received SUBSCRIBE for canceled track");
-    return;
+  std::optional<FullSequence> largest_id;
+  if (PublisherHasData(**track_publisher)) {
+    largest_id = (*track_publisher)->GetLargestSequence();
   }
-  if ((track.track_alias().has_value() &&
-       message.track_alias != *track.track_alias()) ||
-      session_->used_track_aliases_.contains(message.track_alias)) {
-    // Propose a different track_alias.
-    SendSubscribeError(message, SubscribeErrorCode::kRetryTrackAlias,
-                       "Track alias already exists",
-                       session_->next_local_track_alias_++);
-    return;
-  } else {  // Use client-provided alias.
-    track.set_track_alias(message.track_alias);
-    if (message.track_alias >= session_->next_local_track_alias_) {
-      session_->next_local_track_alias_ = message.track_alias + 1;
-    }
-    session_->used_track_aliases_.insert(message.track_alias);
+
+  auto subscription = std::make_unique<MoqtSession::PublishedSubscription>(
+      session_, *std::move(track_publisher), message);
+  auto [it, success] = session_->published_subscriptions_.emplace(
+      message.subscribe_id, std::move(subscription));
+  if (!success) {
+    SendSubscribeError(message, SubscribeErrorCode::kInternalError,
+                       "Duplicate subscribe ID", message.track_alias);
   }
-  FullSequence start;
-  if (message.start_group.has_value()) {
-    // The filter is AbsoluteStart or AbsoluteRange.
-    QUIC_BUG_IF(quic_bug_invalid_subscribe, !message.start_object.has_value())
-        << "Start group without start object";
-    start = FullSequence(*message.start_group, *message.start_object);
-  } else {
-    // The filter is LatestObject or LatestGroup.
-    start = track.next_sequence();
-    if (message.start_object.has_value()) {
-      // The filter is LatestGroup.
-      QUIC_BUG_IF(quic_bug_invalid_subscribe, *message.start_object != 0)
-          << "LatestGroup does not start with zero";
-      start.object = 0;
-    } else {
-      --start.object;
-    }
-  }
-  LocalTrack::Visitor::PublishPastObjectsCallback publish_past_objects;
-  std::optional<SubscribeWindow> past_window;
-  if (start < track.next_sequence() && track.visitor() != nullptr) {
-    // Pull a copy of objects that have already been published.
-    FullSequence end_of_past_subscription{
-        message.end_group.has_value() ? *message.end_group : UINT64_MAX,
-        message.end_object.has_value() ? *message.end_object : UINT64_MAX};
-    end_of_past_subscription =
-        std::min(end_of_past_subscription, track.next_sequence());
-    past_window.emplace(message.subscribe_id, track.forwarding_preference(),
-                        track.next_sequence(), start, end_of_past_subscription);
-    absl::StatusOr<LocalTrack::Visitor::PublishPastObjectsCallback>
-        past_objects_available =
-            track.visitor()->OnSubscribeForPast(*past_window);
-    if (!past_objects_available.ok()) {
-      SendSubscribeError(message, SubscribeErrorCode::kInternalError,
-                         past_objects_available.status().message(),
-                         message.track_alias);
-      return;
-    }
-    publish_past_objects = *std::move(past_objects_available);
-  }
+
   MoqtSubscribeOk subscribe_ok;
   subscribe_ok.subscribe_id = message.subscribe_id;
+  subscribe_ok.largest_id = largest_id;
   SendOrBufferMessage(session_->framer_.SerializeSubscribeOk(subscribe_ok));
-  QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for "
-                  << message.track_namespace << ":" << message.track_name;
-  if (!message.end_group.has_value()) {
-    track.AddWindow(message.subscribe_id, start.group, start.object);
-  } else if (message.end_object.has_value()) {
-    track.AddWindow(message.subscribe_id, start.group, start.object,
-                    *message.end_group, *message.end_object);
-  } else {
-    track.AddWindow(message.subscribe_id, start.group, start.object,
-                    *message.end_group);
-  }
-  session_->local_track_by_subscribe_id_.emplace(message.subscribe_id,
-                                                 track.full_track_name());
-  if (publish_past_objects) {
-    QUICHE_DCHECK(past_window.has_value());
-    std::move(publish_past_objects)();
+
+  if (largest_id.has_value()) {
+    it->second->Backfill();
   }
 }
 
@@ -850,19 +702,8 @@
 
 void MoqtSession::ControlStream::OnSubscribeUpdateMessage(
     const MoqtSubscribeUpdate& message) {
-  // Search all the tracks to find the subscribe ID.
-  auto name_it =
-      session_->local_track_by_subscribe_id_.find(message.subscribe_id);
-  if (name_it == session_->local_track_by_subscribe_id_.end()) {
-    return;
-  }
-  auto track_it = session_->local_tracks_.find(name_it->second);
-  if (track_it == session_->local_tracks_.end()) {
-    return;
-  }
-  LocalTrack& track = track_it->second;
-  SubscribeWindow* window = track.GetWindow(message.subscribe_id);
-  if (window == nullptr) {
+  auto it = session_->published_subscriptions_.find(message.subscribe_id);
+  if (it == session_->published_subscriptions_.end()) {
     return;
   }
   FullSequence start(message.start_group, message.start_object);
@@ -872,15 +713,7 @@
                                                ? *message.end_object
                                                : UINT64_MAX);
   }
-  // TODO(martinduke): Handle the case where the update range is invalid.
-  if (window->UpdateStartEnd(start, end)) {
-    std::optional<FullSequence> largest_delivered = window->largest_delivered();
-    if (largest_delivered.has_value() && end <= *largest_delivered) {
-      session_->SubscribeIsDone(message.subscribe_id,
-                                SubscribeDoneCode::kSubscriptionEnded,
-                                "SUBSCRIBE_UPDATE moved subscription end");
-    }
-  }
+  it->second->Update(start, end);
 }
 
 void MoqtSession::ControlStream::OnAnnounceMessage(
@@ -935,7 +768,7 @@
 
 void MoqtSession::ControlStream::OnAnnounceCancelMessage(
     const MoqtAnnounceCancel& message) {
-  session_->CancelAnnounce(message.track_namespace);
+  // TODO: notify the application about this.
 }
 
 void MoqtSession::ControlStream::OnParsingError(MoqtError error_code,
@@ -1007,8 +840,309 @@
   session_->Error(error_code, absl::StrCat("Parse error: ", reason));
 }
 
+MoqtSession::PublishedSubscription::PublishedSubscription(
+    MoqtSession* session, std::shared_ptr<MoqtTrackPublisher> track_publisher,
+    const MoqtSubscribe& subscribe)
+    : subscription_id_(subscribe.subscribe_id),
+      session_(session),
+      track_publisher_(track_publisher),
+      track_alias_(subscribe.track_alias),
+      window_(SubscribeMessageToWindow(subscribe, *track_publisher)) {
+  track_publisher->AddObjectListener(this);
+  QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for "
+                  << subscribe.track_namespace << ":" << subscribe.track_name;
+}
+
+MoqtSession::PublishedSubscription::~PublishedSubscription() {
+  track_publisher_->RemoveObjectListener(this);
+}
+
+SendStreamMap& MoqtSession::PublishedSubscription::stream_map() {
+  // The stream map is lazily initialized, since initializing it requires
+  // knowing the forwarding preference in advance, and it might not be known
+  // when the subscription is first created.
+  if (!lazily_initialized_stream_map_.has_value()) {
+    QUICHE_DCHECK(
+        DoesTrackStatusImplyHavingData(*track_publisher_->GetTrackStatus()));
+    lazily_initialized_stream_map_.emplace(
+        track_publisher_->GetForwardingPreference());
+  }
+  return *lazily_initialized_stream_map_;
+}
+
+void MoqtSession::PublishedSubscription::Update(
+    FullSequence start, std::optional<FullSequence> end) {
+  window_.UpdateStartEnd(start, end);
+  // TODO: reset streams that are no longer in-window.
+  // TODO: send SUBSCRIBE_DONE if required.
+  // TODO: send an error for invalid updates now that it's a part of draft-05.
+}
+
+void MoqtSession::PublishedSubscription::OnNewObjectAvailable(
+    FullSequence sequence) {
+  if (!window_.InWindow(sequence)) {
+    return;
+  }
+
+  MoqtForwardingPreference forwarding_preference =
+      track_publisher_->GetForwardingPreference();
+  if (forwarding_preference == MoqtForwardingPreference::kDatagram) {
+    SendDatagram(sequence);
+    return;
+  }
+
+  std::optional<webtransport::StreamId> stream_id =
+      stream_map().GetStreamForSequence(sequence);
+  webtransport::Stream* raw_stream = nullptr;
+  if (stream_id.has_value()) {
+    raw_stream = session_->session_->GetStreamById(*stream_id);
+  } else {
+    if (!window_.IsStreamProvokingObject(sequence, forwarding_preference)) {
+      QUIC_DLOG(INFO) << ENDPOINT << "Received object " << sequence
+                      << ", but there is no stream that it can be mapped to";
+      // It is possible that the we are getting notified of objects out of
+      // order, but we still have to send objects in a manner consistent with
+      // the forwarding preference used.
+      return;
+    }
+    raw_stream = session_->OpenOrQueueDataStream(subscription_id_, sequence);
+  }
+  if (raw_stream == nullptr) {
+    return;
+  }
+
+  OutgoingDataStream* stream =
+      static_cast<OutgoingDataStream*>(raw_stream->visitor());
+  stream->SendObjects(*this);
+}
+
+void MoqtSession::PublishedSubscription::Backfill() {
+  const FullSequence start = window_.start();
+  const FullSequence end = track_publisher_->GetLargestSequence();
+  const MoqtForwardingPreference preference =
+      track_publisher_->GetForwardingPreference();
+
+  absl::flat_hash_set<ReducedSequenceIndex> already_opened;
+  std::vector<FullSequence> objects =
+      track_publisher_->GetCachedObjectsInRange(start, end);
+  QUICHE_DCHECK(absl::c_is_sorted(objects));
+  for (FullSequence sequence : objects) {
+    auto [it, was_missing] =
+        already_opened.insert(ReducedSequenceIndex(sequence, preference));
+    if (!was_missing) {
+      // For every stream mapping unit present, we only need to notify of the
+      // earliest object on it, since the stream itself will pull the rest.
+      continue;
+    }
+    OnNewObjectAvailable(sequence);
+  }
+}
+
+std::vector<webtransport::StreamId>
+MoqtSession::PublishedSubscription::GetAllStreams() const {
+  if (!lazily_initialized_stream_map_.has_value()) {
+    return {};
+  }
+  return lazily_initialized_stream_map_->GetAllStreams();
+}
+
+void MoqtSession::PublishedSubscription::OnDataStreamCreated(
+    webtransport::StreamId id, FullSequence start_sequence) {
+  stream_map().AddStream(start_sequence, id);
+}
+void MoqtSession::PublishedSubscription::OnDataStreamDestroyed(
+    webtransport::StreamId id, FullSequence end_sequence) {
+  stream_map().RemoveStream(end_sequence, id);
+}
+
+void MoqtSession::PublishedSubscription::OnObjectSent(FullSequence sequence) {
+  if (largest_sent_.has_value()) {
+    largest_sent_ = std::max(*largest_sent_, sequence);
+  } else {
+    largest_sent_ = sequence;
+  }
+  // TODO: send SUBSCRIBE_DONE if the subscription is done.
+}
+
+MoqtSession::OutgoingDataStream::OutgoingDataStream(
+    MoqtSession* session, webtransport::Stream* stream,
+    uint64_t subscription_id, FullSequence first_object)
+    : session_(session),
+      stream_(stream),
+      subscription_id_(subscription_id),
+      next_object_(first_object),
+      session_liveness_(session->liveness_token_) {}
+
+MoqtSession::OutgoingDataStream::~OutgoingDataStream() {
+  // Though it might seem intuitive that the session object has to outlive the
+  // connection object (and this is indeed how something like QuicSession and
+  // QuicStream works), this is not the true for WebTransport visitors: the
+  // session getting destroyed will inevitably lead to all related streams being
+  // destroyed, but the actual order of destruction is not guaranteed.  Thus, we
+  // need to check if the session still exists while accessing it in a stream
+  // destructor.
+  if (session_liveness_.expired()) {
+    return;
+  }
+  auto it = session_->published_subscriptions_.find(subscription_id_);
+  if (it != session_->published_subscriptions_.end()) {
+    it->second->OnDataStreamDestroyed(stream_->GetStreamId(), next_object_);
+  }
+}
+
 void MoqtSession::OutgoingDataStream::OnCanWrite() {
-  // TODO: handle backpressure on data streams.
+  PublishedSubscription* subscription = GetSubscriptionIfValid();
+  if (subscription == nullptr) {
+    return;
+  }
+  SendObjects(*subscription);
+}
+
+MoqtSession::PublishedSubscription*
+MoqtSession::OutgoingDataStream::GetSubscriptionIfValid() {
+  auto it = session_->published_subscriptions_.find(subscription_id_);
+  if (it == session_->published_subscriptions_.end()) {
+    stream_->ResetWithUserCode(kResetCodeSubscriptionGone);
+    return nullptr;
+  }
+
+  PublishedSubscription* subscription = it->second.get();
+  MoqtTrackPublisher& publisher = subscription->publisher();
+  absl::StatusOr<MoqtTrackStatusCode> status = publisher.GetTrackStatus();
+  if (!status.ok()) {
+    // TODO: clean up the subscription.
+    return nullptr;
+  }
+  if (!DoesTrackStatusImplyHavingData(*status)) {
+    QUICHE_BUG(GetSubscriptionIfValid_InvalidTrackStatus)
+        << "The track publisher returned a status indicating that no objects "
+           "are available, but a stream for those objects exists.";
+    session_->Error(MoqtError::kInternalError,
+                    "Invalid track state provided by application");
+    return nullptr;
+  }
+  return subscription;
+}
+
+void MoqtSession::OutgoingDataStream::SendObjects(
+    PublishedSubscription& subscription) {
+  while (stream_->CanWrite()) {
+    std::optional<PublishedObject> object =
+        subscription.publisher().GetCachedObject(next_object_);
+    if (!object.has_value()) {
+      break;
+    }
+    if (!subscription.InWindow(next_object_)) {
+      // It is possible that the next object became irrelevant due to a
+      // SUBSCRIBE_UPDATE.  Close the stream if so.
+      bool success = stream_->SendFin();
+      QUICHE_BUG_IF(OutgoingDataStream_fin_due_to_update, !success)
+          << "Writing FIN failed despite CanWrite() being true.";
+      return;
+    }
+    SendNextObject(subscription, *std::move(object));
+  }
+}
+
+void MoqtSession::OutgoingDataStream::SendNextObject(
+    PublishedSubscription& subscription, PublishedObject object) {
+  QUICHE_DCHECK(object.sequence == next_object_);
+  QUICHE_DCHECK(stream_->CanWrite());
+
+  MoqtTrackPublisher& publisher = subscription.publisher();
+  QUICHE_DCHECK(DoesTrackStatusImplyHavingData(*publisher.GetTrackStatus()));
+  MoqtForwardingPreference forwarding_preference =
+      publisher.GetForwardingPreference();
+
+  MoqtObject header;
+  header.subscribe_id = subscription_id_;
+  header.track_alias = subscription.track_alias();
+  header.group_id = object.sequence.group;
+  header.object_id = object.sequence.object;
+  header.object_send_order = 0;  // TODO: remove in draft-05
+  header.object_status = object.status;
+  header.forwarding_preference = forwarding_preference;
+  header.payload_length = object.payload.length();
+
+  quiche::QuicheBuffer serialized_header =
+      session_->framer_.SerializeObjectHeader(header, !stream_header_written_);
+  bool fin = false;
+  switch (forwarding_preference) {
+    case MoqtForwardingPreference::kTrack:
+      if (object.status == MoqtObjectStatus::kEndOfGroup ||
+          object.status == MoqtObjectStatus::kGroupDoesNotExist) {
+        ++next_object_.group;
+        next_object_.object = 0;
+      } else {
+        ++next_object_.object;
+      }
+      fin = object.status == MoqtObjectStatus::kEndOfTrack ||
+            !subscription.InWindow(next_object_);
+      break;
+
+    case MoqtForwardingPreference::kGroup:
+      ++next_object_.object;
+      fin = object.status == MoqtObjectStatus::kEndOfTrack ||
+            object.status == MoqtObjectStatus::kEndOfGroup ||
+            object.status == MoqtObjectStatus::kGroupDoesNotExist ||
+            !subscription.InWindow(next_object_);
+      break;
+
+    case MoqtForwardingPreference::kObject:
+      QUICHE_DCHECK(!stream_header_written_);
+      fin = true;
+      break;
+
+    case MoqtForwardingPreference::kDatagram:
+      QUICHE_NOTREACHED();
+      break;
+  }
+
+  // TODO(vasilvv): add a version of WebTransport write API that accepts
+  // memslices so that we can avoid a copy here.
+  std::array<absl::string_view, 2> write_vector = {
+      serialized_header.AsStringView(), object.payload.AsStringView()};
+  quiche::StreamWriteOptions options;
+  options.set_send_fin(fin);
+  absl::Status write_status = stream_->Writev(write_vector, options);
+  if (!write_status.ok()) {
+    QUICHE_BUG(MoqtSession_SendNextObject_write_failed)
+        << "Writing into MoQT stream failed despite CanWrite() being true "
+           "before; status: "
+        << write_status;
+    session_->Error(MoqtError::kInternalError, "Data stream write error");
+    return;
+  }
+
+  QUIC_DVLOG(1) << "Stream " << stream_->GetStreamId() << " successfully wrote "
+                << object.sequence << ", fin = " << fin
+                << ", next: " << next_object_;
+
+  stream_header_written_ = true;
+  subscription.OnObjectSent(object.sequence);
+}
+
+void MoqtSession::PublishedSubscription::SendDatagram(FullSequence sequence) {
+  std::optional<PublishedObject> object =
+      track_publisher_->GetCachedObject(sequence);
+  if (!object.has_value()) {
+    QUICHE_BUG(PublishedSubscription_SendDatagram_object_not_in_cache)
+        << "Got notification about an object that is not in the cache";
+    return;
+  }
+
+  MoqtObject header;
+  header.subscribe_id = subscription_id_;
+  header.track_alias = track_alias();
+  header.group_id = object->sequence.group;
+  header.object_id = object->sequence.object;
+  header.object_send_order = 0;  // TODO: remove in draft-05
+  header.object_status = object->status;
+  header.forwarding_preference = MoqtForwardingPreference::kDatagram;
+  quiche::QuicheBuffer datagram = session_->framer_.SerializeObjectDatagram(
+      header, object->payload.AsStringView());
+  session_->session_->SendOrQueueDatagram(datagram.AsStringView());
+  OnObjectSent(object->sequence);
 }
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index b4a0171..52af69d 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -6,9 +6,11 @@
 #define QUICHE_QUIC_MOQT_MOQT_SESSION_H_
 
 #include <cstdint>
+#include <memory>
 #include <optional>
 #include <string>
 #include <utility>
+#include <vector>
 
 #include "absl/container/flat_hash_map.h"
 #include "absl/container/flat_hash_set.h"
@@ -17,11 +19,13 @@
 #include "quiche/quic/moqt/moqt_framer.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_parser.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
 #include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/common/platform/api/quiche_export.h"
 #include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_callbacks.h"
-#include "quiche/common/simple_buffer_allocator.h"
+#include "quiche/common/quiche_circular_deque.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
@@ -63,12 +67,7 @@
 class QUICHE_EXPORT MoqtSession : public webtransport::SessionVisitor {
  public:
   MoqtSession(webtransport::Session* session, MoqtSessionParameters parameters,
-              MoqtSessionCallbacks callbacks = MoqtSessionCallbacks())
-      : session_(session),
-        parameters_(parameters),
-        callbacks_(std::move(callbacks)),
-        framer_(quiche::SimpleBufferAllocator::Get(),
-                parameters.using_webtrans) {}
+              MoqtSessionCallbacks callbacks = MoqtSessionCallbacks());
   ~MoqtSession() { std::move(callbacks_.session_deleted_callback)(); }
 
   // webtransport::SessionVisitor implementation.
@@ -79,28 +78,17 @@
   void OnIncomingUnidirectionalStreamAvailable() override;
   void OnDatagramReceived(absl::string_view datagram) override;
   void OnCanCreateNewOutgoingBidirectionalStream() override {}
-  void OnCanCreateNewOutgoingUnidirectionalStream() override {}
+  void OnCanCreateNewOutgoingUnidirectionalStream() override;
 
   void Error(MoqtError code, absl::string_view error);
 
   quic::Perspective perspective() const { return parameters_.perspective; }
 
-  // Add to the list of tracks that can be subscribed to. Call this before
-  // Announce() so that subscriptions can be processed correctly. If |visitor|
-  // is nullptr, then incoming SUBSCRIBE for objects in the path will receive
-  // SUBSCRIBE_OK, but never actually get the objects.
-  void AddLocalTrack(const FullTrackName& full_track_name,
-                     MoqtForwardingPreference forwarding_preference,
-                     LocalTrack::Visitor* visitor);
   // Send an ANNOUNCE message for |track_namespace|, and call
   // |announce_callback| when the response arrives. Will fail immediately if
   // there is already an unresolved ANNOUNCE for that namespace.
   void Announce(absl::string_view track_namespace,
                 MoqtOutgoingAnnounceCallback announce_callback);
-  bool HasSubscribers(const FullTrackName& full_track_name) const;
-  // Send an ANNOUNCE_CANCEL and delete local tracks in that namespace when all
-  // subscriptions are closed for that track.
-  void CancelAnnounce(absl::string_view track_namespace);
 
   // Returns true if SUBSCRIBE was sent. If there is already a subscription to
   // the track, the message will still be sent. However, the visitor will be
@@ -131,21 +119,9 @@
                              RemoteTrack::Visitor* visitor,
                              absl::string_view auth_info = "");
 
-  // Returns false if it could not open a stream when necessary, or if the
-  // track does not exist (there was no call to AddLocalTrack). Will still
-  // return false is some streams succeed.
-  // Also returns false if |payload_length| exists but is shorter than
-  // |payload|.
-  // |payload.length() >= |payload_length|, because the application can deliver
-  // partial objects.
-  bool PublishObject(const FullTrackName& full_track_name, uint64_t group_id,
-                     uint64_t object_id, uint64_t object_send_order,
-                     MoqtObjectStatus status, absl::string_view payload);
-  void CloseObjectStream(const FullTrackName& full_track_name,
-                         uint64_t group_id);
-  // TODO: Add an API to send partial objects.
-
   MoqtSessionCallbacks& callbacks() { return callbacks_; }
+  MoqtPublisher* publisher() { return publisher_; }
+  void set_publisher(MoqtPublisher* publisher) { publisher_ = publisher; }
 
  private:
   friend class test::MoqtSessionPeer;
@@ -298,10 +274,70 @@
     MoqtParser parser_;
     std::string partial_object_;
   };
+  // Represents a record for a single subscription to a local track that is
+  // being sent to the peer.
+  class PublishedSubscription : public MoqtObjectListener {
+   public:
+    explicit PublishedSubscription(
+        MoqtSession* session,
+        std::shared_ptr<MoqtTrackPublisher> track_publisher,
+        const MoqtSubscribe& subscribe);
+    ~PublishedSubscription();
+
+    PublishedSubscription(const PublishedSubscription&) = delete;
+    PublishedSubscription(PublishedSubscription&&) = delete;
+    PublishedSubscription& operator=(const PublishedSubscription&) = delete;
+    PublishedSubscription& operator=(PublishedSubscription&&) = delete;
+
+    MoqtTrackPublisher& publisher() { return *track_publisher_; }
+    uint64_t track_alias() const { return track_alias_; }
+    std::optional<FullSequence> largest_sent() const { return largest_sent_; }
+
+    void OnNewObjectAvailable(FullSequence sequence) override;
+
+    // Creates streams for all objects that are currently in the track's object
+    // cache and match the subscription window.  This is in some sense similar
+    // to a fetch (since all of the objects are in the past), but is
+    // conceptually simpler, as backpressure is less of a concern.
+    void Backfill();
+
+    // Updates the window of the subscription in question.
+    void Update(FullSequence start, std::optional<FullSequence> end);
+    // Checks if the specified sequence is within the window of this
+    // subscription.
+    bool InWindow(FullSequence sequence) { return window_.InWindow(sequence); }
+
+    void OnDataStreamCreated(webtransport::StreamId id,
+                             FullSequence start_sequence);
+    void OnDataStreamDestroyed(webtransport::StreamId id,
+                               FullSequence end_sequence);
+    void OnObjectSent(FullSequence sequence);
+
+    std::vector<webtransport::StreamId> GetAllStreams() const;
+
+   private:
+    SendStreamMap& stream_map();
+    quic::Perspective perspective() const {
+      return session_->parameters_.perspective;
+    }
+
+    void SendDatagram(FullSequence sequence);
+
+    uint64_t subscription_id_;
+    MoqtSession* session_;
+    std::shared_ptr<MoqtTrackPublisher> track_publisher_;
+    uint64_t track_alias_;
+    SubscribeWindow window_;
+    // Largest sequence number ever sent via this subscription.
+    std::optional<FullSequence> largest_sent_;
+    // Should be almost always accessed via `stream_map()`.
+    std::optional<SendStreamMap> lazily_initialized_stream_map_;
+  };
   class QUICHE_EXPORT OutgoingDataStream : public webtransport::StreamVisitor {
    public:
-    OutgoingDataStream(MoqtSession* session, webtransport::Stream* stream)
-        : session_(session), stream_(stream) {}
+    OutgoingDataStream(MoqtSession* session, webtransport::Stream* stream,
+                       uint64_t subscription_id, FullSequence first_object);
+    ~OutgoingDataStream();
 
     // webtransport::StreamVisitor implementation.
     void OnCanRead() override {}
@@ -312,11 +348,37 @@
 
     webtransport::Stream* stream() const { return stream_; }
 
+    // Sends objects on the stream, starting with `next_object_`, until the
+    // stream becomes write-blocked or closed.
+    void SendObjects(PublishedSubscription& subscription);
+
    private:
     friend class test::MoqtSessionPeer;
 
+    // Checks whether the associated subscription is still valid; if not, resets
+    // the stream and returns nullptr.
+    PublishedSubscription* GetSubscriptionIfValid();
+
+    // Actually sends an object on the stream; the object MUST be
+    // `next_object_`.
+    void SendNextObject(PublishedSubscription& subscription,
+                        PublishedObject object);
+
     MoqtSession* session_;
     webtransport::Stream* stream_;
+    uint64_t subscription_id_;
+    FullSequence next_object_;
+    bool stream_header_written_ = false;
+    // A weak pointer to an object owned by the session.  Used to make sure the
+    // session does not get called after being destroyed.
+    std::weak_ptr<void> session_liveness_;
+  };
+  // QueuedOutgoingDataStream records an information necessary to create a
+  // stream that was attempted to be created before but was blocked due to flow
+  // control.
+  struct QueuedOutgoingDataStream {
+    uint64_t subscription_id;
+    FullSequence first_object;
   };
 
   // Returns true if SUBSCRIBE_DONE was sent.
@@ -331,9 +393,14 @@
   // Returns false if the SUBSCRIBE isn't sent.
   bool Subscribe(MoqtSubscribe& message, RemoteTrack::Visitor* visitor);
 
-  // Returns the stream ID if successful, nullopt if not.
-  // TODO: Add a callback if stream creation is delayed.
-  std::optional<webtransport::StreamId> OpenUnidirectionalStream();
+  // Opens a new data stream, or queues it if the session is flow control
+  // blocked.
+  webtransport::Stream* OpenOrQueueDataStream(uint64_t subscription_id,
+                                              FullSequence first_object);
+  // Same as above, except the session is required to be not flow control
+  // blocked.
+  webtransport::Stream* OpenDataStream(uint64_t subscription_id,
+                                       FullSequence first_object);
 
   // Get FullTrackName and visitor for a subscribe_id and track_alias. Returns
   // nullptr if not present.
@@ -356,9 +423,16 @@
   absl::flat_hash_map<FullTrackName, uint64_t> remote_track_aliases_;
   uint64_t next_remote_track_alias_ = 0;
 
-  // All the tracks the peer can subscribe to.
-  absl::flat_hash_map<FullTrackName, LocalTrack> local_tracks_;
-  absl::flat_hash_map<uint64_t, FullTrackName> local_track_by_subscribe_id_;
+  // Application object representing the publisher for all of the tracks that
+  // can be subscribed to via this connection.  Must outlive this object.
+  MoqtPublisher* publisher_;
+  // Subscriptions for local tracks by the remote peer, indexed by subscribe ID.
+  absl::flat_hash_map<uint64_t, std::unique_ptr<PublishedSubscription>>
+      published_subscriptions_;
+  // Keeps track of all the data streams that were supposed to be open, but were
+  // blocked by the flow control.
+  quiche::QuicheCircularDeque<QueuedOutgoingDataStream>
+      queued_outgoing_data_streams_;
   // This is only used to check for track_alias collisions.
   absl::flat_hash_set<uint64_t> used_track_aliases_;
   uint64_t next_local_track_alias_ = 0;
@@ -386,6 +460,11 @@
   // an uninitialized value if no SETUP arrives or it arrives with no Role
   // parameter, and other checks have changed/been disabled.
   MoqtRole peer_role_ = MoqtRole::kPubSub;
+
+  // Must be last.  Token used to make sure that the streams do not call into
+  // the session when the session has already been destroyed.
+  struct Empty {};
+  std::shared_ptr<Empty> liveness_token_;
 };
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index f8e64e8..1c8162b 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -12,16 +12,20 @@
 #include <utility>
 
 #include "absl/status/status.h"
+#include "absl/strings/match.h"
 #include "absl/strings/string_view.h"
 #include "absl/types/span.h"
 #include "quiche/quic/core/quic_data_reader.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_known_track_publisher.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_parser.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
 #include "quiche/quic/platform/api/quic_test.h"
+#include "quiche/quic/test_tools/quic_test_utils.h"
 #include "quiche/common/quiche_stream.h"
 #include "quiche/web_transport/test_tools/mock_web_transport.h"
 #include "quiche/web_transport/web_transport.h"
@@ -32,6 +36,7 @@
 
 namespace {
 
+using ::quic::test::MemSliceFromString;
 using ::testing::_;
 using ::testing::AnyNumber;
 using ::testing::Return;
@@ -60,6 +65,19 @@
   return static_cast<MoqtMessageType>(value);
 }
 
+static std::shared_ptr<MockTrackPublisher> SetupPublisher(
+    FullTrackName track_name, MoqtForwardingPreference forwarding_preference,
+    FullSequence largest_sequence) {
+  auto publisher = std::make_shared<MockTrackPublisher>(std::move(track_name));
+  ON_CALL(*publisher, GetTrackStatus())
+      .WillByDefault(Return(MoqtTrackStatusCode::kInProgress));
+  ON_CALL(*publisher, GetForwardingPreference())
+      .WillByDefault(Return(forwarding_preference));
+  ON_CALL(*publisher, GetLargestSequence())
+      .WillByDefault(Return(largest_sequence));
+  return publisher;
+}
+
 }  // namespace
 
 class MoqtSessionPeer {
@@ -108,30 +126,21 @@
     session->active_subscribes_[subscribe_id] = {subscribe, visitor};
   }
 
-  static LocalTrack* local_track(MoqtSession* session, FullTrackName& name) {
-    auto it = session->local_tracks_.find(name);
-    if (it == session->local_tracks_.end()) {
-      return nullptr;
-    }
-    return &it->second;
-  }
-
-  static void AddSubscription(MoqtSession* session, FullTrackName& name,
-                              uint64_t subscribe_id, uint64_t track_alias,
-                              uint64_t start_group, uint64_t start_object) {
-    LocalTrack* track = local_track(session, name);
-    track->set_track_alias(track_alias);
-    track->AddWindow(subscribe_id, start_group, start_object);
-    session->used_track_aliases_.emplace(track_alias);
-    session->local_track_by_subscribe_id_.emplace(subscribe_id,
-                                                  track->full_track_name());
-  }
-
-  static FullSequence next_sequence(MoqtSession* session, FullTrackName& name) {
-    auto it = session->local_tracks_.find(name);
-    EXPECT_NE(it, session->local_tracks_.end());
-    LocalTrack& track = it->second;
-    return track.next_sequence();
+  static MoqtObjectListener* AddSubscription(
+      MoqtSession* session, std::shared_ptr<MoqtTrackPublisher> publisher,
+      uint64_t subscribe_id, uint64_t track_alias, uint64_t start_group,
+      uint64_t start_object) {
+    MoqtSubscribe subscribe;
+    subscribe.track_namespace = publisher->GetTrackName().track_namespace;
+    subscribe.track_name = publisher->GetTrackName().track_name;
+    subscribe.track_alias = track_alias;
+    subscribe.subscribe_id = subscribe_id;
+    subscribe.start_group = start_group;
+    subscribe.start_object = start_object;
+    session->published_subscriptions_.emplace(
+        subscribe_id, std::make_unique<MoqtSession::PublishedSubscription>(
+                          session, std::move(publisher), subscribe));
+    return session->published_subscriptions_[subscribe_id].get();
   }
 
   static void set_peer_role(MoqtSession* session, MoqtRole role) {
@@ -147,7 +156,9 @@
  public:
   MoqtSessionTest()
       : session_(&mock_session_, default_parameters,
-                 session_callbacks_.AsSessionCallbacks()) {}
+                 session_callbacks_.AsSessionCallbacks()) {
+    session_.set_publisher(&publisher_);
+  }
   ~MoqtSessionTest() {
     EXPECT_CALL(session_callbacks_.session_deleted_callback, Call());
   }
@@ -155,6 +166,7 @@
   MockSessionCallbacks session_callbacks_;
   StrictMock<webtransport::test::MockSession> mock_session_;
   MoqtSession session_;
+  MoqtKnownTrackPublisher publisher_;
 };
 
 TEST_F(MoqtSessionTest, Queries) {
@@ -314,10 +326,11 @@
   EXPECT_TRUE(correct_message);
 
   // Add the track. Now Subscribe should succeed.
-  MockLocalTrackVisitor local_track_visitor;
-  session_.AddLocalTrack(FullTrackName("foo", "bar"),
-                         MoqtForwardingPreference::kObject,
-                         &local_track_visitor);
+  auto track_publisher =
+      std::make_shared<MockTrackPublisher>(FullTrackName("foo", "bar"));
+  EXPECT_CALL(*track_publisher, GetTrackStatus())
+      .WillRepeatedly(Return(MoqtTrackStatusCode::kStatusNotAvailable));
+  publisher_.Add(track_publisher);
   correct_message = true;
   EXPECT_CALL(mock_stream, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
@@ -404,50 +417,20 @@
   EXPECT_TRUE(correct_message);
 }
 
-TEST_F(MoqtSessionTest, HasSubscribers) {
-  MockLocalTrackVisitor local_track_visitor;
-  FullTrackName ftn("foo", "bar");
-  EXPECT_FALSE(session_.HasSubscribers(ftn));
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kGroup,
-                         &local_track_visitor);
-  EXPECT_FALSE(session_.HasSubscribers(ftn));
-
-  // Peer subscribes.
-  MoqtSubscribe request = {
-      /*subscribe_id=*/1,
-      /*track_alias=*/2,
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
-      /*start_group=*/0,
-      /*start_object=*/0,
-      /*end_group=*/std::nullopt,
-      /*end_object=*/std::nullopt,
-      /*authorization_info=*/std::nullopt,
-  };
-  StrictMock<webtransport::test::MockStream> mock_stream;
-  std::unique_ptr<MoqtParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  bool correct_message = true;
-  EXPECT_CALL(mock_stream, Writev(_, _))
-      .WillOnce([&](absl::Span<const absl::string_view> data,
-                    const quiche::StreamWriteOptions& options) {
-        correct_message = true;
-        EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kSubscribeOk);
-        return absl::OkStatus();
-      });
-  stream_input->OnSubscribeMessage(request);
-  EXPECT_TRUE(correct_message);
-  EXPECT_TRUE(session_.HasSubscribers(ftn));
-}
-
 TEST_F(MoqtSessionTest, SubscribeForPast) {
-  MockLocalTrackVisitor local_track_visitor;
   FullTrackName ftn("foo", "bar");
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kObject,
-                         &local_track_visitor);
+  auto track = std::make_shared<MockTrackPublisher>(ftn);
+  EXPECT_CALL(*track, GetTrackStatus())
+      .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
+  EXPECT_CALL(*track, GetCachedObject(_)).WillRepeatedly([] {
+    return std::optional<PublishedObject>();
+  });
+  EXPECT_CALL(*track, GetCachedObjectsInRange(_, _))
+      .WillRepeatedly(Return(std::vector<FullSequence>()));
+  EXPECT_CALL(*track, GetLargestSequence())
+      .WillRepeatedly(Return(FullSequence(10, 20)));
+  publisher_.Add(track);
 
-  // Send Sequence (2, 0) so that next_sequence is set correctly.
-  session_.PublishObject(ftn, 2, 0, 0, MoqtObjectStatus::kNormal, "foo");
   // Peer subscribes to (0, 0)
   MoqtSubscribe request = {
       /*subscribe_id=*/1,
@@ -464,8 +447,6 @@
   std::unique_ptr<MoqtParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
   bool correct_message = true;
-  EXPECT_CALL(local_track_visitor, OnSubscribeForPast(_)).WillOnce(Return([] {
-  }));
   EXPECT_CALL(mock_stream, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
@@ -882,114 +863,141 @@
 }
 
 TEST_F(MoqtSessionTest, CreateIncomingDataStreamAndSend) {
-  StrictMock<webtransport::test::MockStream> mock_stream;
   FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor track_visitor;
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kObject,
-                         &track_visitor);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+  auto track = SetupPublisher(ftn, MoqtForwardingPreference::kObject,
+                              FullSequence(4, 2));
+  MoqtObjectListener* subscription =
+      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
 
-  // No subscription; this is a no-op except to update next_sequence.
-  EXPECT_CALL(mock_stream, Writev(_, _)).Times(0);
-  session_.PublishObject(ftn, 4, 1, 0, MoqtObjectStatus::kNormal, "deadbeef");
-  EXPECT_EQ(MoqtSessionPeer::next_sequence(&session_, ftn), FullSequence(4, 2));
-
-  // Publish in window.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(true));
+  bool fin = false;
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly([&] { return !fin; });
   EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
       .WillOnce(Return(&mock_stream));
-  EXPECT_CALL(mock_stream, SetVisitor(_)).Times(1);
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
+  EXPECT_CALL(mock_stream, SetVisitor(_))
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        stream_visitor = std::move(visitor);
+      });
+  EXPECT_CALL(mock_stream, visitor()).WillOnce([&] {
+    return stream_visitor.get();
+  });
   EXPECT_CALL(mock_stream, GetStreamId())
       .WillRepeatedly(Return(kOutgoingUniStreamId));
-  // Send on the stream
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillOnce(Return(&mock_stream));
-  bool correct_message = false;
+      .WillRepeatedly(Return(&mock_stream));
+
   // Verify first six message fields are sent correctly
-  uint8_t kExpectedMessage[] = {0x00, 0x00, 0x02, 0x05, 0x00, 0x00};
+  bool correct_message = false;
+  const std::string kExpectedMessage = {0x00, 0x00, 0x02, 0x05, 0x00, 0x00};
   EXPECT_CALL(mock_stream, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
-        correct_message = (0 == memcmp(data.data()->data(), kExpectedMessage,
-                                       sizeof(kExpectedMessage)));
+        correct_message = absl::StartsWith(data[0], kExpectedMessage);
+        fin |= options.send_fin();
         return absl::OkStatus();
       });
-  session_.PublishObject(ftn, 5, 0, 0, MoqtObjectStatus::kNormal, "deadbeef");
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([] {
+    return PublishedObject{FullSequence(5, 0), MoqtObjectStatus::kNormal,
+                           MemSliceFromString("deadbeef")};
+  });
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillRepeatedly([] {
+    return std::optional<PublishedObject>();
+  });
+  subscription->OnNewObjectAvailable(FullSequence(5, 0));
   EXPECT_TRUE(correct_message);
 }
 
 // TODO: Test operation with multiple streams.
 
-// Error cases
-
-TEST_F(MoqtSessionTest, CannotOpenUniStream) {
-  StrictMock<webtransport::test::MockStream> mock_stream;
+TEST_F(MoqtSessionTest, UnidirectionalStreamCannotBeOpened) {
   FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor track_visitor;
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kObject,
-                         &track_visitor);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
-  ;
+  auto track =
+      SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(4, 2));
+  MoqtObjectListener* subscription =
+      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
+
+  // Queue the outgoing stream.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(false));
-  EXPECT_FALSE(session_.PublishObject(ftn, 5, 0, 0, MoqtObjectStatus::kNormal,
-                                      "deadbeef"));
-}
+  subscription->OnNewObjectAvailable(FullSequence(5, 0));
 
-TEST_F(MoqtSessionTest, GetStreamByIdFails) {
-  StrictMock<webtransport::test::MockStream> mock_stream;
-  FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor track_visitor;
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kObject,
-                         &track_visitor);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+  // Unblock the session, and cause the queued stream to be sent.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(true));
+  bool fin = false;
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly([&] { return !fin; });
   EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
       .WillOnce(Return(&mock_stream));
-  EXPECT_CALL(mock_stream, SetVisitor(_)).Times(1);
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
+  EXPECT_CALL(mock_stream, SetVisitor(_))
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        stream_visitor = std::move(visitor);
+      });
+  EXPECT_CALL(mock_stream, visitor()).WillOnce([&] {
+    return stream_visitor.get();
+  });
   EXPECT_CALL(mock_stream, GetStreamId())
       .WillRepeatedly(Return(kOutgoingUniStreamId));
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillOnce(Return(nullptr));
-  EXPECT_FALSE(session_.PublishObject(ftn, 5, 0, 0, MoqtObjectStatus::kNormal,
-                                      "deadbeef"));
+      .WillRepeatedly(Return(&mock_stream));
+  EXPECT_CALL(mock_stream, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([] {
+    return PublishedObject{FullSequence(5, 0), MoqtObjectStatus::kNormal,
+                           MemSliceFromString("deadbeef")};
+  });
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillRepeatedly([] {
+    return std::optional<PublishedObject>();
+  });
+  session_.OnCanCreateNewOutgoingUnidirectionalStream();
 }
 
-TEST_F(MoqtSessionTest, SubscribeProposesBadTrackAlias) {
-  MockLocalTrackVisitor local_track_visitor;
+TEST_F(MoqtSessionTest, OutgoingStreamDisappears) {
   FullTrackName ftn("foo", "bar");
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kGroup,
-                         &local_track_visitor);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+  auto track =
+      SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(4, 2));
+  MoqtObjectListener* subscription =
+      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
 
-  // Peer subscribes.
-  MoqtSubscribe request = {
-      /*subscribe_id=*/1,
-      /*track_alias=*/3,  // Doesn't match 2.
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
-      /*start_group=*/0,
-      /*start_object=*/0,
-      /*end_group=*/std::nullopt,
-      /*end_object=*/std::nullopt,
-      /*authorization_info=*/std::nullopt,
-  };
+  // Set up an outgoing stream for a group.
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillOnce(Return(true));
   StrictMock<webtransport::test::MockStream> mock_stream;
-  std::unique_ptr<MoqtParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  bool correct_message = true;
-  EXPECT_CALL(mock_stream, Writev(_, _))
-      .WillOnce([&](absl::Span<const absl::string_view> data,
-                    const quiche::StreamWriteOptions& options) {
-        correct_message = true;
-        EXPECT_EQ(*ExtractMessageType(data[0]),
-                  MoqtMessageType::kSubscribeError);
-        return absl::OkStatus();
+  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly(Return(true));
+  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
+      .WillOnce(Return(&mock_stream));
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
+  EXPECT_CALL(mock_stream, SetVisitor(_))
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        stream_visitor = std::move(visitor);
       });
-  stream_input->OnSubscribeMessage(request);
-  EXPECT_TRUE(correct_message);
+  EXPECT_CALL(mock_stream, visitor()).WillRepeatedly([&] {
+    return stream_visitor.get();
+  });
+  EXPECT_CALL(mock_stream, GetStreamId())
+      .WillRepeatedly(Return(kOutgoingUniStreamId));
+  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
+      .WillRepeatedly(Return(&mock_stream));
+
+  EXPECT_CALL(mock_stream, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([] {
+    return PublishedObject{FullSequence(5, 0), MoqtObjectStatus::kNormal,
+                           MemSliceFromString("deadbeef")};
+  });
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillOnce([] {
+    return std::optional<PublishedObject>();
+  });
+  subscription->OnNewObjectAvailable(FullSequence(5, 0));
+
+  // Now that the stream exists and is recorded within subscription, make it
+  // disappear by returning nullptr.
+  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
+      .WillRepeatedly(Return(nullptr));
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).Times(0);
+  subscription->OnNewObjectAvailable(FullSequence(5, 1));
 }
 
 TEST_F(MoqtSessionTest, OneBidirectionalStreamClient) {
@@ -1081,10 +1089,9 @@
 
 TEST_F(MoqtSessionTest, ReceiveUnsubscribe) {
   FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor visitor;
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kTrack, &visitor);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 1, 3, 4);
-  EXPECT_TRUE(session_.HasSubscribers(ftn));
+  auto track =
+      SetupPublisher(ftn, MoqtForwardingPreference::kTrack, FullSequence(4, 2));
+  MoqtSessionPeer::AddSubscription(&session_, track, 0, 1, 3, 4);
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
@@ -1103,15 +1110,14 @@
       });
   stream_input->OnUnsubscribeMessage(unsubscribe);
   EXPECT_TRUE(correct_message);
-  EXPECT_FALSE(session_.HasSubscribers(ftn));
 }
 
 TEST_F(MoqtSessionTest, SendDatagram) {
   FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor track_visitor;
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kDatagram,
-                         &track_visitor);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+  std::shared_ptr<MockTrackPublisher> track_publisher = SetupPublisher(
+      ftn, MoqtForwardingPreference::kDatagram, FullSequence{4, 0});
+  MoqtObjectListener* listener =
+      MoqtSessionPeer::AddSubscription(&session_, track_publisher, 0, 2, 5, 0);
 
   // Publish in window.
   bool correct_message = false;
@@ -1128,7 +1134,12 @@
         return webtransport::DatagramStatus(
             webtransport::DatagramStatusCode::kSuccess, "");
       });
-  session_.PublishObject(ftn, 5, 0, 0, MoqtObjectStatus::kNormal, "deadbeef");
+  EXPECT_CALL(*track_publisher, GetCachedObject(FullSequence{5, 0}))
+      .WillRepeatedly([] {
+        return PublishedObject{FullSequence{5, 0}, MoqtObjectStatus::kNormal,
+                               MemSliceFromString("deadbeef")};
+      });
+  listener->OnNewObjectAvailable(FullSequence(5, 0));
   EXPECT_TRUE(correct_message);
 }
 
@@ -1240,6 +1251,8 @@
   stream_input->OnAnnounceMessage(announce);
 }
 
+// TODO: re-enable this test once this behavior is re-implemented.
+#if 0
 TEST_F(MoqtSessionTest, SubscribeUpdateClosesSubscription) {
   MoqtSessionPeer::set_peer_role(&session_, MoqtRole::kSubscriber);
   FullTrackName ftn("foo", "bar");
@@ -1275,110 +1288,7 @@
   EXPECT_TRUE(correct_message);
   EXPECT_FALSE(session_.HasSubscribers(ftn));
 }
-
-TEST_F(MoqtSessionTest, ProcessAnnounceCancelNoSubscribes) {
-  MoqtSessionPeer::set_peer_role(&session_, MoqtRole::kSubscriber);
-  FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor track_visitor;
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kTrack, &track_visitor);
-  EXPECT_NE(MoqtSessionPeer::local_track(&session_, ftn), nullptr);
-  StrictMock<webtransport::test::MockStream> mock_stream;
-  MoqtAnnounceCancel cancel = {
-      /*track_namespace=*/"foo",
-  };
-  std::unique_ptr<MoqtParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  stream_input->OnAnnounceCancelMessage(cancel);
-  EXPECT_EQ(MoqtSessionPeer::local_track(&session_, ftn), nullptr);
-}
-
-TEST_F(MoqtSessionTest, ProcessAnnounceCancelActiveSubscribes) {
-  MoqtSessionPeer::set_peer_role(&session_, MoqtRole::kSubscriber);
-  FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor track_visitor;
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kTrack, &track_visitor);
-  EXPECT_NE(MoqtSessionPeer::local_track(&session_, ftn), nullptr);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 1, 2, 7, 0);
-  StrictMock<webtransport::test::MockStream> mock_stream;
-  MoqtAnnounceCancel cancel = {
-      /*track_namespace=*/"foo",
-  };
-  std::unique_ptr<MoqtParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  stream_input->OnAnnounceCancelMessage(cancel);
-  // The track is still there because there is a subscribe.
-  EXPECT_NE(MoqtSessionPeer::local_track(&session_, ftn), nullptr);
-  // Unsubscribe from 0.
-  MoqtUnsubscribe unsubscribe = {
-      /*subscribe_id=*/0,
-  };
-  EXPECT_CALL(mock_session_, GetStreamById(4)).WillOnce(Return(&mock_stream));
-  bool correct_message = false;
-  EXPECT_CALL(mock_stream, Writev(_, _))
-      .WillOnce([&](absl::Span<const absl::string_view> data,
-                    const quiche::StreamWriteOptions& options) {
-        correct_message = true;
-        EXPECT_EQ(*ExtractMessageType(data[0]),
-                  MoqtMessageType::kSubscribeDone);
-        return absl::OkStatus();
-      });
-  stream_input->OnUnsubscribeMessage(unsubscribe);
-  EXPECT_TRUE(correct_message);
-  EXPECT_NE(MoqtSessionPeer::local_track(&session_, ftn), nullptr);
-  // Unsubscribe from 1.
-  unsubscribe.subscribe_id = 1;
-  EXPECT_CALL(mock_session_, GetStreamById(4)).WillOnce(Return(&mock_stream));
-  correct_message = false;
-  EXPECT_CALL(mock_stream, Writev(_, _))
-      .WillOnce([&](absl::Span<const absl::string_view> data,
-                    const quiche::StreamWriteOptions& options) {
-        correct_message = true;
-        EXPECT_EQ(*ExtractMessageType(data[0]),
-                  MoqtMessageType::kSubscribeDone);
-        return absl::OkStatus();
-      });
-  stream_input->OnUnsubscribeMessage(unsubscribe);
-  EXPECT_TRUE(correct_message);
-
-  EXPECT_EQ(MoqtSessionPeer::local_track(&session_, ftn), nullptr);
-}
-
-TEST_F(MoqtSessionTest, AnnounceCancelThenSubscribe) {
-  MoqtSessionPeer::set_peer_role(&session_, MoqtRole::kSubscriber);
-  FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor track_visitor;
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kTrack, &track_visitor);
-  EXPECT_NE(MoqtSessionPeer::local_track(&session_, ftn), nullptr);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
-  StrictMock<webtransport::test::MockStream> mock_stream;
-  MoqtAnnounceCancel cancel = {
-      /*track_namespace=*/"foo",
-  };
-  std::unique_ptr<MoqtParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  stream_input->OnAnnounceCancelMessage(cancel);
-  // The track is still there because there is a subscribe.
-  EXPECT_NE(MoqtSessionPeer::local_track(&session_, ftn), nullptr);
-  MoqtSubscribe subscribe = {
-      /*subscribe_id=*/1,
-      /*track_alias=*/2,
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
-      /*start_group=*/4,
-      /*start_object=*/0,
-      /*end_group=*/std::nullopt,
-      /*end_object=*/std::nullopt,
-      /*authorization_info=*/std::nullopt,
-  };
-  EXPECT_CALL(mock_session_,
-              CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
-                           "Received SUBSCRIBE for canceled track"))
-      .Times(1);
-  stream_input->OnSubscribeMessage(subscribe);
-}
-
-// TODO: Cover more error cases in the above
+#endif
 
 }  // namespace test
 
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.cc b/quiche/quic/moqt/moqt_subscribe_windows.cc
index 21f16e5..422a3fc 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows.cc
@@ -4,12 +4,12 @@
 
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
 
-#include <cstdint>
 #include <optional>
 #include <vector>
 
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/platform/api/quic_bug_tracker.h"
+#include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
@@ -21,9 +21,9 @@
   return (!end_.has_value() || seq <= *end_);
 }
 
-std::optional<webtransport::StreamId> SubscribeWindow::GetStreamForSequence(
+std::optional<webtransport::StreamId> SendStreamMap::GetStreamForSequence(
     FullSequence sequence) const {
-  FullSequence index = SequenceToIndex(sequence);
+  ReducedSequenceIndex index(sequence, forwarding_preference_);
   auto stream_it = send_streams_.find(index);
   if (stream_it == send_streams_.end()) {
     return std::nullopt;
@@ -31,59 +31,27 @@
   return stream_it->second;
 }
 
-void SubscribeWindow::AddStream(uint64_t group_id, uint64_t object_id,
-                                webtransport::StreamId stream_id) {
-  if (!InWindow(FullSequence(group_id, object_id))) {
-    return;
-  }
-  FullSequence index = SequenceToIndex(FullSequence(group_id, object_id));
+void SendStreamMap::AddStream(FullSequence sequence,
+                              webtransport::StreamId stream_id) {
+  ReducedSequenceIndex index(sequence, forwarding_preference_);
   if (forwarding_preference_ == MoqtForwardingPreference::kDatagram) {
     QUIC_BUG(quic_bug_moqt_draft_03_01) << "Adding a stream for datagram";
     return;
   }
-  auto stream_it = send_streams_.find(index);
-  if (stream_it != send_streams_.end()) {
-    QUIC_BUG(quic_bug_moqt_draft_03_02) << "Stream already added";
-    return;
-  }
-  send_streams_[index] = stream_id;
+  auto [stream_it, success] = send_streams_.emplace(index, stream_id);
+  QUIC_BUG_IF(quic_bug_moqt_draft_03_02, !success) << "Stream already added";
 }
 
-void SubscribeWindow::RemoveStream(uint64_t group_id, uint64_t object_id) {
-  FullSequence index = SequenceToIndex(FullSequence(group_id, object_id));
+void SendStreamMap::RemoveStream(FullSequence sequence,
+                                 webtransport::StreamId stream_id) {
+  ReducedSequenceIndex index(sequence, forwarding_preference_);
+  QUICHE_DCHECK(send_streams_.contains(index) &&
+                send_streams_.find(index)->second == stream_id)
+      << "Requested to remove a stream ID that does not match the one in the "
+         "map";
   send_streams_.erase(index);
 }
 
-bool SubscribeWindow::OnObjectSent(FullSequence sequence,
-                                   MoqtObjectStatus status) {
-  if (!largest_delivered_.has_value() || *largest_delivered_ < sequence) {
-    largest_delivered_ = sequence;
-  }
-  // Update next_to_backfill_
-  if (sequence < original_next_object_ && next_to_backfill_.has_value() &&
-      *next_to_backfill_ <= sequence) {
-    switch (status) {
-      case MoqtObjectStatus::kNormal:
-      case MoqtObjectStatus::kObjectDoesNotExist:
-        next_to_backfill_ = sequence.next();
-        break;
-      case MoqtObjectStatus::kEndOfGroup:
-        next_to_backfill_ = FullSequence(sequence.group + 1, 0);
-        break;
-      default:  // Includes kEndOfTrack.
-        next_to_backfill_ = std::nullopt;
-        break;
-    }
-    if (next_to_backfill_ == original_next_object_ ||
-        *next_to_backfill_ == end_) {
-      // Redelivery is complete.
-      next_to_backfill_ = std::nullopt;
-    }
-  }
-  return (!next_to_backfill_.has_value() && end_.has_value() &&
-          *end_ <= sequence);
-}
-
 bool SubscribeWindow::UpdateStartEnd(FullSequence start,
                                      std::optional<FullSequence> end) {
   // Can't make the subscription window bigger.
@@ -98,29 +66,37 @@
   return true;
 }
 
-FullSequence SubscribeWindow::SequenceToIndex(FullSequence sequence) const {
-  switch (forwarding_preference_) {
+bool SubscribeWindow::IsStreamProvokingObject(
+    FullSequence sequence, MoqtForwardingPreference preference) const {
+  if (preference == MoqtForwardingPreference::kGroup) {
+    return sequence.object == 0 || sequence == start_;
+  }
+  QUICHE_DCHECK(preference != MoqtForwardingPreference::kDatagram);
+  return true;
+}
+
+ReducedSequenceIndex::ReducedSequenceIndex(
+    FullSequence sequence, MoqtForwardingPreference preference) {
+  switch (preference) {
     case MoqtForwardingPreference::kTrack:
-      return FullSequence(0, 0);
+      sequence_ = FullSequence(0, 0);
+      break;
     case MoqtForwardingPreference::kGroup:
-      return FullSequence(sequence.group, 0);
+      sequence_ = FullSequence(sequence.group, 0);
+      break;
     case MoqtForwardingPreference::kObject:
-      return sequence;
     case MoqtForwardingPreference::kDatagram:
-      QUIC_BUG(quic_bug_moqt_draft_03_01) << "No stream for datagram";
-      return FullSequence(0, 0);
+      sequence_ = sequence;
+      break;
   }
 }
 
-std::vector<SubscribeWindow*> MoqtSubscribeWindows::SequenceIsSubscribed(
-    FullSequence sequence) {
-  std::vector<SubscribeWindow*> retval;
-  for (auto& [subscribe_id, window] : windows_) {
-    if (window.InWindow(sequence)) {
-      retval.push_back(&(window));
-    }
+std::vector<webtransport::StreamId> SendStreamMap::GetAllStreams() const {
+  std::vector<webtransport::StreamId> ids;
+  for (const auto& [index, id] : send_streams_) {
+    ids.push_back(id);
   }
-  return retval;
+  return ids;
 }
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h
index d393284..5e0307d 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.h
+++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -10,147 +10,88 @@
 #include <vector>
 
 #include "absl/container/flat_hash_map.h"
-#include "absl/container/node_hash_map.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/common/platform/api/quiche_export.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
 
-// Classes to track subscriptions to local tracks: the sequence numbers
-// subscribed, the streams involved, and the subscribe IDs.
+// SubscribeWindow represents a window of objects for which an MoQT subscription
+// can be valid.
 class QUICHE_EXPORT SubscribeWindow {
  public:
   // Creates a half-open window. |next_object| is the expected sequence number
   // of the next published object on the track.
-  SubscribeWindow(uint64_t subscribe_id,
-                  MoqtForwardingPreference forwarding_preference,
-                  FullSequence next_object, uint64_t start_group,
-                  uint64_t start_object)
-      : SubscribeWindow(subscribe_id, forwarding_preference, next_object,
-                        FullSequence(start_group, start_object), std::nullopt) {
+  SubscribeWindow(uint64_t start_group, uint64_t start_object)
+      : SubscribeWindow(FullSequence(start_group, start_object), std::nullopt) {
   }
 
   // Creates a closed window.
-  SubscribeWindow(uint64_t subscribe_id,
-                  MoqtForwardingPreference forwarding_preference,
-                  FullSequence next_object, uint64_t start_group,
-                  uint64_t start_object, uint64_t end_group,
-                  uint64_t end_object)
-      : SubscribeWindow(subscribe_id, forwarding_preference, next_object,
-                        FullSequence(start_group, start_object),
+  SubscribeWindow(uint64_t start_group, uint64_t start_object,
+                  uint64_t end_group, uint64_t end_object)
+      : SubscribeWindow(FullSequence(start_group, start_object),
                         FullSequence(end_group, end_object)) {}
 
-  SubscribeWindow(uint64_t subscribe_id,
-                  MoqtForwardingPreference forwarding_preference,
-                  FullSequence next_object, FullSequence start,
-                  std::optional<FullSequence> end)
-      : subscribe_id_(subscribe_id),
-        start_(start),
-        end_(end),
-        original_next_object_(next_object),
-        forwarding_preference_(forwarding_preference) {
-    next_to_backfill_ =
-        (start < next_object) ? start : std::optional<FullSequence>();
-  }
-
-  uint64_t subscribe_id() const { return subscribe_id_; }
+  SubscribeWindow(FullSequence start, std::optional<FullSequence> end)
+      : start_(start), end_(end) {}
 
   bool InWindow(const FullSequence& seq) const;
-
-  // Returns the stream to send |sequence| on, if already opened.
-  std::optional<webtransport::StreamId> GetStreamForSequence(
-      FullSequence sequence) const;
-
-  // Records what stream is being used for a track, group, or object depending
-  // on |forwarding_preference|. Triggers QUIC_BUG if already assigned.
-  void AddStream(uint64_t group_id, uint64_t object_id,
-                 webtransport::StreamId stream_id);
-
-  void RemoveStream(uint64_t group_id, uint64_t object_id);
-
   bool HasEnd() const { return end_.has_value(); }
-  MoqtForwardingPreference forwarding_preference() const {
-    return forwarding_preference_;
-  }
+  FullSequence start() const { return start_; }
 
-  // Returns true if the object delivery completed the subscription
-  bool OnObjectSent(FullSequence sequence, MoqtObjectStatus status);
-
-  std::optional<FullSequence>& largest_delivered() {
-    return largest_delivered_;
-  }
-
-  // Returns true if the updated values are valid.
+  // Updates the subscription window. Returns true if the update is valid (in
+  // MoQT, subscription windows are only allowed to shrink, not to expand).
   bool UpdateStartEnd(FullSequence start, std::optional<FullSequence> end);
 
- private:
-  // Converts an object sequence number into one that matches the way that
-  // stream IDs are being mapped. (See the comment for send_streams_ below.)
-  FullSequence SequenceToIndex(FullSequence sequence) const;
+  // Returns true if for a given forwarding preference, the specified sequence
+  // number might be the first object on a stream.
+  bool IsStreamProvokingObject(FullSequence sequence,
+                               MoqtForwardingPreference preference) const;
 
-  const uint64_t subscribe_id_;
+ private:
   FullSequence start_;
   std::optional<FullSequence> end_;
-  std::optional<FullSequence> largest_delivered_;
-  // The next sequence number to be redelivered, because it was published prior
-  // to the subscription. Is nullopt if no redeliveries are needed.
-  std::optional<FullSequence> next_to_backfill_;
-  // The first unpublished sequence number when the subscribe arrived.
-  const FullSequence original_next_object_;
-  // Store open streams for this subscription. If the forwarding preference is
-  // kTrack, there is one entry under sequence (0, 0). If kGroup, each entry is
-  // under (group, 0). If kObject, it's tracked under the full sequence. If
-  // kDatagram, the map is empty.
-  absl::flat_hash_map<FullSequence, webtransport::StreamId> send_streams_;
-  // The forwarding preference for this track; informs how the streams are
-  // mapped.
-  const MoqtForwardingPreference forwarding_preference_;
 };
 
-// Class to keep track of the sequence number blocks to which a peer is
-// subscribed.
-class QUICHE_EXPORT MoqtSubscribeWindows {
+// ReducedSequenceIndex represents an index object such that if two sequence
+// numbers are mapped to the same stream, they will be mapped to the same index.
+class ReducedSequenceIndex {
  public:
-  MoqtSubscribeWindows(MoqtForwardingPreference forwarding_preference)
-      : forwarding_preference_(forwarding_preference) {}
+  ReducedSequenceIndex(FullSequence sequence,
+                       MoqtForwardingPreference preference);
 
-  // Returns a vector of subscribe IDs that apply to the object. They will be in
-  // reverse order of the AddWindow calls.
-  std::vector<SubscribeWindow*> SequenceIsSubscribed(FullSequence sequence);
-
-  // |start_group| and |start_object| must be absolute sequence numbers. An
-  // optimization could consolidate overlapping subscribe windows.
-  void AddWindow(uint64_t subscribe_id, FullSequence next_object,
-                 uint64_t start_group, uint64_t start_object) {
-    windows_.emplace(subscribe_id,
-                     SubscribeWindow(subscribe_id, forwarding_preference_,
-                                     next_object, start_group, start_object));
+  bool operator==(const ReducedSequenceIndex& other) const {
+    return sequence_ == other.sequence_;
   }
-  void AddWindow(uint64_t subscribe_id, FullSequence next_object,
-                 uint64_t start_group, uint64_t start_object,
-                 uint64_t end_group, uint64_t end_object) {
-    windows_.emplace(
-        subscribe_id,
-        SubscribeWindow(subscribe_id, forwarding_preference_, next_object,
-                        start_group, start_object, end_group, end_object));
+  bool operator!=(const ReducedSequenceIndex& other) const {
+    return sequence_ != other.sequence_;
   }
-  void RemoveWindow(uint64_t subscribe_id) { windows_.erase(subscribe_id); }
 
-  bool IsEmpty() const { return windows_.empty(); }
-
-  SubscribeWindow* GetWindow(uint64_t subscribe_id) {
-    auto it = windows_.find(subscribe_id);
-    if (it == windows_.end()) {
-      return nullptr;
-    }
-    return &it->second;
+  template <typename H>
+  friend H AbslHashValue(H h, const ReducedSequenceIndex& m) {
+    return H::combine(std::move(h), m.sequence_);
   }
 
  private:
-  // Indexed by Subscribe ID.
-  absl::node_hash_map<uint64_t, SubscribeWindow> windows_;
-  const MoqtForwardingPreference forwarding_preference_;
+  FullSequence sequence_;
+};
+
+// A map of outgoing data streams indexed by object sequence numbers.
+class QUICHE_EXPORT SendStreamMap {
+ public:
+  explicit SendStreamMap(MoqtForwardingPreference forwarding_preference)
+      : forwarding_preference_(forwarding_preference) {}
+
+  std::optional<webtransport::StreamId> GetStreamForSequence(
+      FullSequence sequence) const;
+  void AddStream(FullSequence sequence, webtransport::StreamId stream_id);
+  void RemoveStream(FullSequence sequence, webtransport::StreamId stream_id);
+  std::vector<webtransport::StreamId> GetAllStreams() const;
+
+ private:
+  absl::flat_hash_map<ReducedSequenceIndex, webtransport::StreamId>
+      send_streams_;
+  MoqtForwardingPreference forwarding_preference_;
 };
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
index 0aae7c7..49213ff 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows_test.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
@@ -21,15 +21,12 @@
   SubscribeWindowTest() {}
 
   const uint64_t subscribe_id_ = 2;
-  const FullSequence right_edge_{4, 5};
   const FullSequence start_{4, 0};
   const FullSequence end_{5, 5};
 };
 
 TEST_F(SubscribeWindowTest, Queries) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
-                         right_edge_, start_, end_);
-  EXPECT_EQ(window.subscribe_id(), 2);
+  SubscribeWindow window(start_, end_);
   EXPECT_TRUE(window.InWindow(FullSequence(4, 0)));
   EXPECT_TRUE(window.InWindow(FullSequence(5, 5)));
   EXPECT_FALSE(window.InWindow(FullSequence(5, 6)));
@@ -38,98 +35,51 @@
 }
 
 TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdTrack) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kTrack,
-                         right_edge_, start_, end_);
-  window.AddStream(4, 0, 2);
-  EXPECT_QUIC_BUG(window.AddStream(5, 2, 6), "Stream already added");
-  EXPECT_EQ(*window.GetStreamForSequence(FullSequence(5, 2)), 2);
-  window.RemoveStream(7, 2);
-  EXPECT_FALSE(window.GetStreamForSequence(FullSequence(4, 0)).has_value());
+  SendStreamMap stream_map(MoqtForwardingPreference::kTrack);
+  stream_map.AddStream(FullSequence{4, 0}, 2);
+  EXPECT_QUIC_BUG(stream_map.AddStream(FullSequence{5, 2}, 6),
+                  "Stream already added");
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 2)), 2);
+  stream_map.RemoveStream(FullSequence{7, 2}, 2);
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 0)), std::nullopt);
 }
 
 TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdGroup) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kGroup,
-                         right_edge_, start_, end_);
-  window.AddStream(4, 0, 2);
-  EXPECT_FALSE(window.GetStreamForSequence(FullSequence(5, 0)).has_value());
-  window.AddStream(5, 2, 6);
-  EXPECT_QUIC_BUG(window.AddStream(5, 3, 6), "Stream already added");
-  EXPECT_EQ(*window.GetStreamForSequence(FullSequence(4, 1)), 2);
-  EXPECT_EQ(*window.GetStreamForSequence(FullSequence(5, 0)), 6);
-  window.RemoveStream(5, 1);
-  EXPECT_FALSE(window.GetStreamForSequence(FullSequence(5, 2)).has_value());
+  SendStreamMap stream_map(MoqtForwardingPreference::kGroup);
+  stream_map.AddStream(FullSequence{4, 0}, 2);
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 0)), std::nullopt);
+  stream_map.AddStream(FullSequence{5, 2}, 6);
+  EXPECT_QUIC_BUG(stream_map.AddStream(FullSequence{5, 3}, 6),
+                  "Stream already added");
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 1)), 2);
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 0)), 6);
+  stream_map.RemoveStream(FullSequence{5, 1}, 6);
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 2)), std::nullopt);
 }
 
 TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdObject) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
-                         right_edge_, start_, end_);
-  window.AddStream(4, 0, 2);
-  window.AddStream(4, 1, 6);
-  window.AddStream(4, 2, 10);
-  EXPECT_QUIC_BUG(window.AddStream(4, 2, 14), "Stream already added");
-  EXPECT_EQ(*window.GetStreamForSequence(FullSequence(4, 0)), 2);
-  EXPECT_EQ(*window.GetStreamForSequence(FullSequence(4, 2)), 10);
-  EXPECT_FALSE(window.GetStreamForSequence(FullSequence(4, 4)).has_value());
-  EXPECT_FALSE(window.GetStreamForSequence(FullSequence(5, 0)).has_value());
-  window.RemoveStream(4, 2);
-  EXPECT_FALSE(window.GetStreamForSequence(FullSequence(4, 2)).has_value());
+  SendStreamMap stream_map(MoqtForwardingPreference::kObject);
+  stream_map.AddStream(FullSequence{4, 0}, 2);
+  stream_map.AddStream(FullSequence{4, 1}, 6);
+  stream_map.AddStream(FullSequence{4, 2}, 10);
+  EXPECT_QUIC_BUG(stream_map.AddStream(FullSequence{4, 2}, 14),
+                  "Stream already added");
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 0)), 2);
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 2)), 10);
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 4)), std::nullopt);
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 0)), std::nullopt);
+  stream_map.RemoveStream(FullSequence(4, 2), 10);
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 2)), std::nullopt);
 }
 
 TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdDatagram) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kDatagram,
-                         right_edge_, start_, end_);
-  EXPECT_QUIC_BUG(window.AddStream(4, 0, 2), "Adding a stream for datagram");
-}
-
-TEST_F(SubscribeWindowTest, OnObjectSent) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
-                         right_edge_, start_, end_);
-  EXPECT_FALSE(window.largest_delivered().has_value());
-  EXPECT_FALSE(
-      window.OnObjectSent(FullSequence(4, 1), MoqtObjectStatus::kNormal));
-  EXPECT_TRUE(window.largest_delivered().has_value());
-  EXPECT_EQ(window.largest_delivered().value(), FullSequence(4, 1));
-  EXPECT_FALSE(
-      window.OnObjectSent(FullSequence(4, 2), MoqtObjectStatus::kNormal));
-  EXPECT_EQ(window.largest_delivered().value(), FullSequence(4, 2));
-  EXPECT_FALSE(
-      window.OnObjectSent(FullSequence(4, 0), MoqtObjectStatus::kNormal));
-  EXPECT_EQ(window.largest_delivered().value(), FullSequence(4, 2));
-}
-
-TEST_F(SubscribeWindowTest, AllObjectsUnpublishedAtStart) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
-                         FullSequence(0, 0), FullSequence(0, 0),
-                         FullSequence(0, 1));
-  EXPECT_FALSE(
-      window.OnObjectSent(FullSequence(0, 0), MoqtObjectStatus::kNormal));
-  EXPECT_TRUE(
-      window.OnObjectSent(FullSequence(0, 1), MoqtObjectStatus::kNormal));
-}
-
-TEST_F(SubscribeWindowTest, AllObjectsPublishedAtStart) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
-                         FullSequence(4, 0), FullSequence(0, 0),
-                         FullSequence(0, 1));
-  EXPECT_FALSE(
-      window.OnObjectSent(FullSequence(0, 0), MoqtObjectStatus::kNormal));
-  EXPECT_TRUE(
-      window.OnObjectSent(FullSequence(0, 1), MoqtObjectStatus::kNormal));
-}
-
-TEST_F(SubscribeWindowTest, SomeObjectsUnpublishedAtStart) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
-                         FullSequence(0, 1), FullSequence(0, 0),
-                         FullSequence(0, 1));
-  EXPECT_FALSE(
-      window.OnObjectSent(FullSequence(0, 0), MoqtObjectStatus::kNormal));
-  EXPECT_TRUE(
-      window.OnObjectSent(FullSequence(0, 1), MoqtObjectStatus::kNormal));
+  SendStreamMap stream_map(MoqtForwardingPreference::kDatagram);
+  EXPECT_QUIC_BUG(stream_map.AddStream(FullSequence{4, 0}, 2),
+                  "Adding a stream for datagram");
 }
 
 TEST_F(SubscribeWindowTest, UpdateStartEnd) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
-                         right_edge_, start_, end_);
+  SubscribeWindow window(start_, end_);
   EXPECT_TRUE(window.UpdateStartEnd(start_.next(),
                                     FullSequence(end_.group, end_.object - 1)));
   EXPECT_FALSE(window.InWindow(FullSequence(start_.group, start_.object)));
@@ -140,55 +90,12 @@
 }
 
 TEST_F(SubscribeWindowTest, UpdateStartEndOpenEnded) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
-                         right_edge_, start_, std::nullopt);
+  SubscribeWindow window(start_, std::nullopt);
   EXPECT_TRUE(window.UpdateStartEnd(start_, end_));
   EXPECT_FALSE(window.InWindow(end_.next()));
   EXPECT_FALSE(window.UpdateStartEnd(start_, std::nullopt));
 }
 
-class QUICHE_EXPORT MoqtSubscribeWindowsTest : public quic::test::QuicTest {
- public:
-  MoqtSubscribeWindowsTest() : windows_(MoqtForwardingPreference::kObject) {}
-  MoqtSubscribeWindows windows_;
-};
-
-TEST_F(MoqtSubscribeWindowsTest, IsEmpty) {
-  EXPECT_TRUE(windows_.IsEmpty());
-  windows_.AddWindow(0, FullSequence(2, 1), 1, 3);
-  EXPECT_FALSE(windows_.IsEmpty());
-}
-
-TEST_F(MoqtSubscribeWindowsTest, IsSubscribed) {
-  EXPECT_TRUE(windows_.IsEmpty());
-  // The first two windows overlap; the third is open-ended.
-  windows_.AddWindow(0, FullSequence(0, 0), 1, 0, 3, 9);
-  windows_.AddWindow(1, FullSequence(0, 0), 2, 4, 4, 3);
-  windows_.AddWindow(2, FullSequence(0, 0), 10, 0);
-  EXPECT_FALSE(windows_.IsEmpty());
-  EXPECT_TRUE(windows_.SequenceIsSubscribed(FullSequence(0, 8)).empty());
-  auto hits = windows_.SequenceIsSubscribed(FullSequence(1, 0));
-  EXPECT_EQ(hits.size(), 1);
-  EXPECT_EQ(hits[0]->subscribe_id(), 0);
-  EXPECT_TRUE(windows_.SequenceIsSubscribed(FullSequence(4, 4)).empty());
-  EXPECT_TRUE(windows_.SequenceIsSubscribed(FullSequence(8, 3)).empty());
-  hits = windows_.SequenceIsSubscribed(FullSequence(100, 7));
-  EXPECT_EQ(hits.size(), 1);
-  EXPECT_EQ(hits[0]->subscribe_id(), 2);
-  hits = windows_.SequenceIsSubscribed(FullSequence(3, 0));
-  EXPECT_EQ(hits.size(), 2);
-  EXPECT_EQ(hits[0]->subscribe_id() + hits[1]->subscribe_id(), 1);
-}
-
-TEST_F(MoqtSubscribeWindowsTest, AddGetRemoveWindow) {
-  windows_.AddWindow(0, FullSequence(2, 5), 1, 0, 3, 9);
-  SubscribeWindow* window = windows_.GetWindow(0);
-  EXPECT_EQ(window->subscribe_id(), 0);
-  EXPECT_EQ(windows_.GetWindow(1), nullptr);
-  windows_.RemoveWindow(0);
-  EXPECT_EQ(windows_.GetWindow(0), nullptr);
-}
-
 }  // namespace test
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc
index d2644c8..ee85664 100644
--- a/quiche/quic/moqt/moqt_track.cc
+++ b/quiche/quic/moqt/moqt_track.cc
@@ -10,66 +10,6 @@
 
 namespace moqt {
 
-void LocalTrack::AddWindow(uint64_t subscribe_id, uint64_t start_group,
-                           uint64_t start_object) {
-  QUIC_BUG_IF(quic_bug_subscribe_to_canceled_track, announce_canceled_)
-      << "Canceled track got subscription";
-  windows_.AddWindow(subscribe_id, next_sequence_, start_group, start_object);
-}
-
-void LocalTrack::AddWindow(uint64_t subscribe_id, uint64_t start_group,
-                           uint64_t start_object, uint64_t end_group) {
-  QUIC_BUG_IF(quic_bug_subscribe_to_canceled_track, announce_canceled_)
-      << "Canceled track got subscription";
-  // The end object might be unknown.
-  auto it = max_object_ids_.find(end_group);
-  if (end_group >= next_sequence_.group) {
-    // Group is not fully published yet, so end object is unknown.
-    windows_.AddWindow(subscribe_id, next_sequence_, start_group, start_object,
-                       end_group, UINT64_MAX);
-    return;
-  }
-  windows_.AddWindow(subscribe_id, next_sequence_, start_group, start_object,
-                     end_group, it->second);
-}
-
-void LocalTrack::AddWindow(uint64_t subscribe_id, uint64_t start_group,
-                           uint64_t start_object, uint64_t end_group,
-                           uint64_t end_object) {
-  QUIC_BUG_IF(quic_bug_subscribe_to_canceled_track, announce_canceled_)
-      << "Canceled track got subscription";
-  windows_.AddWindow(subscribe_id, next_sequence_, start_group, start_object,
-                     end_group, end_object);
-}
-
-void LocalTrack::SentSequence(FullSequence sequence, MoqtObjectStatus status) {
-  QUICHE_DCHECK(max_object_ids_.find(sequence.group) == max_object_ids_.end() ||
-                max_object_ids_[sequence.group] < sequence.object);
-  switch (status) {
-    case MoqtObjectStatus::kNormal:
-    case MoqtObjectStatus::kObjectDoesNotExist:
-      if (next_sequence_ <= sequence) {
-        next_sequence_ = sequence.next();
-      }
-      break;
-    case MoqtObjectStatus::kGroupDoesNotExist:
-      max_object_ids_[sequence.group] = 0;
-      break;
-    case MoqtObjectStatus::kEndOfGroup:
-      max_object_ids_[sequence.group] = sequence.object;
-      if (next_sequence_ <= sequence) {
-        next_sequence_ = FullSequence(sequence.group + 1, 0);
-      }
-      break;
-    case MoqtObjectStatus::kEndOfTrack:
-      max_object_ids_[sequence.group] = sequence.object;
-      break;
-    default:
-      QUICHE_DCHECK(false);
-      return;
-  }
-}
-
 bool RemoteTrack::CheckForwardingPreference(
     MoqtForwardingPreference preference) {
   if (forwarding_preference_.has_value()) {
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index c9ece3f..3fefee5 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -7,127 +7,12 @@
 
 #include <cstdint>
 #include <optional>
-#include <vector>
 
-#include "absl/container/flat_hash_map.h"
-#include "absl/status/statusor.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_subscribe_windows.h"
-#include "quiche/quic/platform/api/quic_bug_tracker.h"
-#include "quiche/common/quiche_callbacks.h"
 
 namespace moqt {
 
-// A track to which the peer might subscribe.
-class LocalTrack {
- public:
-  class Visitor {
-   public:
-    virtual ~Visitor() = default;
-
-    using PublishPastObjectsCallback = quiche::SingleUseCallback<void()>;
-
-    // Requests that application re-publish objects from {start_group,
-    // start_object} to the latest object. If the return value is nullopt, the
-    // subscribe is valid and the application will deliver the object and
-    // the session will send SUBSCRIBE_OK. If the return has a value, the value
-    // is the error message (the session will send SUBSCRIBE_ERROR). Via this
-    // API, the application decides if a partially fulfillable
-    // SUBSCRIBE results in an error or not.
-    virtual absl::StatusOr<PublishPastObjectsCallback> OnSubscribeForPast(
-        const SubscribeWindow& window) = 0;
-  };
-  // |visitor| must not be nullptr.
-  LocalTrack(const FullTrackName& full_track_name,
-             MoqtForwardingPreference forwarding_preference, Visitor* visitor)
-      : full_track_name_(full_track_name),
-        forwarding_preference_(forwarding_preference),
-        windows_(forwarding_preference),
-        visitor_(visitor) {}
-  // Creates a LocalTrack that does not start at sequence (0,0)
-  LocalTrack(const FullTrackName& full_track_name,
-             MoqtForwardingPreference forwarding_preference, Visitor* visitor,
-             FullSequence next_sequence)
-      : full_track_name_(full_track_name),
-        forwarding_preference_(forwarding_preference),
-        windows_(forwarding_preference),
-        next_sequence_(next_sequence),
-        visitor_(visitor) {}
-
-  const FullTrackName& full_track_name() const { return full_track_name_; }
-
-  std::optional<uint64_t> track_alias() const { return track_alias_; }
-  void set_track_alias(uint64_t track_alias) { track_alias_ = track_alias; }
-
-  Visitor* visitor() { return visitor_; }
-
-  // Returns the subscribe windows that want the object defined by (|group|,
-  // |object|).
-  std::vector<SubscribeWindow*> ShouldSend(FullSequence sequence) {
-    return windows_.SequenceIsSubscribed(sequence);
-  }
-
-  void AddWindow(uint64_t subscribe_id, uint64_t start_group,
-                 uint64_t start_object);
-
-  void AddWindow(uint64_t subscribe_id, uint64_t start_group,
-                 uint64_t start_object, uint64_t end_group);
-
-  void AddWindow(uint64_t subscribe_id, uint64_t start_group,
-                 uint64_t start_object, uint64_t end_group,
-                 uint64_t end_object);
-
-  void DeleteWindow(uint64_t subscribe_id) {
-    windows_.RemoveWindow(subscribe_id);
-  }
-
-  // Returns the largest observed sequence, but increments the object sequence
-  // by one.
-  const FullSequence& next_sequence() const { return next_sequence_; }
-
-  // Updates next_sequence_ if |sequence| is larger. Updates max_object_ids_
-  // if relevant.
-  void SentSequence(FullSequence sequence, MoqtObjectStatus status);
-
-  bool HasSubscriber() const { return !windows_.IsEmpty(); }
-
-  SubscribeWindow* GetWindow(uint64_t subscribe_id) {
-    return windows_.GetWindow(subscribe_id);
-  }
-
-  MoqtForwardingPreference forwarding_preference() const {
-    return forwarding_preference_;
-  }
-
-  void set_announce_cancel() { announce_canceled_ = true; }
-  bool canceled() const { return announce_canceled_; }
-
- private:
-  // This only needs to track subscriptions to current and future objects;
-  // requests for objects in the past are forwarded to the application.
-  const FullTrackName full_track_name_;
-  // The forwarding preference for the track.
-  MoqtForwardingPreference forwarding_preference_;
-  // Let the first SUBSCRIBE determine the track alias.
-  std::optional<uint64_t> track_alias_;
-  // The sequence numbers from this track to which the peer is subscribed.
-  MoqtSubscribeWindows windows_;
-  // By recording the highest observed sequence number, MoQT can interpret
-  // relative sequence numbers in SUBSCRIBEs.
-  FullSequence next_sequence_ = {0, 0};
-  // The object ID of each EndOfGroup object received, indexed by group ID.
-  // Entry does not exist, if no kGroupDoesNotExist, EndOfGroup, or
-  // EndOfTrack has been received for that group.
-  absl::flat_hash_map<uint64_t, uint64_t> max_object_ids_;
-  Visitor* visitor_;
-
-  // If true, the session has received ANNOUNCE_CANCELED for this namespace.
-  // Additional subscribes will be a protocol error, and the track can be
-  // destroyed once all active subscribes end.
-  bool announce_canceled_ = false;
-};
-
 // A track on the peer to which the session has subscribed.
 class RemoteTrack {
  public:
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index fc0a867..4b0ff92 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -4,11 +4,6 @@
 
 #include "quiche/quic/moqt/moqt_track.h"
 
-#include <optional>
-
-#include "absl/strings/string_view.h"
-#include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_subscribe_windows.h"
 #include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
 #include "quiche/quic/platform/api/quic_test.h"
 
@@ -16,84 +11,6 @@
 
 namespace test {
 
-class LocalTrackTest : public quic::test::QuicTest {
- public:
-  LocalTrackTest()
-      : track_(FullTrackName("foo", "bar"), MoqtForwardingPreference::kTrack,
-               &visitor_, FullSequence(4, 1)) {}
-  LocalTrack track_;
-  MockLocalTrackVisitor visitor_;
-};
-
-TEST_F(LocalTrackTest, Queries) {
-  EXPECT_EQ(track_.full_track_name(), FullTrackName("foo", "bar"));
-  EXPECT_EQ(track_.track_alias(), std::nullopt);
-  EXPECT_EQ(track_.visitor(), &visitor_);
-  EXPECT_EQ(track_.next_sequence(), FullSequence(4, 1));
-  track_.SentSequence(FullSequence(4, 0), MoqtObjectStatus::kNormal);
-  EXPECT_EQ(track_.next_sequence(), FullSequence(4, 1));  // no change
-  track_.SentSequence(FullSequence(4, 1), MoqtObjectStatus::kNormal);
-  EXPECT_EQ(track_.next_sequence(), FullSequence(4, 2));
-  track_.SentSequence(FullSequence(4, 2), MoqtObjectStatus::kEndOfGroup);
-  EXPECT_EQ(track_.next_sequence(), FullSequence(5, 0));
-  EXPECT_FALSE(track_.HasSubscriber());
-  EXPECT_EQ(track_.forwarding_preference(), MoqtForwardingPreference::kTrack);
-}
-
-TEST_F(LocalTrackTest, SetTrackAlias) {
-  EXPECT_EQ(track_.track_alias(), std::nullopt);
-  track_.set_track_alias(6);
-  EXPECT_EQ(track_.track_alias(), 6);
-}
-
-TEST_F(LocalTrackTest, AddGetDeleteWindow) {
-  track_.AddWindow(0, 4, 1);
-  EXPECT_EQ(track_.GetWindow(0)->subscribe_id(), 0);
-  EXPECT_EQ(track_.GetWindow(1), nullptr);
-  track_.DeleteWindow(0);
-  EXPECT_EQ(track_.GetWindow(0), nullptr);
-}
-
-TEST_F(LocalTrackTest, GroupSubscriptionUsesMaxObjectId) {
-  // Populate max_object_ids_
-  track_.SentSequence(FullSequence(0, 0), MoqtObjectStatus::kEndOfGroup);
-  track_.SentSequence(FullSequence(1, 0), MoqtObjectStatus::kNormal);
-  track_.SentSequence(FullSequence(1, 1), MoqtObjectStatus::kEndOfGroup);
-  track_.SentSequence(FullSequence(2, 0), MoqtObjectStatus::kGroupDoesNotExist);
-  track_.SentSequence(FullSequence(3, 0), MoqtObjectStatus::kNormal);
-  track_.SentSequence(FullSequence(3, 1), MoqtObjectStatus::kNormal);
-  track_.SentSequence(FullSequence(3, 2), MoqtObjectStatus::kNormal);
-  track_.SentSequence(FullSequence(3, 3), MoqtObjectStatus::kEndOfGroup);
-  track_.SentSequence(FullSequence(4, 0), MoqtObjectStatus::kNormal);
-  track_.SentSequence(FullSequence(4, 1), MoqtObjectStatus::kNormal);
-  track_.SentSequence(FullSequence(4, 2), MoqtObjectStatus::kNormal);
-  track_.SentSequence(FullSequence(4, 3), MoqtObjectStatus::kNormal);
-  track_.SentSequence(FullSequence(4, 4), MoqtObjectStatus::kNormal);
-  EXPECT_EQ(track_.next_sequence(), FullSequence(4, 5));
-  track_.AddWindow(0, 1, 1, 3);
-  SubscribeWindow* window = track_.GetWindow(0);
-  EXPECT_TRUE(window->InWindow(FullSequence(3, 3)));
-  EXPECT_FALSE(window->InWindow(FullSequence(3, 4)));
-  // End on an empty group.
-  track_.AddWindow(1, 1, 1, 2);
-  window = track_.GetWindow(1);
-  EXPECT_TRUE(window->InWindow(FullSequence(1, 1)));
-  // End on an group in progress.
-  track_.AddWindow(2, 1, 1, 4);
-  window = track_.GetWindow(2);
-  EXPECT_TRUE(window->InWindow(FullSequence(4, 9)));
-  EXPECT_FALSE(window->InWindow(FullSequence(5, 0)));
-}
-
-TEST_F(LocalTrackTest, ShouldSend) {
-  track_.AddWindow(0, 4, 1);
-  EXPECT_TRUE(track_.HasSubscriber());
-  EXPECT_TRUE(track_.ShouldSend(FullSequence(3, 12)).empty());
-  EXPECT_TRUE(track_.ShouldSend(FullSequence(4, 0)).empty());
-  EXPECT_EQ(track_.ShouldSend(FullSequence(4, 1)).size(), 1);
-  EXPECT_EQ(track_.ShouldSend(FullSequence(12, 0)).size(), 1);
-}
-
 class RemoteTrackTest : public quic::test::QuicTest {
  public:
   RemoteTrackTest()
diff --git a/quiche/quic/moqt/tools/chat_client_bin.cc b/quiche/quic/moqt/tools/chat_client_bin.cc
index acdbf0c..a344a7d 100644
--- a/quiche/quic/moqt/tools/chat_client_bin.cc
+++ b/quiche/quic/moqt/tools/chat_client_bin.cc
@@ -26,6 +26,7 @@
 #include "quiche/quic/core/quic_server_id.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_outgoing_queue.h"
 #include "quiche/quic/moqt/moqt_session.h"
 #include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/quic/moqt/tools/moqt_client.h"
@@ -37,6 +38,9 @@
 #include "quiche/quic/tools/quic_url.h"
 #include "quiche/common/platform/api/quiche_command_line_flags.h"
 #include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/simple_buffer_allocator.h"
 
 DEFINE_QUICHE_COMMAND_LINE_FLAG(
     bool, disable_certificate_verification, false,
@@ -105,20 +109,15 @@
       session_is_open_ = false;
       return;
     }
-    session_->PublishObject(my_track_name_, next_sequence_.group,
-                            next_sequence_.object++, /*object_send_order=*/0,
-                            moqt::MoqtObjectStatus::kNormal, input_message);
-    session_->PublishObject(my_track_name_, next_sequence_.group,
-                            next_sequence_.object, /*object_send_order=*/0,
-                            moqt::MoqtObjectStatus::kEndOfGroup, "");
-    ++next_sequence_.group;
-    next_sequence_.object = 0;
+    quiche::QuicheMemSlice message_slice(quiche::QuicheBuffer::Copy(
+        quiche::SimpleBufferAllocator::Get(), input_message));
+    queue_->AddObject(std::move(message_slice), /*key=*/true);
   }
 
   bool session_is_open() const { return session_is_open_; }
   bool is_syncing() const {
     return !catalog_group_.has_value() || subscribes_to_make_ > 0 ||
-           !session_->HasSubscribers(my_track_name_);
+           (queue_ != nullptr && queue_->HasSubscribers());
   }
 
   void RunEventLoop() {
@@ -202,10 +201,9 @@
       std::cout << "Failed to connect.\n";
       return false;
     }
-    // By not sending a visitor, the application will not fulfill subscriptions
-    // to previous objects.
-    session_->AddLocalTrack(my_track_name_,
-                            moqt::MoqtForwardingPreference::kObject, nullptr);
+    queue_ = std::make_shared<moqt::MoqtOutgoingQueue>(
+        client_->session(), my_track_name_,
+        moqt::MoqtForwardingPreference::kObject);
     moqt::MoqtOutgoingAnnounceCallback announce_callback =
         [&](absl::string_view track_namespace,
             std::optional<moqt::MoqtAnnounceErrorReason> reason) {
@@ -354,7 +352,7 @@
   std::unique_ptr<RemoteTrackVisitor> remote_track_visitor_;
 
   // Handling incoming and outgoing messages
-  moqt::FullSequence next_sequence_ = {0, 0};
+  std::shared_ptr<moqt::MoqtOutgoingQueue> queue_;
 
   // Used when chat output goes to file.
   std::ofstream output_file_;
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h
index 0ea75fa..3dc5741 100644
--- a/quiche/quic/moqt/tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -7,14 +7,16 @@
 
 #include <cstdint>
 #include <optional>
+#include <utility>
+#include <vector>
 
 #include "absl/status/statusor.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_session.h"
-#include "quiche/quic/moqt/moqt_subscribe_windows.h"
 #include "quiche/quic/moqt/moqt_track.h"
-#include "quiche/quic/platform/api/quic_test.h"
+#include "quiche/common/platform/api/quiche_test.h"
 
 namespace moqt::test {
 
@@ -39,10 +41,28 @@
   }
 };
 
-class MockLocalTrackVisitor : public LocalTrack::Visitor {
+class MockTrackPublisher : public MoqtTrackPublisher {
  public:
-  MOCK_METHOD(absl::StatusOr<PublishPastObjectsCallback>, OnSubscribeForPast,
-              (const SubscribeWindow& window), (override));
+  explicit MockTrackPublisher(FullTrackName name)
+      : track_name_(std::move(name)) {}
+  const FullTrackName& GetTrackName() const override { return track_name_; }
+
+  MOCK_METHOD(std::optional<PublishedObject>, GetCachedObject,
+              (FullSequence sequence), (const, override));
+  MOCK_METHOD(std::vector<FullSequence>, GetCachedObjectsInRange,
+              (FullSequence start, FullSequence end), (const, override));
+  MOCK_METHOD(void, AddObjectListener, (MoqtObjectListener * listener),
+              (override));
+  MOCK_METHOD(void, RemoveObjectListener, (MoqtObjectListener * listener),
+              (override));
+  MOCK_METHOD(absl::StatusOr<MoqtTrackStatusCode>, GetTrackStatus, (),
+              (const, override));
+  MOCK_METHOD(FullSequence, GetLargestSequence, (), (const, override));
+  MOCK_METHOD(MoqtForwardingPreference, GetForwardingPreference, (),
+              (const, override));
+
+ private:
+  FullTrackName track_name_;
 };
 
 class MockRemoteTrackVisitor : public RemoteTrack::Visitor {
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
index d379afc..6aa5d01 100644
--- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -9,6 +9,7 @@
 #include <cstdint>
 #include <cstring>
 #include <iostream>
+#include <memory>
 #include <optional>
 #include <string>
 #include <utility>
@@ -22,6 +23,7 @@
 #include "quiche/quic/core/quic_clock.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_known_track_publisher.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_outgoing_queue.h"
 #include "quiche/quic/moqt/moqt_session.h"
@@ -95,7 +97,8 @@
                   int keyframe_interval, int fps, float i_to_p_ratio,
                   QuicBandwidth bitrate)
       : Actor(simulator, actor_name),
-        queue_(session, track_name),
+        queue_(std::make_shared<MoqtOutgoingQueue>(
+            session, track_name, MoqtForwardingPreference::kGroup)),
         keyframe_interval_(keyframe_interval),
         time_between_frames_(QuicTimeDelta::FromMicroseconds(1.0e6 / fps)) {
     int p_frame_count = keyframe_interval - 1;
@@ -123,18 +126,18 @@
     bool success = writer.WriteUInt64(clock_->Now().ToDebuggingValue());
     QUICHE_CHECK(success);
 
-    queue_.AddObject(QuicheMemSlice(std::move(buffer)), i_frame);
+    queue_->AddObject(QuicheMemSlice(std::move(buffer)), i_frame);
     Schedule(clock_->Now() + time_between_frames_);
   }
 
   void Start() { Schedule(clock_->Now()); }
   void Stop() { Unschedule(); }
 
-  MoqtOutgoingQueue& queue() { return queue_; }
+  std::shared_ptr<MoqtOutgoingQueue> queue() { return queue_; }
   size_t total_objects_sent() const { return frame_number_ + 1; }
 
  private:
-  MoqtOutgoingQueue queue_;
+  std::shared_ptr<MoqtOutgoingQueue> queue_;
   int keyframe_interval_;
   QuicTimeDelta time_between_frames_;
   QuicByteCount i_frame_size_;
@@ -270,8 +273,8 @@
     //   (2) The client starts immediately generating data.  At this point, the
     //       server does not yet have an active subscription, so the client has
     //       some catching up to do.
-    client_session()->AddLocalTrack(
-        TrackName(), MoqtForwardingPreference::kGroup, &generator_.queue());
+    client_session()->set_publisher(&publisher_);
+    publisher_.Add(generator_.queue());
     generator_.Start();
     server_session()->SubscribeCurrentGroup(TrackName().track_namespace,
                                             TrackName().track_name, &receiver_);
@@ -298,6 +301,7 @@
   quic::simulator::Switch switch_;
   quic::simulator::SymmetricLink client_link_;
   quic::simulator::SymmetricLink server_link_;
+  MoqtKnownTrackPublisher publisher_;
   ObjectGenerator generator_;
   ObjectReceiver receiver_;
   SimulationParameters parameters_;