diff --git a/build/source_list.bzl b/build/source_list.bzl
index 34b592f..386d907 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1508,11 +1508,14 @@
     "quic/load_balancer/load_balancer_server_id_test.cc",
 ]
 moqt_hdrs = [
+    "quic/moqt/moqt_cached_object.h",
     "quic/moqt/moqt_framer.h",
+    "quic/moqt/moqt_known_track_publisher.h",
     "quic/moqt/moqt_messages.h",
     "quic/moqt/moqt_outgoing_queue.h",
     "quic/moqt/moqt_parser.h",
     "quic/moqt/moqt_priority.h",
+    "quic/moqt/moqt_publisher.h",
     "quic/moqt/moqt_session.h",
     "quic/moqt/moqt_subscribe_windows.h",
     "quic/moqt/moqt_track.h",
@@ -1523,9 +1526,11 @@
     "quic/moqt/tools/moqt_server.h",
 ]
 moqt_srcs = [
+    "quic/moqt/moqt_cached_object.cc",
     "quic/moqt/moqt_framer.cc",
     "quic/moqt/moqt_framer_test.cc",
     "quic/moqt/moqt_integration_test.cc",
+    "quic/moqt/moqt_known_track_publisher.cc",
     "quic/moqt/moqt_messages.cc",
     "quic/moqt/moqt_outgoing_queue.cc",
     "quic/moqt/moqt_outgoing_queue_test.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index 23437ab..2004269 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1512,11 +1512,14 @@
     "src/quiche/quic/load_balancer/load_balancer_server_id_test.cc",
 ]
 moqt_hdrs = [
+    "src/quiche/quic/moqt/moqt_cached_object.h",
     "src/quiche/quic/moqt/moqt_framer.h",
+    "src/quiche/quic/moqt/moqt_known_track_publisher.h",
     "src/quiche/quic/moqt/moqt_messages.h",
     "src/quiche/quic/moqt/moqt_outgoing_queue.h",
     "src/quiche/quic/moqt/moqt_parser.h",
     "src/quiche/quic/moqt/moqt_priority.h",
+    "src/quiche/quic/moqt/moqt_publisher.h",
     "src/quiche/quic/moqt/moqt_session.h",
     "src/quiche/quic/moqt/moqt_subscribe_windows.h",
     "src/quiche/quic/moqt/moqt_track.h",
@@ -1527,9 +1530,11 @@
     "src/quiche/quic/moqt/tools/moqt_server.h",
 ]
 moqt_srcs = [
+    "src/quiche/quic/moqt/moqt_cached_object.cc",
     "src/quiche/quic/moqt/moqt_framer.cc",
     "src/quiche/quic/moqt/moqt_framer_test.cc",
     "src/quiche/quic/moqt/moqt_integration_test.cc",
+    "src/quiche/quic/moqt/moqt_known_track_publisher.cc",
     "src/quiche/quic/moqt/moqt_messages.cc",
     "src/quiche/quic/moqt/moqt_outgoing_queue.cc",
     "src/quiche/quic/moqt/moqt_outgoing_queue_test.cc",
diff --git a/build/source_list.json b/build/source_list.json
index 1b9a333..7e6cd21 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1511,11 +1511,14 @@
     "quiche/quic/load_balancer/load_balancer_server_id_test.cc"
   ],
   "moqt_hdrs": [
+    "quiche/quic/moqt/moqt_cached_object.h",
     "quiche/quic/moqt/moqt_framer.h",
+    "quiche/quic/moqt/moqt_known_track_publisher.h",
     "quiche/quic/moqt/moqt_messages.h",
     "quiche/quic/moqt/moqt_outgoing_queue.h",
     "quiche/quic/moqt/moqt_parser.h",
     "quiche/quic/moqt/moqt_priority.h",
+    "quiche/quic/moqt/moqt_publisher.h",
     "quiche/quic/moqt/moqt_session.h",
     "quiche/quic/moqt/moqt_subscribe_windows.h",
     "quiche/quic/moqt/moqt_track.h",
@@ -1526,9 +1529,11 @@
     "quiche/quic/moqt/tools/moqt_server.h"
   ],
   "moqt_srcs": [
+    "quiche/quic/moqt/moqt_cached_object.cc",
     "quiche/quic/moqt/moqt_framer.cc",
     "quiche/quic/moqt/moqt_framer_test.cc",
     "quiche/quic/moqt/moqt_integration_test.cc",
+    "quiche/quic/moqt/moqt_known_track_publisher.cc",
     "quiche/quic/moqt/moqt_messages.cc",
     "quiche/quic/moqt/moqt_outgoing_queue.cc",
     "quiche/quic/moqt/moqt_outgoing_queue_test.cc",
diff --git a/quiche/quic/moqt/moqt_cached_object.cc b/quiche/quic/moqt/moqt_cached_object.cc
new file mode 100644
index 0000000..395875b
--- /dev/null
+++ b/quiche/quic/moqt/moqt_cached_object.cc
@@ -0,0 +1,25 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_cached_object.h"
+
+#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+
+namespace moqt {
+
+moqt::PublishedObject CachedObjectToPublishedObject(
+    const CachedObject& object) {
+  PublishedObject result;
+  result.sequence = object.sequence;
+  result.status = object.status;
+  if (object.payload != nullptr && !object.payload->empty()) {
+    result.payload = quiche::QuicheMemSlice(
+        object.payload->data(), object.payload->length(),
+        [retained_pointer = object.payload](const char*) {});
+  }
+  return result;
+}
+
+}  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_cached_object.h b/quiche/quic/moqt/moqt_cached_object.h
new file mode 100644
index 0000000..c04b31e
--- /dev/null
+++ b/quiche/quic/moqt/moqt_cached_object.h
@@ -0,0 +1,29 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_CACHED_OBJECT_H_
+#define QUICHE_QUIC_MOQT_MOQT_CACHED_OBJECT_H_
+
+#include <memory>
+
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+
+namespace moqt {
+
+// CachedObject is a version of PublishedObject with a reference counted
+// payload.
+struct CachedObject {
+  FullSequence sequence;
+  MoqtObjectStatus status;
+  std::shared_ptr<quiche::QuicheMemSlice> payload;
+};
+
+// Transforms a CachedObject into a PublishedObject.
+PublishedObject CachedObjectToPublishedObject(const CachedObject& object);
+
+}  // namespace moqt
+
+#endif  // QUICHE_QUIC_MOQT_MOQT_CACHED_OBJECT_H_
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 14b7dc2..66df451 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -7,8 +7,10 @@
 #include <optional>
 #include <string>
 
+#include "absl/strings/str_cat.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/quic_generic_session.h"
+#include "quiche/quic/moqt/moqt_known_track_publisher.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_outgoing_queue.h"
 #include "quiche/quic/moqt/moqt_session.h"
@@ -23,7 +25,6 @@
 
 namespace {
 
-using ::quic::simulator::Simulator;
 using ::quic::test::MemSliceFromString;
 using ::testing::_;
 using ::testing::Assign;
@@ -176,10 +177,13 @@
         return std::optional<MoqtAnnounceErrorReason>();
       });
 
-  MoqtOutgoingQueue queue(client_->session(), FullTrackName{"test", "data"});
-  client_->session()->AddLocalTrack(FullTrackName{"test", "data"},
-                                    MoqtForwardingPreference::kGroup, &queue);
-  queue.AddObject(MemSliceFromString("object data"), /*key=*/true);
+  auto queue = std::make_shared<MoqtOutgoingQueue>(
+      client_->session(), FullTrackName{"test", "data"},
+      MoqtForwardingPreference::kGroup);
+  MoqtKnownTrackPublisher known_track_publisher;
+  known_track_publisher.Add(queue);
+  client_->session()->set_publisher(&known_track_publisher);
+  queue->AddObject(MemSliceFromString("object data"), /*key=*/true);
   bool received_subscribe_ok = false;
   EXPECT_CALL(server_visitor, OnReply(_, _)).WillOnce([&]() {
     received_subscribe_ok = true;
@@ -210,6 +214,119 @@
   EXPECT_TRUE(success);
 }
 
+TEST_F(MoqtIntegrationTest, SendMultipleGroups) {
+  EstablishSession();
+  MoqtKnownTrackPublisher publisher;
+  server_->session()->set_publisher(&publisher);
+
+  for (MoqtForwardingPreference forwarding_preference :
+       {MoqtForwardingPreference::kTrack, MoqtForwardingPreference::kGroup,
+        MoqtForwardingPreference::kObject,
+        MoqtForwardingPreference::kDatagram}) {
+    SCOPED_TRACE(MoqtForwardingPreferenceToString(forwarding_preference));
+    MockRemoteTrackVisitor client_visitor;
+    std::string name =
+        absl::StrCat("pref_", static_cast<int>(forwarding_preference));
+    auto queue = std::make_shared<MoqtOutgoingQueue>(
+        client_->session(), FullTrackName{"test", name},
+        MoqtForwardingPreference::kObject);
+    publisher.Add(queue);
+    queue->AddObject(MemSliceFromString("object 1"), /*key=*/true);
+    queue->AddObject(MemSliceFromString("object 2"), /*key=*/false);
+    queue->AddObject(MemSliceFromString("object 3"), /*key=*/false);
+    queue->AddObject(MemSliceFromString("object 4"), /*key=*/true);
+    queue->AddObject(MemSliceFromString("object 5"), /*key=*/false);
+
+    client_->session()->SubscribeCurrentGroup("test", name, &client_visitor);
+    int received = 0;
+    EXPECT_CALL(client_visitor,
+                OnObjectFragment(_, 1, 0, _, MoqtObjectStatus::kNormal, _,
+                                 "object 4", true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor,
+                OnObjectFragment(_, 1, 1, _, MoqtObjectStatus::kNormal, _,
+                                 "object 5", true))
+        .WillOnce([&] { ++received; });
+    bool success = test_harness_.RunUntilWithDefaultTimeout(
+        [&]() { return received >= 2; });
+    EXPECT_TRUE(success);
+
+    queue->AddObject(MemSliceFromString("object 6"), /*key=*/false);
+    queue->AddObject(MemSliceFromString("object 7"), /*key=*/true);
+    queue->AddObject(MemSliceFromString("object 8"), /*key=*/false);
+    EXPECT_CALL(client_visitor,
+                OnObjectFragment(_, 1, 2, _, MoqtObjectStatus::kNormal, _,
+                                 "object 6", true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor,
+                OnObjectFragment(_, 1, 3, _, MoqtObjectStatus::kEndOfGroup, _,
+                                 "", true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor,
+                OnObjectFragment(_, 2, 0, _, MoqtObjectStatus::kNormal, _,
+                                 "object 7", true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor,
+                OnObjectFragment(_, 2, 1, _, MoqtObjectStatus::kNormal, _,
+                                 "object 8", true))
+        .WillOnce([&] { ++received; });
+    success = test_harness_.RunUntilWithDefaultTimeout(
+        [&]() { return received >= 6; });
+    EXPECT_TRUE(success);
+  }
+}
+
+TEST_F(MoqtIntegrationTest, FetchItemsFromPast) {
+  EstablishSession();
+  MoqtKnownTrackPublisher publisher;
+  server_->session()->set_publisher(&publisher);
+
+  for (MoqtForwardingPreference forwarding_preference :
+       {MoqtForwardingPreference::kTrack, MoqtForwardingPreference::kGroup,
+        MoqtForwardingPreference::kObject,
+        MoqtForwardingPreference::kDatagram}) {
+    SCOPED_TRACE(MoqtForwardingPreferenceToString(forwarding_preference));
+    MockRemoteTrackVisitor client_visitor;
+    std::string name =
+        absl::StrCat("pref_", static_cast<int>(forwarding_preference));
+    auto queue = std::make_shared<MoqtOutgoingQueue>(
+        client_->session(), FullTrackName{"test", name},
+        MoqtForwardingPreference::kObject);
+    publisher.Add(queue);
+    for (int i = 0; i < 100; ++i) {
+      queue->AddObject(MemSliceFromString("object"), /*key=*/true);
+    }
+
+    client_->session()->SubscribeAbsolute("test", name, 0, 0, &client_visitor);
+    int received = 0;
+    // Those won't arrive since they have expired.
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 0, 0, _, _, _, _, true))
+        .Times(0);
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 0, 0, _, _, _, _, true))
+        .Times(0);
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 96, 0, _, _, _, _, true))
+        .Times(0);
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 96, 0, _, _, _, _, true))
+        .Times(0);
+    // Those are within the "last three groups" window.
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 97, 0, _, _, _, _, true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 97, 1, _, _, _, _, true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 98, 0, _, _, _, _, true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 98, 1, _, _, _, _, true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 99, 0, _, _, _, _, true))
+        .WillOnce([&] { ++received; });
+    EXPECT_CALL(client_visitor, OnObjectFragment(_, 99, 1, _, _, _, _, true))
+        .Times(0);  // The current group should not be closed yet.
+    bool success = test_harness_.RunUntilWithDefaultTimeout(
+        [&]() { return received >= 5; });
+    EXPECT_TRUE(success);
+  }
+}
+
 TEST_F(MoqtIntegrationTest, AnnounceFailure) {
   EstablishSession();
   testing::MockFunction<void(
@@ -235,10 +352,13 @@
 TEST_F(MoqtIntegrationTest, SubscribeAbsoluteOk) {
   EstablishSession();
   FullTrackName full_track_name("foo", "bar");
-  MockLocalTrackVisitor server_visitor;
+
+  MoqtKnownTrackPublisher publisher;
+  server_->session()->set_publisher(&publisher);
+  auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
+  publisher.Add(track_publisher);
+
   MockRemoteTrackVisitor client_visitor;
-  server_->session()->AddLocalTrack(
-      full_track_name, MoqtForwardingPreference::kObject, &server_visitor);
   std::optional<absl::string_view> expected_reason = std::nullopt;
   bool received_ok = false;
   EXPECT_CALL(client_visitor, OnReply(full_track_name, expected_reason))
@@ -254,10 +374,13 @@
 TEST_F(MoqtIntegrationTest, SubscribeCurrentObjectOk) {
   EstablishSession();
   FullTrackName full_track_name("foo", "bar");
-  MockLocalTrackVisitor server_visitor;
+
+  MoqtKnownTrackPublisher publisher;
+  server_->session()->set_publisher(&publisher);
+  auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
+  publisher.Add(track_publisher);
+
   MockRemoteTrackVisitor client_visitor;
-  server_->session()->AddLocalTrack(
-      full_track_name, MoqtForwardingPreference::kObject, &server_visitor);
   std::optional<absl::string_view> expected_reason = std::nullopt;
   bool received_ok = false;
   EXPECT_CALL(client_visitor, OnReply(full_track_name, expected_reason))
@@ -273,10 +396,13 @@
 TEST_F(MoqtIntegrationTest, SubscribeCurrentGroupOk) {
   EstablishSession();
   FullTrackName full_track_name("foo", "bar");
-  MockLocalTrackVisitor server_visitor;
+
+  MoqtKnownTrackPublisher publisher;
+  server_->session()->set_publisher(&publisher);
+  auto track_publisher = std::make_shared<MockTrackPublisher>(full_track_name);
+  publisher.Add(track_publisher);
+
   MockRemoteTrackVisitor client_visitor;
-  server_->session()->AddLocalTrack(
-      full_track_name, MoqtForwardingPreference::kObject, &server_visitor);
   std::optional<absl::string_view> expected_reason = std::nullopt;
   bool received_ok = false;
   EXPECT_CALL(client_visitor, OnReply(full_track_name, expected_reason))
@@ -293,7 +419,7 @@
   EstablishSession();
   FullTrackName full_track_name("foo", "bar");
   MockRemoteTrackVisitor client_visitor;
-  std::optional<absl::string_view> expected_reason = "Track does not exist";
+  std::optional<absl::string_view> expected_reason = "No tracks published";
   bool received_ok = false;
   EXPECT_CALL(client_visitor, OnReply(full_track_name, expected_reason))
       .WillOnce([&]() { received_ok = true; });
diff --git a/quiche/quic/moqt/moqt_known_track_publisher.cc b/quiche/quic/moqt/moqt_known_track_publisher.cc
new file mode 100644
index 0000000..2fa4daf
--- /dev/null
+++ b/quiche/quic/moqt/moqt_known_track_publisher.cc
@@ -0,0 +1,34 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_known_track_publisher.h"
+
+#include <memory>
+
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/common/platform/api/quiche_bug_tracker.h"
+
+namespace moqt {
+
+absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>>
+MoqtKnownTrackPublisher::GetTrack(const FullTrackName& track_name) {
+  auto it = tracks_.find(track_name);
+  if (it == tracks_.end()) {
+    return absl::NotFoundError("Requested track not found");
+  }
+  return it->second;
+}
+
+void MoqtKnownTrackPublisher::Add(
+    std::shared_ptr<MoqtTrackPublisher> track_publisher) {
+  const FullTrackName& track_name = track_publisher->GetTrackName();
+  auto [it, success] = tracks_.emplace(track_name, track_publisher);
+  QUICHE_BUG_IF(MoqtKnownTrackPublisher_duplicate, !success)
+      << "Trying to add a duplicate track into a KnownTrackPublisher";
+}
+
+}  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_known_track_publisher.h b/quiche/quic/moqt/moqt_known_track_publisher.h
new file mode 100644
index 0000000..dd9316f
--- /dev/null
+++ b/quiche/quic/moqt/moqt_known_track_publisher.h
@@ -0,0 +1,38 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_KNOWN_TRACK_PUBLISHER_H_
+#define QUICHE_QUIC_MOQT_MOQT_KNOWN_TRACK_PUBLISHER_H_
+
+#include <memory>
+
+#include "absl/container/flat_hash_map.h"
+#include "absl/status/statusor.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
+
+namespace moqt {
+
+// MoqtKnownTrackPublisher is a publisher that supports publishing a set of
+// well-known predefined tracks.
+class MoqtKnownTrackPublisher : public MoqtPublisher {
+ public:
+  MoqtKnownTrackPublisher() = default;
+  MoqtKnownTrackPublisher(const MoqtKnownTrackPublisher&) = delete;
+  MoqtKnownTrackPublisher(MoqtKnownTrackPublisher&&) = delete;
+  MoqtKnownTrackPublisher& operator=(const MoqtKnownTrackPublisher&) = delete;
+  MoqtKnownTrackPublisher& operator=(MoqtKnownTrackPublisher&&) = delete;
+
+  absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> GetTrack(
+      const FullTrackName& track_name) override;
+  void Add(std::shared_ptr<MoqtTrackPublisher> track_publisher);
+
+ private:
+  absl::flat_hash_map<FullTrackName, std::shared_ptr<MoqtTrackPublisher>>
+      tracks_;
+};
+
+}  // namespace moqt
+
+#endif  // QUICHE_QUIC_MOQT_MOQT_KNOWN_TRACK_PUBLISHER_H_
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index c2a22cd..61015be 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -80,6 +80,13 @@
   kGoawayTimeout = 0x10,
 };
 
+// Error codes used by MoQT to reset streams.
+// TODO: update with spec-defined error codes once those are available, see
+// <https://github.com/moq-wg/moq-transport/issues/481>.
+inline constexpr uint64_t kResetCodeUnknown = 0x00;
+inline constexpr uint64_t kResetCodeSubscriptionGone = 0x01;
+inline constexpr uint64_t kResetCodeTimedOut = 0x02;
+
 enum class QUICHE_EXPORT MoqtRole : uint64_t {
   kPublisher = 0x1,
   kSubscriber = 0x2,
@@ -129,6 +136,12 @@
   }
   template <typename H>
   friend H AbslHashValue(H h, const FullTrackName& m);
+
+  template <typename Sink>
+  friend void AbslStringify(Sink& sink, const FullTrackName& track_name) {
+    absl::Format(&sink, "(%s; %s)", track_name.track_namespace,
+                 track_name.track_name);
+  }
 };
 
 template <typename H>
@@ -151,6 +164,7 @@
     return (group < other.group ||
             (group == other.group && object <= other.object));
   }
+  bool operator>(const FullSequence& other) const { return !(*this <= other); }
   FullSequence& operator=(FullSequence other) {
     group = other.group;
     object = other.object;
@@ -326,6 +340,19 @@
   kStatusNotAvailable = 0x4,
 };
 
+inline bool DoesTrackStatusImplyHavingData(MoqtTrackStatusCode code) {
+  switch (code) {
+    case MoqtTrackStatusCode::kInProgress:
+    case MoqtTrackStatusCode::kFinished:
+      return true;
+    case MoqtTrackStatusCode::kDoesNotExist:
+    case MoqtTrackStatusCode::kNotYetBegun:
+    case MoqtTrackStatusCode::kStatusNotAvailable:
+      return false;
+  }
+  return false;
+}
+
 struct QUICHE_EXPORT MoqtTrackStatus {
   std::string track_namespace;
   std::string track_name;
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc
index 9a97da0..35d1f9d 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -4,15 +4,15 @@
 
 #include "quiche/quic/moqt/moqt_outgoing_queue.h"
 
-#include <cstddef>
-#include <cstdint>
+#include <memory>
 #include <optional>
 #include <utility>
+#include <vector>
 
-#include "absl/status/status.h"
 #include "absl/status/statusor.h"
-#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_cached_object.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/platform/api/quiche_mem_slice.h"
@@ -29,7 +29,7 @@
 
   if (key) {
     if (!queue_.empty()) {
-      CloseStreamForGroup(current_group_id_);
+      AddRawObject(MoqtObjectStatus::kEndOfGroup, quiche::QuicheMemSlice());
     }
 
     if (queue_.size() == kMaxQueuedGroups) {
@@ -39,53 +39,72 @@
     ++current_group_id_;
   }
 
-  absl::string_view payload_view = payload.AsStringView();
-  uint64_t object_id = queue_.back().size();
-  queue_.back().push_back(std::move(payload));
-  PublishObject(current_group_id_, object_id, payload_view);
+  AddRawObject(MoqtObjectStatus::kNormal, std::move(payload));
 }
 
-absl::StatusOr<MoqtOutgoingQueue::PublishPastObjectsCallback>
-MoqtOutgoingQueue::OnSubscribeForPast(const SubscribeWindow& window) {
-  QUICHE_BUG_IF(
-      MoqtOutgoingQueue_requires_kGroup,
-      window.forwarding_preference() != MoqtForwardingPreference::kGroup)
-      << "MoqtOutgoingQueue currently only supports kGroup.";
-  return [this, &window]() {
-    for (size_t i = 0; i < queue_.size(); ++i) {
-      const uint64_t group_id = first_group_in_queue() + i;
-      const Group& group = queue_[i];
-      const bool is_last_group =
-          ((i == queue_.size() - 1) ||
-           !window.InWindow(FullSequence{group_id + 1, 0}));
-      for (size_t j = 0; j < group.size(); ++j) {
-        const FullSequence sequence{group_id, j};
-        if (!window.InWindow(sequence)) {
-          continue;
-        }
-        const bool is_last_object = (j == group.size() - 1);
-        PublishObject(group_id, j, group[j].AsStringView());
-        if (!is_last_group && is_last_object) {
-          // Close the group
-          CloseStreamForGroup(group_id);
-        }
+void MoqtOutgoingQueue::AddRawObject(MoqtObjectStatus status,
+                                     quiche::QuicheMemSlice payload) {
+  FullSequence sequence{current_group_id_, queue_.back().size()};
+  queue_.back().push_back(CachedObject{
+      sequence, status,
+      std::make_shared<quiche::QuicheMemSlice>(std::move(payload))});
+  for (MoqtObjectListener* listener : listeners_) {
+    listener->OnNewObjectAvailable(sequence);
+  }
+}
+
+std::optional<PublishedObject> MoqtOutgoingQueue::GetCachedObject(
+    FullSequence sequence) const {
+  if (sequence.group < first_group_in_queue()) {
+    return PublishedObject{FullSequence{sequence.group, sequence.object},
+                           MoqtObjectStatus::kGroupDoesNotExist,
+                           quiche::QuicheMemSlice()};
+  }
+  if (sequence.group > current_group_id_) {
+    return std::nullopt;
+  }
+  const std::vector<CachedObject>& group =
+      queue_[sequence.group - first_group_in_queue()];
+  if (sequence.object >= group.size()) {
+    if (sequence.group == current_group_id_) {
+      return std::nullopt;
+    }
+    return PublishedObject{FullSequence{sequence.group, sequence.object},
+                           MoqtObjectStatus::kObjectDoesNotExist,
+                           quiche::QuicheMemSlice()};
+  }
+  QUICHE_DCHECK(sequence == group[sequence.object].sequence);
+  return CachedObjectToPublishedObject(group[sequence.object]);
+}
+
+std::vector<FullSequence> MoqtOutgoingQueue::GetCachedObjectsInRange(
+    FullSequence start, FullSequence end) const {
+  std::vector<FullSequence> sequences;
+  SubscribeWindow window(start, end);
+  for (const Group& group : queue_) {
+    for (const CachedObject& object : group) {
+      if (window.InWindow(object.sequence)) {
+        sequences.push_back(object.sequence);
       }
     }
-  };
+  }
+  return sequences;
 }
 
-void MoqtOutgoingQueue::CloseStreamForGroup(uint64_t group_id) {
-  session_->PublishObject(track_, group_id, queue_[group_id].size(),
-                          /*object_send_order=*/group_id,
-                          MoqtObjectStatus::kEndOfGroup, "");
-  session_->CloseObjectStream(track_, group_id);
+absl::StatusOr<MoqtTrackStatusCode> MoqtOutgoingQueue::GetTrackStatus() const {
+  if (queue_.empty()) {
+    return MoqtTrackStatusCode::kNotYetBegun;
+  }
+  return MoqtTrackStatusCode::kInProgress;
 }
 
-void MoqtOutgoingQueue::PublishObject(uint64_t group_id, uint64_t object_id,
-                                      absl::string_view payload) {
-  session_->PublishObject(track_, group_id, object_id,
-                          /*object_send_order=*/group_id,
-                          MoqtObjectStatus::kNormal, payload);
+FullSequence MoqtOutgoingQueue::GetLargestSequence() const {
+  if (queue_.empty()) {
+    QUICHE_BUG(MoqtOutgoingQueue_GetLargestSequence_not_begun)
+        << "Calling GetLargestSequence() on a track that hasn't begun";
+    return FullSequence{0, 0};
+  }
+  return FullSequence{current_group_id_, queue_.back().size() - 1};
 }
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h
index d738e43..4daa389 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.h
+++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -11,13 +11,13 @@
 #include <utility>
 #include <vector>
 
+#include "absl/container/flat_hash_set.h"
 #include "absl/container/inlined_vector.h"
 #include "absl/status/statusor.h"
-#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_cached_object.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_session.h"
-#include "quiche/quic/moqt/moqt_subscribe_windows.h"
-#include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/common/platform/api/quiche_mem_slice.h"
 
 namespace moqt {
@@ -27,15 +27,15 @@
 // groups, and maintain a buffer of three most recent groups that will be
 // provided to subscribers automatically.
 //
-// This class is primarily meant to be used by publishers to buffer the frames
-// that they produce. Limitations of this class:
-// - It currently only works with the forwarding preference of kGroup.
-// - It only supports a single session.
-// - Everything is sent in order that it is queued.
-class MoqtOutgoingQueue : public LocalTrack::Visitor {
+// This class is primarily meant to be used by original publishers to buffer the
+// frames that they produce.
+class MoqtOutgoingQueue : public MoqtTrackPublisher {
  public:
-  explicit MoqtOutgoingQueue(MoqtSession* session, FullTrackName track)
-      : session_(session), track_(std::move(track)) {}
+  explicit MoqtOutgoingQueue(MoqtSession* session, FullTrackName track,
+                             MoqtForwardingPreference forwarding_preference)
+      : session_(session),
+        track_(std::move(track)),
+        forwarding_preference_(forwarding_preference) {}
 
   MoqtOutgoingQueue(const MoqtOutgoingQueue&) = delete;
   MoqtOutgoingQueue(MoqtOutgoingQueue&&) = default;
@@ -46,32 +46,45 @@
   // group is closed. The first object ever sent MUST have `key` set to true.
   void AddObject(quiche::QuicheMemSlice payload, bool key);
 
-  // LocalTrack::Visitor implementation.
-  absl::StatusOr<PublishPastObjectsCallback> OnSubscribeForPast(
-      const SubscribeWindow& window) override;
+  // MoqtTrackPublisher implementation.
+  const FullTrackName& GetTrackName() const override { return track_; }
+  std::optional<PublishedObject> GetCachedObject(
+      FullSequence sequence) const override;
+  std::vector<FullSequence> GetCachedObjectsInRange(
+      FullSequence start, FullSequence end) const override;
+  void AddObjectListener(MoqtObjectListener* listener) override {
+    listeners_.insert(listener);
+  }
+  void RemoveObjectListener(MoqtObjectListener* listener) override {
+    listeners_.erase(listener);
+  }
+  absl::StatusOr<MoqtTrackStatusCode> GetTrackStatus() const override;
+  FullSequence GetLargestSequence() const override;
+  MoqtForwardingPreference GetForwardingPreference() const override {
+    return forwarding_preference_;
+  }
 
- protected:
-  // Interface to MoqtSession; can be mocked out for tests.
-  virtual void CloseStreamForGroup(uint64_t group_id);
-  virtual void PublishObject(uint64_t group_id, uint64_t object_id,
-                             absl::string_view payload);
+  bool HasSubscribers() const { return !listeners_.empty(); }
 
  private:
   // The number of recent groups to keep around for newly joined subscribers.
   static constexpr size_t kMaxQueuedGroups = 3;
 
-  using Object = quiche::QuicheMemSlice;
-  using Group = std::vector<Object>;
+  using Group = std::vector<CachedObject>;
+
+  void AddRawObject(MoqtObjectStatus status, quiche::QuicheMemSlice payload);
 
   // The number of the oldest group available.
-  uint64_t first_group_in_queue() {
+  uint64_t first_group_in_queue() const {
     return current_group_id_ - queue_.size() + 1;
   }
 
   MoqtSession* session_;  // Not owned.
   FullTrackName track_;
+  MoqtForwardingPreference forwarding_preference_;
   absl::InlinedVector<Group, kMaxQueuedGroups> queue_;
   uint64_t current_group_id_ = -1;
+  absl::flat_hash_set<MoqtObjectListener*> listeners_;
 };
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
index ac22b64..925e369 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -5,11 +5,12 @@
 #include "quiche/quic/moqt/moqt_outgoing_queue.h"
 
 #include <cstdint>
-#include <utility>
+#include <optional>
+#include <vector>
 
-#include "absl/status/statusor.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
 #include "quiche/quic/test_tools/quic_test_utils.h"
 #include "quiche/common/platform/api/quiche_expect_bug.h"
@@ -20,24 +21,45 @@
 namespace {
 
 using ::quic::test::MemSliceFromString;
+using ::testing::AnyOf;
 
-class TestMoqtOutgoingQueue : public MoqtOutgoingQueue {
+class TestMoqtOutgoingQueue : public MoqtOutgoingQueue,
+                              public MoqtObjectListener {
  public:
   TestMoqtOutgoingQueue()
-      : MoqtOutgoingQueue(nullptr, FullTrackName{"test", "track"}) {}
-
-  void CallSubscribeForPast(const SubscribeWindow& window) {
-    absl::StatusOr<PublishPastObjectsCallback> callback =
-        OnSubscribeForPast(window);
-    QUICHE_CHECK_OK(callback.status());
-    (*std::move(callback))();
+      : MoqtOutgoingQueue(nullptr, FullTrackName{"test", "track"},
+                          MoqtForwardingPreference::kGroup) {
+    AddObjectListener(this);
   }
 
-  MOCK_METHOD(void, CloseStreamForGroup, (uint64_t group_id), (override));
+  void OnNewObjectAvailable(FullSequence sequence) override {
+    std::optional<PublishedObject> object = GetCachedObject(sequence);
+    QUICHE_CHECK(object.has_value());
+    ASSERT_THAT(object->status, AnyOf(MoqtObjectStatus::kNormal,
+                                      MoqtObjectStatus::kEndOfGroup));
+    if (object->status == MoqtObjectStatus::kNormal) {
+      PublishObject(object->sequence.group, object->sequence.object,
+                    object->payload.AsStringView());
+    } else {
+      CloseStreamForGroup(object->sequence.group);
+    }
+  }
+
+  void CallSubscribeForPast(const SubscribeWindow& window) {
+    std::vector<FullSequence> objects =
+        GetCachedObjectsInRange(FullSequence(0, 0), GetLargestSequence());
+    for (FullSequence object : objects) {
+      if (window.InWindow(object)) {
+        OnNewObjectAvailable(object);
+      }
+    }
+  }
+
+  MOCK_METHOD(void, CloseStreamForGroup, (uint64_t group_id), ());
   MOCK_METHOD(void, PublishObject,
               (uint64_t group_id, uint64_t object_id,
                absl::string_view payload),
-              (override));
+              ());
 };
 
 TEST(MoqtOutgoingQueue, FirstObjectNotKeyframe) {
@@ -74,8 +96,7 @@
   queue.AddObject(MemSliceFromString("a"), true);
   queue.AddObject(MemSliceFromString("b"), false);
   queue.AddObject(MemSliceFromString("c"), false);
-  queue.CallSubscribeForPast(SubscribeWindow(
-      0, MoqtForwardingPreference::kGroup, FullSequence(0, 3), 0, 0));
+  queue.CallSubscribeForPast(SubscribeWindow(0, 0));
 }
 
 TEST(MoqtOutgoingQueue, SingleGroupPastSubscribeFromMidGroup) {
@@ -92,8 +113,7 @@
   queue.AddObject(MemSliceFromString("a"), true);
   queue.AddObject(MemSliceFromString("b"), false);
   queue.AddObject(MemSliceFromString("c"), false);
-  queue.CallSubscribeForPast(SubscribeWindow(
-      0, MoqtForwardingPreference::kGroup, FullSequence(0, 3), 0, 1));
+  queue.CallSubscribeForPast(SubscribeWindow(0, 1));
 }
 
 TEST(MoqtOutgoingQueue, TwoGroups) {
@@ -141,8 +161,7 @@
   queue.AddObject(MemSliceFromString("d"), true);
   queue.AddObject(MemSliceFromString("e"), false);
   queue.AddObject(MemSliceFromString("f"), false);
-  queue.CallSubscribeForPast(SubscribeWindow(
-      0, MoqtForwardingPreference::kGroup, FullSequence(1, 3), 0, 1));
+  queue.CallSubscribeForPast(SubscribeWindow(0, 1));
 }
 
 TEST(MoqtOutgoingQueue, FiveGroups) {
@@ -215,8 +234,7 @@
   queue.AddObject(MemSliceFromString("h"), false);
   queue.AddObject(MemSliceFromString("i"), true);
   queue.AddObject(MemSliceFromString("j"), false);
-  queue.CallSubscribeForPast(SubscribeWindow(
-      0, MoqtForwardingPreference::kGroup, FullSequence(4, 2), 0, 0));
+  queue.CallSubscribeForPast(SubscribeWindow(0, 0));
 }
 
 }  // namespace
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h
new file mode 100644
index 0000000..ab163d5
--- /dev/null
+++ b/quiche/quic/moqt/moqt_publisher.h
@@ -0,0 +1,107 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_PUBLISHER_H_
+#define QUICHE_QUIC_MOQT_MOQT_PUBLISHER_H_
+
+#include <memory>
+#include <optional>
+#include <vector>
+
+#include "absl/status/statusor.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+
+namespace moqt {
+
+// PublishedObject is a description of an object that is sufficient to publish
+// it on a given track.
+struct PublishedObject {
+  FullSequence sequence;
+  MoqtObjectStatus status;
+  quiche::QuicheMemSlice payload;
+};
+
+// MoqtObjectListener is an interface for any entity that is listening for
+// incoming objects for a given track.
+class MoqtObjectListener {
+ public:
+  virtual ~MoqtObjectListener() = default;
+
+  // Notifies that an object with the given sequence number has become
+  // available.  The object payload itself may be retrieved via GetCachedObject
+  // method of the associated track publisher.
+  virtual void OnNewObjectAvailable(FullSequence sequence) = 0;
+};
+
+// MoqtTrackPublisher is an application-side API for an MoQT publisher
+// of a single individual track.
+class MoqtTrackPublisher {
+ public:
+  virtual ~MoqtTrackPublisher() = default;
+
+  // Returns the full name of the associated track.
+  virtual const FullTrackName& GetTrackName() const = 0;
+
+  // GetCachedObject lets the MoQT stack access the objects that are available
+  // in the track's built-in local cache.
+  //
+  // This implementation of MoQT does not store any objects within the MoQT
+  // stack itself, at least until the object is fully serialized and passed to
+  // the QUIC stack. Instead, it relies on individual tracks having a shared
+  // cache for recent objects, and objects are always pulled from that cache
+  // whenever they are sent.  Once an object is not available via the cache, it
+  // can no longer be sent; this ensures that objects are not buffered forever.
+  //
+  // This method returns nullopt if the object is not currently available, but
+  // might become available in the future.  If the object is gone forever,
+  // kGroupDoesNotExist/kObjectDoesNotExist has to be returned instead;
+  // otherwise, the corresponding QUIC streams will be stuck waiting for objects
+  // that will never arrive.
+  virtual std::optional<PublishedObject> GetCachedObject(
+      FullSequence sequence) const = 0;
+
+  // Returns a full list of objects available in the cache, to be used for
+  // SUBSCRIBEs with a backfill.
+  virtual std::vector<FullSequence> GetCachedObjectsInRange(
+      FullSequence start, FullSequence end) const = 0;
+
+  // TODO: add an API to fetch past objects that are out of cache and might
+  // require an upstream request to fill the relevant cache again. This is
+  // currently done since the specification does not clearly describe how this
+  // is supposed to be done, especially with respect to such things as
+  // backpressure.
+
+  // Registers a listener with the track.  The listener will be notified of all
+  // newly arriving objects. The pointer to the listener must be valid until
+  // removed.
+  virtual void AddObjectListener(MoqtObjectListener* listener) = 0;
+  virtual void RemoveObjectListener(MoqtObjectListener* listener) = 0;
+
+  virtual absl::StatusOr<MoqtTrackStatusCode> GetTrackStatus() const = 0;
+
+  // Returns the largest sequence pair that has been published so far.
+  // This method may only be called if
+  // DoesTrackStatusImplyHavingData(GetTrackStatus()) is true.
+  virtual FullSequence GetLargestSequence() const = 0;
+
+  // Returns the forwarding preference of the track.
+  // This method may only be called if
+  // DoesTrackStatusImplyHavingData(GetTrackStatus()) is true.
+  virtual MoqtForwardingPreference GetForwardingPreference() const = 0;
+};
+
+// MoqtPublisher is an interface to a publisher that allows it to publish
+// multiple tracks.
+class MoqtPublisher {
+ public:
+  virtual ~MoqtPublisher() = default;
+
+  virtual absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> GetTrack(
+      const FullTrackName& track_name) = 0;
+};
+
+}  // namespace moqt
+
+#endif  // QUICHE_QUIC_MOQT_MOQT_PUBLISHER_H_
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 023bdcc..154c8ba 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -16,6 +16,7 @@
 
 #include "absl/algorithm/container.h"
 #include "absl/container/flat_hash_map.h"
+#include "absl/container/flat_hash_set.h"
 #include "absl/container/node_hash_map.h"
 #include "absl/status/status.h"
 #include "absl/status/statusor.h"
@@ -23,15 +24,18 @@
 #include "absl/strings/string_view.h"
 #include "absl/types/span.h"
 #include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_framer.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_parser.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
 #include "quiche/quic/moqt/moqt_track.h"
-#include "quiche/quic/platform/api/quic_bug_tracker.h"
+#include "quiche/quic/platform/api/quic_logging.h"
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_stream.h"
+#include "quiche/common/simple_buffer_allocator.h"
 #include "quiche/web_transport/web_transport.h"
 
 #define ENDPOINT \
@@ -39,8 +43,60 @@
 
 namespace moqt {
 
+namespace {
+
 using ::quic::Perspective;
 
+bool PublisherHasData(const MoqtTrackPublisher& publisher) {
+  absl::StatusOr<MoqtTrackStatusCode> status = publisher.GetTrackStatus();
+  return status.ok() && DoesTrackStatusImplyHavingData(*status);
+}
+
+SubscribeWindow SubscribeMessageToWindow(const MoqtSubscribe& subscribe,
+                                         MoqtTrackPublisher& publisher) {
+  const FullSequence sequence = PublisherHasData(publisher)
+                                    ? publisher.GetLargestSequence()
+                                    : FullSequence{0, 0};
+  switch (GetFilterType(subscribe)) {
+    case MoqtFilterType::kLatestGroup:
+      return SubscribeWindow(sequence.group, 0);
+    case MoqtFilterType::kLatestObject:
+      return SubscribeWindow(sequence.group, sequence.object);
+    case MoqtFilterType::kAbsoluteStart:
+      return SubscribeWindow(*subscribe.start_group, *subscribe.start_object);
+    case MoqtFilterType::kAbsoluteRange:
+      return SubscribeWindow(*subscribe.start_group, *subscribe.start_object,
+                             *subscribe.end_group, *subscribe.end_object);
+    case MoqtFilterType::kNone:
+      QUICHE_BUG(MoqtSession_Subscription_invalid_filter_passed);
+      return SubscribeWindow(0, 0);
+  }
+}
+
+class DefaultPublisher : public MoqtPublisher {
+ public:
+  static DefaultPublisher* GetInstance() {
+    static DefaultPublisher* instance = new DefaultPublisher();
+    return instance;
+  }
+
+  absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> GetTrack(
+      const FullTrackName& track_name) override {
+    return absl::NotFoundError("No tracks published");
+  }
+};
+}  // namespace
+
+MoqtSession::MoqtSession(webtransport::Session* session,
+                         MoqtSessionParameters parameters,
+                         MoqtSessionCallbacks callbacks)
+    : session_(session),
+      parameters_(parameters),
+      callbacks_(std::move(callbacks)),
+      framer_(quiche::SimpleBufferAllocator::Get(), parameters.using_webtrans),
+      publisher_(DefaultPublisher::GetInstance()),
+      liveness_token_(std::make_shared<Empty>()) {}
+
 MoqtSession::ControlStream* MoqtSession::GetControlStream() {
   if (!control_stream_.has_value()) {
     return nullptr;
@@ -154,13 +210,6 @@
   std::move(callbacks_.session_terminated_callback)(error);
 }
 
-void MoqtSession::AddLocalTrack(const FullTrackName& full_track_name,
-                                MoqtForwardingPreference forwarding_preference,
-                                LocalTrack::Visitor* visitor) {
-  local_tracks_.try_emplace(full_track_name, full_track_name,
-                            forwarding_preference, visitor);
-}
-
 // TODO: Create state that allows ANNOUNCE_OK/ERROR on spurious namespaces to
 // trigger session errors.
 void MoqtSession::Announce(absl::string_view track_namespace,
@@ -188,23 +237,6 @@
   pending_outgoing_announces_[track_namespace] = std::move(announce_callback);
 }
 
-bool MoqtSession::HasSubscribers(const FullTrackName& full_track_name) const {
-  auto it = local_tracks_.find(full_track_name);
-  return (it != local_tracks_.end() && it->second.HasSubscriber());
-}
-
-void MoqtSession::CancelAnnounce(absl::string_view track_namespace) {
-  for (auto it = local_tracks_.begin(); it != local_tracks_.end(); ++it) {
-    if (it->first.track_namespace == track_namespace) {
-      it->second.set_announce_cancel();
-    }
-  }
-  absl::erase_if(local_tracks_, [&](const auto& it) {
-    return it.first.track_namespace == track_namespace &&
-           !it.second.HasSubscriber();
-  });
-}
-
 bool MoqtSession::SubscribeAbsolute(absl::string_view track_namespace,
                                     absl::string_view name,
                                     uint64_t start_group, uint64_t start_object,
@@ -310,33 +342,31 @@
 
 bool MoqtSession::SubscribeIsDone(uint64_t subscribe_id, SubscribeDoneCode code,
                                   absl::string_view reason_phrase) {
-  // Search all the tracks to find the subscribe ID.
-  auto name_it = local_track_by_subscribe_id_.find(subscribe_id);
-  if (name_it == local_track_by_subscribe_id_.end()) {
+  auto it = published_subscriptions_.find(subscribe_id);
+  if (it == published_subscriptions_.end()) {
     return false;
   }
-  auto track_it = local_tracks_.find(name_it->second);
-  if (track_it == local_tracks_.end()) {
-    return false;
-  }
-  LocalTrack& track = track_it->second;
+
+  PublishedSubscription& subscription = *it->second;
+  std::vector<webtransport::StreamId> streams_to_reset =
+      subscription.GetAllStreams();
+
   MoqtSubscribeDone subscribe_done;
   subscribe_done.subscribe_id = subscribe_id;
   subscribe_done.status_code = code;
   subscribe_done.reason_phrase = reason_phrase;
-  SubscribeWindow* window = track.GetWindow(subscribe_id);
-  if (window == nullptr) {
-    return false;
-  }
-  subscribe_done.final_id = window->largest_delivered();
+  subscribe_done.final_id = subscription.largest_sent();
   SendControlMessage(framer_.SerializeSubscribeDone(subscribe_done));
   QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE_DONE message for "
                   << subscribe_id;
   // Clean up the subscription
-  track.DeleteWindow(subscribe_id);
-  local_track_by_subscribe_id_.erase(name_it);
-  if (track.canceled() && !track.HasSubscriber()) {
-    local_tracks_.erase(track_it);
+  published_subscriptions_.erase(it);
+  for (webtransport::StreamId stream_id : streams_to_reset) {
+    webtransport::Stream* stream = session_->GetStreamById(stream_id);
+    if (stream == nullptr) {
+      continue;
+    }
+    stream->ResetWithUserCode(kResetCodeSubscriptionGone);
   }
   return true;
 }
@@ -367,18 +397,51 @@
   return true;
 }
 
-std::optional<webtransport::StreamId> MoqtSession::OpenUnidirectionalStream() {
+webtransport::Stream* MoqtSession::OpenOrQueueDataStream(
+    uint64_t subscription_id, FullSequence first_object) {
   if (!session_->CanOpenNextOutgoingUnidirectionalStream()) {
-    return std::nullopt;
+    queued_outgoing_data_streams_.push_back(
+        QueuedOutgoingDataStream{subscription_id, first_object});
+    // TODO: limit the number of streams in the queue.
+    return nullptr;
   }
+  return OpenDataStream(subscription_id, first_object);
+}
+
+webtransport::Stream* MoqtSession::OpenDataStream(uint64_t subscription_id,
+                                                  FullSequence first_object) {
+  auto it = published_subscriptions_.find(subscription_id);
+  if (it == published_subscriptions_.end()) {
+    // It is possible that the subscription has been discarded while the stream
+    // was in the queue; discard those streams.
+    return nullptr;
+  }
+  PublishedSubscription& subscription = *it->second;
+
   webtransport::Stream* new_stream =
       session_->OpenOutgoingUnidirectionalStream();
   if (new_stream == nullptr) {
-    return std::nullopt;
+    QUICHE_BUG(MoqtSession_OpenDataStream_blocked)
+        << "OpenDataStream called when creation of new streams is blocked.";
+    return nullptr;
   }
-  new_stream->SetVisitor(
-      std::make_unique<OutgoingDataStream>(this, new_stream));
-  return new_stream->GetStreamId();
+  new_stream->SetVisitor(std::make_unique<OutgoingDataStream>(
+      this, new_stream, subscription_id, first_object));
+  subscription.OnDataStreamCreated(new_stream->GetStreamId(), first_object);
+  return new_stream;
+}
+
+void MoqtSession::OnCanCreateNewOutgoingUnidirectionalStream() {
+  while (!queued_outgoing_data_streams_.empty() &&
+         session_->CanOpenNextOutgoingUnidirectionalStream()) {
+    QueuedOutgoingDataStream next = queued_outgoing_data_streams_.front();
+    queued_outgoing_data_streams_.pop_front();
+    webtransport::Stream* stream =
+        OpenDataStream(next.subscription_id, next.first_object);
+    if (stream != nullptr) {
+      stream->visitor()->OnCanWrite();
+    }
+  }
 }
 
 std::pair<FullTrackName, RemoteTrack::Visitor*>
@@ -422,157 +485,6 @@
        track.visitor()});
 }
 
-// TODO(martinduke): Throw errors if the object status is inconsistent with
-// sequence numbers we have already observed on the track.
-bool MoqtSession::PublishObject(const FullTrackName& full_track_name,
-                                uint64_t group_id, uint64_t object_id,
-                                uint64_t object_send_order,
-                                MoqtObjectStatus status,
-                                absl::string_view payload) {
-  auto track_it = local_tracks_.find(full_track_name);
-  if (track_it == local_tracks_.end()) {
-    QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT for nonexistent track";
-    return false;
-  }
-  // TODO(martinduke): Write a test for this QUIC_BUG.
-  QUIC_BUG_IF(moqt_publish_abnormal_with_payload,
-              status != MoqtObjectStatus::kNormal && !payload.empty());
-  LocalTrack& track = track_it->second;
-  bool end_of_stream = false;
-  MoqtForwardingPreference forwarding_preference =
-      track.forwarding_preference();
-  switch (forwarding_preference) {
-    case MoqtForwardingPreference::kTrack:
-      end_of_stream = (status == MoqtObjectStatus::kEndOfTrack);
-      break;
-    case MoqtForwardingPreference::kObject:
-    case MoqtForwardingPreference::kDatagram:
-      end_of_stream = true;
-      break;
-    case MoqtForwardingPreference::kGroup:
-      end_of_stream = (status == MoqtObjectStatus::kEndOfGroup ||
-                       status == MoqtObjectStatus::kGroupDoesNotExist ||
-                       status == MoqtObjectStatus::kEndOfTrack);
-      break;
-  }
-  FullSequence sequence{group_id, object_id};
-  track.SentSequence(sequence, status);
-  std::vector<SubscribeWindow*> subscriptions =
-      track.ShouldSend({group_id, object_id});
-  if (subscriptions.empty()) {
-    return true;
-  }
-  MoqtObject object;
-  QUICHE_DCHECK(track.track_alias().has_value());
-  object.track_alias = *track.track_alias();
-  object.group_id = group_id;
-  object.object_id = object_id;
-  object.object_send_order = object_send_order;
-  object.object_status = status;
-  object.forwarding_preference = forwarding_preference;
-  object.payload_length = payload.size();
-  int failures = 0;
-  quiche::StreamWriteOptions write_options;
-  write_options.set_send_fin(end_of_stream);
-  absl::flat_hash_set<uint64_t> subscribes_to_close;
-  for (auto subscription : subscriptions) {
-    if (subscription->OnObjectSent(sequence, status)) {
-      subscribes_to_close.insert(subscription->subscribe_id());
-    }
-    if (forwarding_preference == MoqtForwardingPreference::kDatagram) {
-      object.subscribe_id = subscription->subscribe_id();
-      quiche::QuicheBuffer datagram =
-          framer_.SerializeObjectDatagram(object, payload);
-      // TODO(martinduke): It's OK to just silently fail, but better to notify
-      // the app on errors.
-      session_->SendOrQueueDatagram(datagram.AsStringView());
-      continue;
-    }
-    bool new_stream = false;
-    std::optional<webtransport::StreamId> stream_id =
-        subscription->GetStreamForSequence(sequence);
-    if (!stream_id.has_value()) {
-      new_stream = true;
-      stream_id = OpenUnidirectionalStream();
-      if (!stream_id.has_value()) {
-        QUICHE_DLOG(ERROR) << ENDPOINT
-                           << "Sending OBJECT to nonexistent stream";
-        ++failures;
-        continue;
-      }
-      if (!end_of_stream) {
-        subscription->AddStream(group_id, object_id, *stream_id);
-      }
-    }
-    webtransport::Stream* stream = session_->GetStreamById(*stream_id);
-    if (stream == nullptr) {
-      QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT to nonexistent stream "
-                         << *stream_id;
-      ++failures;
-      continue;
-    }
-    object.subscribe_id = subscription->subscribe_id();
-    quiche::QuicheBuffer header =
-        framer_.SerializeObjectHeader(object, new_stream);
-    std::array<absl::string_view, 2> views = {header.AsStringView(), payload};
-    if (!stream->Writev(views, write_options).ok()) {
-      QUICHE_DLOG(ERROR) << ENDPOINT << "Failed to write OBJECT message";
-      ++failures;
-      continue;
-    }
-    QUICHE_DVLOG(1) << ENDPOINT << "Sending object length " << payload.length()
-                    << " for " << full_track_name.track_namespace << ":"
-                    << full_track_name.track_name << " with sequence "
-                    << object.group_id << ":" << object.object_id
-                    << " on stream " << *stream_id;
-    if (end_of_stream && !new_stream) {
-      subscription->RemoveStream(group_id, object_id);
-    }
-  }
-  for (uint64_t subscribe_id : subscribes_to_close) {
-    SubscribeIsDone(subscribe_id, SubscribeDoneCode::kSubscriptionEnded, "");
-  }
-  return (failures == 0);
-}
-
-void MoqtSession::CloseObjectStream(const FullTrackName& full_track_name,
-                                    uint64_t group_id) {
-  auto track_it = local_tracks_.find(full_track_name);
-  if (track_it == local_tracks_.end()) {
-    QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT for nonexistent track";
-    return;
-  }
-  LocalTrack& track = track_it->second;
-
-  MoqtForwardingPreference forwarding_preference =
-      track.forwarding_preference();
-  if (forwarding_preference == MoqtForwardingPreference::kObject ||
-      forwarding_preference == MoqtForwardingPreference::kDatagram) {
-    QUIC_BUG(MoqtSession_CloseStreamObject_wrong_type)
-        << "Forwarding preferences of Object or Datagram require stream to be "
-           "immediately closed, and thus are not valid CloseObjectStream() "
-           "targets";
-    return;
-  }
-
-  std::vector<SubscribeWindow*> subscriptions =
-      track.ShouldSend({group_id, /*object=*/0});
-  for (SubscribeWindow* subscription : subscriptions) {
-    std::optional<webtransport::StreamId> stream_id =
-        subscription->GetStreamForSequence(
-            FullSequence(group_id, /*object=*/0));
-    if (!stream_id.has_value()) {
-      continue;
-    }
-    webtransport::Stream* stream = session_->GetStreamById(*stream_id);
-    if (stream == nullptr) {
-      continue;
-    }
-    bool success = stream->SendFin();
-    QUICHE_BUG_IF(MoqtSession_CloseObjectStream_fin_failed, !success);
-  }
-}
-
 static void ForwardStreamDataToParser(webtransport::Stream& stream,
                                       MoqtParser& parser) {
   bool fin =
@@ -673,7 +585,6 @@
 
 void MoqtSession::ControlStream::OnSubscribeMessage(
     const MoqtSubscribe& message) {
-  std::string reason_phrase = "";
   if (session_->peer_role_ == MoqtRole::kPublisher) {
     QUIC_DLOG(INFO) << ENDPOINT << "Publisher peer sent SUBSCRIBE";
     session_->Error(MoqtError::kProtocolViolation,
@@ -682,99 +593,40 @@
   }
   QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE for "
                   << message.track_namespace << ":" << message.track_name;
-  auto it = session_->local_tracks_.find(FullTrackName(
-      std::string(message.track_namespace), std::string(message.track_name)));
-  if (it == session_->local_tracks_.end()) {
-    QUIC_DLOG(INFO) << ENDPOINT << "Rejected because "
-                    << message.track_namespace << ":" << message.track_name
-                    << " does not exist";
+
+  FullTrackName track_name =
+      FullTrackName{message.track_namespace, message.track_name};
+  absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> track_publisher =
+      session_->publisher_->GetTrack(track_name);
+  if (!track_publisher.ok()) {
+    QUIC_DLOG(INFO) << ENDPOINT << "SUBSCRIBE for " << track_name
+                    << " rejected by the application: "
+                    << track_publisher.status();
     SendSubscribeError(message, SubscribeErrorCode::kInternalError,
-                       "Track does not exist", message.track_alias);
+                       track_publisher.status().message(), message.track_alias);
     return;
   }
-  LocalTrack& track = it->second;
-  if (it->second.canceled()) {
-    // Note that if the track has already been deleted, there will not be a
-    // protocol violation, which the spec says there SHOULD be. It's not worth
-    // keeping state on deleted tracks.
-    session_->Error(MoqtError::kProtocolViolation,
-                    "Received SUBSCRIBE for canceled track");
-    return;
+  std::optional<FullSequence> largest_id;
+  if (PublisherHasData(**track_publisher)) {
+    largest_id = (*track_publisher)->GetLargestSequence();
   }
-  if ((track.track_alias().has_value() &&
-       message.track_alias != *track.track_alias()) ||
-      session_->used_track_aliases_.contains(message.track_alias)) {
-    // Propose a different track_alias.
-    SendSubscribeError(message, SubscribeErrorCode::kRetryTrackAlias,
-                       "Track alias already exists",
-                       session_->next_local_track_alias_++);
-    return;
-  } else {  // Use client-provided alias.
-    track.set_track_alias(message.track_alias);
-    if (message.track_alias >= session_->next_local_track_alias_) {
-      session_->next_local_track_alias_ = message.track_alias + 1;
-    }
-    session_->used_track_aliases_.insert(message.track_alias);
+
+  auto subscription = std::make_unique<MoqtSession::PublishedSubscription>(
+      session_, *std::move(track_publisher), message);
+  auto [it, success] = session_->published_subscriptions_.emplace(
+      message.subscribe_id, std::move(subscription));
+  if (!success) {
+    SendSubscribeError(message, SubscribeErrorCode::kInternalError,
+                       "Duplicate subscribe ID", message.track_alias);
   }
-  FullSequence start;
-  if (message.start_group.has_value()) {
-    // The filter is AbsoluteStart or AbsoluteRange.
-    QUIC_BUG_IF(quic_bug_invalid_subscribe, !message.start_object.has_value())
-        << "Start group without start object";
-    start = FullSequence(*message.start_group, *message.start_object);
-  } else {
-    // The filter is LatestObject or LatestGroup.
-    start = track.next_sequence();
-    if (message.start_object.has_value()) {
-      // The filter is LatestGroup.
-      QUIC_BUG_IF(quic_bug_invalid_subscribe, *message.start_object != 0)
-          << "LatestGroup does not start with zero";
-      start.object = 0;
-    } else {
-      --start.object;
-    }
-  }
-  LocalTrack::Visitor::PublishPastObjectsCallback publish_past_objects;
-  std::optional<SubscribeWindow> past_window;
-  if (start < track.next_sequence() && track.visitor() != nullptr) {
-    // Pull a copy of objects that have already been published.
-    FullSequence end_of_past_subscription{
-        message.end_group.has_value() ? *message.end_group : UINT64_MAX,
-        message.end_object.has_value() ? *message.end_object : UINT64_MAX};
-    end_of_past_subscription =
-        std::min(end_of_past_subscription, track.next_sequence());
-    past_window.emplace(message.subscribe_id, track.forwarding_preference(),
-                        track.next_sequence(), start, end_of_past_subscription);
-    absl::StatusOr<LocalTrack::Visitor::PublishPastObjectsCallback>
-        past_objects_available =
-            track.visitor()->OnSubscribeForPast(*past_window);
-    if (!past_objects_available.ok()) {
-      SendSubscribeError(message, SubscribeErrorCode::kInternalError,
-                         past_objects_available.status().message(),
-                         message.track_alias);
-      return;
-    }
-    publish_past_objects = *std::move(past_objects_available);
-  }
+
   MoqtSubscribeOk subscribe_ok;
   subscribe_ok.subscribe_id = message.subscribe_id;
+  subscribe_ok.largest_id = largest_id;
   SendOrBufferMessage(session_->framer_.SerializeSubscribeOk(subscribe_ok));
-  QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for "
-                  << message.track_namespace << ":" << message.track_name;
-  if (!message.end_group.has_value()) {
-    track.AddWindow(message.subscribe_id, start.group, start.object);
-  } else if (message.end_object.has_value()) {
-    track.AddWindow(message.subscribe_id, start.group, start.object,
-                    *message.end_group, *message.end_object);
-  } else {
-    track.AddWindow(message.subscribe_id, start.group, start.object,
-                    *message.end_group);
-  }
-  session_->local_track_by_subscribe_id_.emplace(message.subscribe_id,
-                                                 track.full_track_name());
-  if (publish_past_objects) {
-    QUICHE_DCHECK(past_window.has_value());
-    std::move(publish_past_objects)();
+
+  if (largest_id.has_value()) {
+    it->second->Backfill();
   }
 }
 
@@ -850,19 +702,8 @@
 
 void MoqtSession::ControlStream::OnSubscribeUpdateMessage(
     const MoqtSubscribeUpdate& message) {
-  // Search all the tracks to find the subscribe ID.
-  auto name_it =
-      session_->local_track_by_subscribe_id_.find(message.subscribe_id);
-  if (name_it == session_->local_track_by_subscribe_id_.end()) {
-    return;
-  }
-  auto track_it = session_->local_tracks_.find(name_it->second);
-  if (track_it == session_->local_tracks_.end()) {
-    return;
-  }
-  LocalTrack& track = track_it->second;
-  SubscribeWindow* window = track.GetWindow(message.subscribe_id);
-  if (window == nullptr) {
+  auto it = session_->published_subscriptions_.find(message.subscribe_id);
+  if (it == session_->published_subscriptions_.end()) {
     return;
   }
   FullSequence start(message.start_group, message.start_object);
@@ -872,15 +713,7 @@
                                                ? *message.end_object
                                                : UINT64_MAX);
   }
-  // TODO(martinduke): Handle the case where the update range is invalid.
-  if (window->UpdateStartEnd(start, end)) {
-    std::optional<FullSequence> largest_delivered = window->largest_delivered();
-    if (largest_delivered.has_value() && end <= *largest_delivered) {
-      session_->SubscribeIsDone(message.subscribe_id,
-                                SubscribeDoneCode::kSubscriptionEnded,
-                                "SUBSCRIBE_UPDATE moved subscription end");
-    }
-  }
+  it->second->Update(start, end);
 }
 
 void MoqtSession::ControlStream::OnAnnounceMessage(
@@ -935,7 +768,7 @@
 
 void MoqtSession::ControlStream::OnAnnounceCancelMessage(
     const MoqtAnnounceCancel& message) {
-  session_->CancelAnnounce(message.track_namespace);
+  // TODO: notify the application about this.
 }
 
 void MoqtSession::ControlStream::OnParsingError(MoqtError error_code,
@@ -1007,8 +840,309 @@
   session_->Error(error_code, absl::StrCat("Parse error: ", reason));
 }
 
+MoqtSession::PublishedSubscription::PublishedSubscription(
+    MoqtSession* session, std::shared_ptr<MoqtTrackPublisher> track_publisher,
+    const MoqtSubscribe& subscribe)
+    : subscription_id_(subscribe.subscribe_id),
+      session_(session),
+      track_publisher_(track_publisher),
+      track_alias_(subscribe.track_alias),
+      window_(SubscribeMessageToWindow(subscribe, *track_publisher)) {
+  track_publisher->AddObjectListener(this);
+  QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for "
+                  << subscribe.track_namespace << ":" << subscribe.track_name;
+}
+
+MoqtSession::PublishedSubscription::~PublishedSubscription() {
+  track_publisher_->RemoveObjectListener(this);
+}
+
+SendStreamMap& MoqtSession::PublishedSubscription::stream_map() {
+  // The stream map is lazily initialized, since initializing it requires
+  // knowing the forwarding preference in advance, and it might not be known
+  // when the subscription is first created.
+  if (!lazily_initialized_stream_map_.has_value()) {
+    QUICHE_DCHECK(
+        DoesTrackStatusImplyHavingData(*track_publisher_->GetTrackStatus()));
+    lazily_initialized_stream_map_.emplace(
+        track_publisher_->GetForwardingPreference());
+  }
+  return *lazily_initialized_stream_map_;
+}
+
+void MoqtSession::PublishedSubscription::Update(
+    FullSequence start, std::optional<FullSequence> end) {
+  window_.UpdateStartEnd(start, end);
+  // TODO: reset streams that are no longer in-window.
+  // TODO: send SUBSCRIBE_DONE if required.
+  // TODO: send an error for invalid updates now that it's a part of draft-05.
+}
+
+void MoqtSession::PublishedSubscription::OnNewObjectAvailable(
+    FullSequence sequence) {
+  if (!window_.InWindow(sequence)) {
+    return;
+  }
+
+  MoqtForwardingPreference forwarding_preference =
+      track_publisher_->GetForwardingPreference();
+  if (forwarding_preference == MoqtForwardingPreference::kDatagram) {
+    SendDatagram(sequence);
+    return;
+  }
+
+  std::optional<webtransport::StreamId> stream_id =
+      stream_map().GetStreamForSequence(sequence);
+  webtransport::Stream* raw_stream = nullptr;
+  if (stream_id.has_value()) {
+    raw_stream = session_->session_->GetStreamById(*stream_id);
+  } else {
+    if (!window_.IsStreamProvokingObject(sequence, forwarding_preference)) {
+      QUIC_DLOG(INFO) << ENDPOINT << "Received object " << sequence
+                      << ", but there is no stream that it can be mapped to";
+      // It is possible that the we are getting notified of objects out of
+      // order, but we still have to send objects in a manner consistent with
+      // the forwarding preference used.
+      return;
+    }
+    raw_stream = session_->OpenOrQueueDataStream(subscription_id_, sequence);
+  }
+  if (raw_stream == nullptr) {
+    return;
+  }
+
+  OutgoingDataStream* stream =
+      static_cast<OutgoingDataStream*>(raw_stream->visitor());
+  stream->SendObjects(*this);
+}
+
+void MoqtSession::PublishedSubscription::Backfill() {
+  const FullSequence start = window_.start();
+  const FullSequence end = track_publisher_->GetLargestSequence();
+  const MoqtForwardingPreference preference =
+      track_publisher_->GetForwardingPreference();
+
+  absl::flat_hash_set<ReducedSequenceIndex> already_opened;
+  std::vector<FullSequence> objects =
+      track_publisher_->GetCachedObjectsInRange(start, end);
+  QUICHE_DCHECK(absl::c_is_sorted(objects));
+  for (FullSequence sequence : objects) {
+    auto [it, was_missing] =
+        already_opened.insert(ReducedSequenceIndex(sequence, preference));
+    if (!was_missing) {
+      // For every stream mapping unit present, we only need to notify of the
+      // earliest object on it, since the stream itself will pull the rest.
+      continue;
+    }
+    OnNewObjectAvailable(sequence);
+  }
+}
+
+std::vector<webtransport::StreamId>
+MoqtSession::PublishedSubscription::GetAllStreams() const {
+  if (!lazily_initialized_stream_map_.has_value()) {
+    return {};
+  }
+  return lazily_initialized_stream_map_->GetAllStreams();
+}
+
+void MoqtSession::PublishedSubscription::OnDataStreamCreated(
+    webtransport::StreamId id, FullSequence start_sequence) {
+  stream_map().AddStream(start_sequence, id);
+}
+void MoqtSession::PublishedSubscription::OnDataStreamDestroyed(
+    webtransport::StreamId id, FullSequence end_sequence) {
+  stream_map().RemoveStream(end_sequence, id);
+}
+
+void MoqtSession::PublishedSubscription::OnObjectSent(FullSequence sequence) {
+  if (largest_sent_.has_value()) {
+    largest_sent_ = std::max(*largest_sent_, sequence);
+  } else {
+    largest_sent_ = sequence;
+  }
+  // TODO: send SUBSCRIBE_DONE if the subscription is done.
+}
+
+MoqtSession::OutgoingDataStream::OutgoingDataStream(
+    MoqtSession* session, webtransport::Stream* stream,
+    uint64_t subscription_id, FullSequence first_object)
+    : session_(session),
+      stream_(stream),
+      subscription_id_(subscription_id),
+      next_object_(first_object),
+      session_liveness_(session->liveness_token_) {}
+
+MoqtSession::OutgoingDataStream::~OutgoingDataStream() {
+  // Though it might seem intuitive that the session object has to outlive the
+  // connection object (and this is indeed how something like QuicSession and
+  // QuicStream works), this is not the true for WebTransport visitors: the
+  // session getting destroyed will inevitably lead to all related streams being
+  // destroyed, but the actual order of destruction is not guaranteed.  Thus, we
+  // need to check if the session still exists while accessing it in a stream
+  // destructor.
+  if (session_liveness_.expired()) {
+    return;
+  }
+  auto it = session_->published_subscriptions_.find(subscription_id_);
+  if (it != session_->published_subscriptions_.end()) {
+    it->second->OnDataStreamDestroyed(stream_->GetStreamId(), next_object_);
+  }
+}
+
 void MoqtSession::OutgoingDataStream::OnCanWrite() {
-  // TODO: handle backpressure on data streams.
+  PublishedSubscription* subscription = GetSubscriptionIfValid();
+  if (subscription == nullptr) {
+    return;
+  }
+  SendObjects(*subscription);
+}
+
+MoqtSession::PublishedSubscription*
+MoqtSession::OutgoingDataStream::GetSubscriptionIfValid() {
+  auto it = session_->published_subscriptions_.find(subscription_id_);
+  if (it == session_->published_subscriptions_.end()) {
+    stream_->ResetWithUserCode(kResetCodeSubscriptionGone);
+    return nullptr;
+  }
+
+  PublishedSubscription* subscription = it->second.get();
+  MoqtTrackPublisher& publisher = subscription->publisher();
+  absl::StatusOr<MoqtTrackStatusCode> status = publisher.GetTrackStatus();
+  if (!status.ok()) {
+    // TODO: clean up the subscription.
+    return nullptr;
+  }
+  if (!DoesTrackStatusImplyHavingData(*status)) {
+    QUICHE_BUG(GetSubscriptionIfValid_InvalidTrackStatus)
+        << "The track publisher returned a status indicating that no objects "
+           "are available, but a stream for those objects exists.";
+    session_->Error(MoqtError::kInternalError,
+                    "Invalid track state provided by application");
+    return nullptr;
+  }
+  return subscription;
+}
+
+void MoqtSession::OutgoingDataStream::SendObjects(
+    PublishedSubscription& subscription) {
+  while (stream_->CanWrite()) {
+    std::optional<PublishedObject> object =
+        subscription.publisher().GetCachedObject(next_object_);
+    if (!object.has_value()) {
+      break;
+    }
+    if (!subscription.InWindow(next_object_)) {
+      // It is possible that the next object became irrelevant due to a
+      // SUBSCRIBE_UPDATE.  Close the stream if so.
+      bool success = stream_->SendFin();
+      QUICHE_BUG_IF(OutgoingDataStream_fin_due_to_update, !success)
+          << "Writing FIN failed despite CanWrite() being true.";
+      return;
+    }
+    SendNextObject(subscription, *std::move(object));
+  }
+}
+
+void MoqtSession::OutgoingDataStream::SendNextObject(
+    PublishedSubscription& subscription, PublishedObject object) {
+  QUICHE_DCHECK(object.sequence == next_object_);
+  QUICHE_DCHECK(stream_->CanWrite());
+
+  MoqtTrackPublisher& publisher = subscription.publisher();
+  QUICHE_DCHECK(DoesTrackStatusImplyHavingData(*publisher.GetTrackStatus()));
+  MoqtForwardingPreference forwarding_preference =
+      publisher.GetForwardingPreference();
+
+  MoqtObject header;
+  header.subscribe_id = subscription_id_;
+  header.track_alias = subscription.track_alias();
+  header.group_id = object.sequence.group;
+  header.object_id = object.sequence.object;
+  header.object_send_order = 0;  // TODO: remove in draft-05
+  header.object_status = object.status;
+  header.forwarding_preference = forwarding_preference;
+  header.payload_length = object.payload.length();
+
+  quiche::QuicheBuffer serialized_header =
+      session_->framer_.SerializeObjectHeader(header, !stream_header_written_);
+  bool fin = false;
+  switch (forwarding_preference) {
+    case MoqtForwardingPreference::kTrack:
+      if (object.status == MoqtObjectStatus::kEndOfGroup ||
+          object.status == MoqtObjectStatus::kGroupDoesNotExist) {
+        ++next_object_.group;
+        next_object_.object = 0;
+      } else {
+        ++next_object_.object;
+      }
+      fin = object.status == MoqtObjectStatus::kEndOfTrack ||
+            !subscription.InWindow(next_object_);
+      break;
+
+    case MoqtForwardingPreference::kGroup:
+      ++next_object_.object;
+      fin = object.status == MoqtObjectStatus::kEndOfTrack ||
+            object.status == MoqtObjectStatus::kEndOfGroup ||
+            object.status == MoqtObjectStatus::kGroupDoesNotExist ||
+            !subscription.InWindow(next_object_);
+      break;
+
+    case MoqtForwardingPreference::kObject:
+      QUICHE_DCHECK(!stream_header_written_);
+      fin = true;
+      break;
+
+    case MoqtForwardingPreference::kDatagram:
+      QUICHE_NOTREACHED();
+      break;
+  }
+
+  // TODO(vasilvv): add a version of WebTransport write API that accepts
+  // memslices so that we can avoid a copy here.
+  std::array<absl::string_view, 2> write_vector = {
+      serialized_header.AsStringView(), object.payload.AsStringView()};
+  quiche::StreamWriteOptions options;
+  options.set_send_fin(fin);
+  absl::Status write_status = stream_->Writev(write_vector, options);
+  if (!write_status.ok()) {
+    QUICHE_BUG(MoqtSession_SendNextObject_write_failed)
+        << "Writing into MoQT stream failed despite CanWrite() being true "
+           "before; status: "
+        << write_status;
+    session_->Error(MoqtError::kInternalError, "Data stream write error");
+    return;
+  }
+
+  QUIC_DVLOG(1) << "Stream " << stream_->GetStreamId() << " successfully wrote "
+                << object.sequence << ", fin = " << fin
+                << ", next: " << next_object_;
+
+  stream_header_written_ = true;
+  subscription.OnObjectSent(object.sequence);
+}
+
+void MoqtSession::PublishedSubscription::SendDatagram(FullSequence sequence) {
+  std::optional<PublishedObject> object =
+      track_publisher_->GetCachedObject(sequence);
+  if (!object.has_value()) {
+    QUICHE_BUG(PublishedSubscription_SendDatagram_object_not_in_cache)
+        << "Got notification about an object that is not in the cache";
+    return;
+  }
+
+  MoqtObject header;
+  header.subscribe_id = subscription_id_;
+  header.track_alias = track_alias();
+  header.group_id = object->sequence.group;
+  header.object_id = object->sequence.object;
+  header.object_send_order = 0;  // TODO: remove in draft-05
+  header.object_status = object->status;
+  header.forwarding_preference = MoqtForwardingPreference::kDatagram;
+  quiche::QuicheBuffer datagram = session_->framer_.SerializeObjectDatagram(
+      header, object->payload.AsStringView());
+  session_->session_->SendOrQueueDatagram(datagram.AsStringView());
+  OnObjectSent(object->sequence);
 }
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index b4a0171..52af69d 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -6,9 +6,11 @@
 #define QUICHE_QUIC_MOQT_MOQT_SESSION_H_
 
 #include <cstdint>
+#include <memory>
 #include <optional>
 #include <string>
 #include <utility>
+#include <vector>
 
 #include "absl/container/flat_hash_map.h"
 #include "absl/container/flat_hash_set.h"
@@ -17,11 +19,13 @@
 #include "quiche/quic/moqt/moqt_framer.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_parser.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
 #include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/common/platform/api/quiche_export.h"
 #include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_callbacks.h"
-#include "quiche/common/simple_buffer_allocator.h"
+#include "quiche/common/quiche_circular_deque.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
@@ -63,12 +67,7 @@
 class QUICHE_EXPORT MoqtSession : public webtransport::SessionVisitor {
  public:
   MoqtSession(webtransport::Session* session, MoqtSessionParameters parameters,
-              MoqtSessionCallbacks callbacks = MoqtSessionCallbacks())
-      : session_(session),
-        parameters_(parameters),
-        callbacks_(std::move(callbacks)),
-        framer_(quiche::SimpleBufferAllocator::Get(),
-                parameters.using_webtrans) {}
+              MoqtSessionCallbacks callbacks = MoqtSessionCallbacks());
   ~MoqtSession() { std::move(callbacks_.session_deleted_callback)(); }
 
   // webtransport::SessionVisitor implementation.
@@ -79,28 +78,17 @@
   void OnIncomingUnidirectionalStreamAvailable() override;
   void OnDatagramReceived(absl::string_view datagram) override;
   void OnCanCreateNewOutgoingBidirectionalStream() override {}
-  void OnCanCreateNewOutgoingUnidirectionalStream() override {}
+  void OnCanCreateNewOutgoingUnidirectionalStream() override;
 
   void Error(MoqtError code, absl::string_view error);
 
   quic::Perspective perspective() const { return parameters_.perspective; }
 
-  // Add to the list of tracks that can be subscribed to. Call this before
-  // Announce() so that subscriptions can be processed correctly. If |visitor|
-  // is nullptr, then incoming SUBSCRIBE for objects in the path will receive
-  // SUBSCRIBE_OK, but never actually get the objects.
-  void AddLocalTrack(const FullTrackName& full_track_name,
-                     MoqtForwardingPreference forwarding_preference,
-                     LocalTrack::Visitor* visitor);
   // Send an ANNOUNCE message for |track_namespace|, and call
   // |announce_callback| when the response arrives. Will fail immediately if
   // there is already an unresolved ANNOUNCE for that namespace.
   void Announce(absl::string_view track_namespace,
                 MoqtOutgoingAnnounceCallback announce_callback);
-  bool HasSubscribers(const FullTrackName& full_track_name) const;
-  // Send an ANNOUNCE_CANCEL and delete local tracks in that namespace when all
-  // subscriptions are closed for that track.
-  void CancelAnnounce(absl::string_view track_namespace);
 
   // Returns true if SUBSCRIBE was sent. If there is already a subscription to
   // the track, the message will still be sent. However, the visitor will be
@@ -131,21 +119,9 @@
                              RemoteTrack::Visitor* visitor,
                              absl::string_view auth_info = "");
 
-  // Returns false if it could not open a stream when necessary, or if the
-  // track does not exist (there was no call to AddLocalTrack). Will still
-  // return false is some streams succeed.
-  // Also returns false if |payload_length| exists but is shorter than
-  // |payload|.
-  // |payload.length() >= |payload_length|, because the application can deliver
-  // partial objects.
-  bool PublishObject(const FullTrackName& full_track_name, uint64_t group_id,
-                     uint64_t object_id, uint64_t object_send_order,
-                     MoqtObjectStatus status, absl::string_view payload);
-  void CloseObjectStream(const FullTrackName& full_track_name,
-                         uint64_t group_id);
-  // TODO: Add an API to send partial objects.
-
   MoqtSessionCallbacks& callbacks() { return callbacks_; }
+  MoqtPublisher* publisher() { return publisher_; }
+  void set_publisher(MoqtPublisher* publisher) { publisher_ = publisher; }
 
  private:
   friend class test::MoqtSessionPeer;
@@ -298,10 +274,70 @@
     MoqtParser parser_;
     std::string partial_object_;
   };
+  // Represents a record for a single subscription to a local track that is
+  // being sent to the peer.
+  class PublishedSubscription : public MoqtObjectListener {
+   public:
+    explicit PublishedSubscription(
+        MoqtSession* session,
+        std::shared_ptr<MoqtTrackPublisher> track_publisher,
+        const MoqtSubscribe& subscribe);
+    ~PublishedSubscription();
+
+    PublishedSubscription(const PublishedSubscription&) = delete;
+    PublishedSubscription(PublishedSubscription&&) = delete;
+    PublishedSubscription& operator=(const PublishedSubscription&) = delete;
+    PublishedSubscription& operator=(PublishedSubscription&&) = delete;
+
+    MoqtTrackPublisher& publisher() { return *track_publisher_; }
+    uint64_t track_alias() const { return track_alias_; }
+    std::optional<FullSequence> largest_sent() const { return largest_sent_; }
+
+    void OnNewObjectAvailable(FullSequence sequence) override;
+
+    // Creates streams for all objects that are currently in the track's object
+    // cache and match the subscription window.  This is in some sense similar
+    // to a fetch (since all of the objects are in the past), but is
+    // conceptually simpler, as backpressure is less of a concern.
+    void Backfill();
+
+    // Updates the window of the subscription in question.
+    void Update(FullSequence start, std::optional<FullSequence> end);
+    // Checks if the specified sequence is within the window of this
+    // subscription.
+    bool InWindow(FullSequence sequence) { return window_.InWindow(sequence); }
+
+    void OnDataStreamCreated(webtransport::StreamId id,
+                             FullSequence start_sequence);
+    void OnDataStreamDestroyed(webtransport::StreamId id,
+                               FullSequence end_sequence);
+    void OnObjectSent(FullSequence sequence);
+
+    std::vector<webtransport::StreamId> GetAllStreams() const;
+
+   private:
+    SendStreamMap& stream_map();
+    quic::Perspective perspective() const {
+      return session_->parameters_.perspective;
+    }
+
+    void SendDatagram(FullSequence sequence);
+
+    uint64_t subscription_id_;
+    MoqtSession* session_;
+    std::shared_ptr<MoqtTrackPublisher> track_publisher_;
+    uint64_t track_alias_;
+    SubscribeWindow window_;
+    // Largest sequence number ever sent via this subscription.
+    std::optional<FullSequence> largest_sent_;
+    // Should be almost always accessed via `stream_map()`.
+    std::optional<SendStreamMap> lazily_initialized_stream_map_;
+  };
   class QUICHE_EXPORT OutgoingDataStream : public webtransport::StreamVisitor {
    public:
-    OutgoingDataStream(MoqtSession* session, webtransport::Stream* stream)
-        : session_(session), stream_(stream) {}
+    OutgoingDataStream(MoqtSession* session, webtransport::Stream* stream,
+                       uint64_t subscription_id, FullSequence first_object);
+    ~OutgoingDataStream();
 
     // webtransport::StreamVisitor implementation.
     void OnCanRead() override {}
@@ -312,11 +348,37 @@
 
     webtransport::Stream* stream() const { return stream_; }
 
+    // Sends objects on the stream, starting with `next_object_`, until the
+    // stream becomes write-blocked or closed.
+    void SendObjects(PublishedSubscription& subscription);
+
    private:
     friend class test::MoqtSessionPeer;
 
+    // Checks whether the associated subscription is still valid; if not, resets
+    // the stream and returns nullptr.
+    PublishedSubscription* GetSubscriptionIfValid();
+
+    // Actually sends an object on the stream; the object MUST be
+    // `next_object_`.
+    void SendNextObject(PublishedSubscription& subscription,
+                        PublishedObject object);
+
     MoqtSession* session_;
     webtransport::Stream* stream_;
+    uint64_t subscription_id_;
+    FullSequence next_object_;
+    bool stream_header_written_ = false;
+    // A weak pointer to an object owned by the session.  Used to make sure the
+    // session does not get called after being destroyed.
+    std::weak_ptr<void> session_liveness_;
+  };
+  // QueuedOutgoingDataStream records an information necessary to create a
+  // stream that was attempted to be created before but was blocked due to flow
+  // control.
+  struct QueuedOutgoingDataStream {
+    uint64_t subscription_id;
+    FullSequence first_object;
   };
 
   // Returns true if SUBSCRIBE_DONE was sent.
@@ -331,9 +393,14 @@
   // Returns false if the SUBSCRIBE isn't sent.
   bool Subscribe(MoqtSubscribe& message, RemoteTrack::Visitor* visitor);
 
-  // Returns the stream ID if successful, nullopt if not.
-  // TODO: Add a callback if stream creation is delayed.
-  std::optional<webtransport::StreamId> OpenUnidirectionalStream();
+  // Opens a new data stream, or queues it if the session is flow control
+  // blocked.
+  webtransport::Stream* OpenOrQueueDataStream(uint64_t subscription_id,
+                                              FullSequence first_object);
+  // Same as above, except the session is required to be not flow control
+  // blocked.
+  webtransport::Stream* OpenDataStream(uint64_t subscription_id,
+                                       FullSequence first_object);
 
   // Get FullTrackName and visitor for a subscribe_id and track_alias. Returns
   // nullptr if not present.
@@ -356,9 +423,16 @@
   absl::flat_hash_map<FullTrackName, uint64_t> remote_track_aliases_;
   uint64_t next_remote_track_alias_ = 0;
 
-  // All the tracks the peer can subscribe to.
-  absl::flat_hash_map<FullTrackName, LocalTrack> local_tracks_;
-  absl::flat_hash_map<uint64_t, FullTrackName> local_track_by_subscribe_id_;
+  // Application object representing the publisher for all of the tracks that
+  // can be subscribed to via this connection.  Must outlive this object.
+  MoqtPublisher* publisher_;
+  // Subscriptions for local tracks by the remote peer, indexed by subscribe ID.
+  absl::flat_hash_map<uint64_t, std::unique_ptr<PublishedSubscription>>
+      published_subscriptions_;
+  // Keeps track of all the data streams that were supposed to be open, but were
+  // blocked by the flow control.
+  quiche::QuicheCircularDeque<QueuedOutgoingDataStream>
+      queued_outgoing_data_streams_;
   // This is only used to check for track_alias collisions.
   absl::flat_hash_set<uint64_t> used_track_aliases_;
   uint64_t next_local_track_alias_ = 0;
@@ -386,6 +460,11 @@
   // an uninitialized value if no SETUP arrives or it arrives with no Role
   // parameter, and other checks have changed/been disabled.
   MoqtRole peer_role_ = MoqtRole::kPubSub;
+
+  // Must be last.  Token used to make sure that the streams do not call into
+  // the session when the session has already been destroyed.
+  struct Empty {};
+  std::shared_ptr<Empty> liveness_token_;
 };
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index f8e64e8..1c8162b 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -12,16 +12,20 @@
 #include <utility>
 
 #include "absl/status/status.h"
+#include "absl/strings/match.h"
 #include "absl/strings/string_view.h"
 #include "absl/types/span.h"
 #include "quiche/quic/core/quic_data_reader.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_known_track_publisher.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_parser.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
 #include "quiche/quic/platform/api/quic_test.h"
+#include "quiche/quic/test_tools/quic_test_utils.h"
 #include "quiche/common/quiche_stream.h"
 #include "quiche/web_transport/test_tools/mock_web_transport.h"
 #include "quiche/web_transport/web_transport.h"
@@ -32,6 +36,7 @@
 
 namespace {
 
+using ::quic::test::MemSliceFromString;
 using ::testing::_;
 using ::testing::AnyNumber;
 using ::testing::Return;
@@ -60,6 +65,19 @@
   return static_cast<MoqtMessageType>(value);
 }
 
+static std::shared_ptr<MockTrackPublisher> SetupPublisher(
+    FullTrackName track_name, MoqtForwardingPreference forwarding_preference,
+    FullSequence largest_sequence) {
+  auto publisher = std::make_shared<MockTrackPublisher>(std::move(track_name));
+  ON_CALL(*publisher, GetTrackStatus())
+      .WillByDefault(Return(MoqtTrackStatusCode::kInProgress));
+  ON_CALL(*publisher, GetForwardingPreference())
+      .WillByDefault(Return(forwarding_preference));
+  ON_CALL(*publisher, GetLargestSequence())
+      .WillByDefault(Return(largest_sequence));
+  return publisher;
+}
+
 }  // namespace
 
 class MoqtSessionPeer {
@@ -108,30 +126,21 @@
     session->active_subscribes_[subscribe_id] = {subscribe, visitor};
   }
 
-  static LocalTrack* local_track(MoqtSession* session, FullTrackName& name) {
-    auto it = session->local_tracks_.find(name);
-    if (it == session->local_tracks_.end()) {
-      return nullptr;
-    }
-    return &it->second;
-  }
-
-  static void AddSubscription(MoqtSession* session, FullTrackName& name,
-                              uint64_t subscribe_id, uint64_t track_alias,
-                              uint64_t start_group, uint64_t start_object) {
-    LocalTrack* track = local_track(session, name);
-    track->set_track_alias(track_alias);
-    track->AddWindow(subscribe_id, start_group, start_object);
-    session->used_track_aliases_.emplace(track_alias);
-    session->local_track_by_subscribe_id_.emplace(subscribe_id,
-                                                  track->full_track_name());
-  }
-
-  static FullSequence next_sequence(MoqtSession* session, FullTrackName& name) {
-    auto it = session->local_tracks_.find(name);
-    EXPECT_NE(it, session->local_tracks_.end());
-    LocalTrack& track = it->second;
-    return track.next_sequence();
+  static MoqtObjectListener* AddSubscription(
+      MoqtSession* session, std::shared_ptr<MoqtTrackPublisher> publisher,
+      uint64_t subscribe_id, uint64_t track_alias, uint64_t start_group,
+      uint64_t start_object) {
+    MoqtSubscribe subscribe;
+    subscribe.track_namespace = publisher->GetTrackName().track_namespace;
+    subscribe.track_name = publisher->GetTrackName().track_name;
+    subscribe.track_alias = track_alias;
+    subscribe.subscribe_id = subscribe_id;
+    subscribe.start_group = start_group;
+    subscribe.start_object = start_object;
+    session->published_subscriptions_.emplace(
+        subscribe_id, std::make_unique<MoqtSession::PublishedSubscription>(
+                          session, std::move(publisher), subscribe));
+    return session->published_subscriptions_[subscribe_id].get();
   }
 
   static void set_peer_role(MoqtSession* session, MoqtRole role) {
@@ -147,7 +156,9 @@
  public:
   MoqtSessionTest()
       : session_(&mock_session_, default_parameters,
-                 session_callbacks_.AsSessionCallbacks()) {}
+                 session_callbacks_.AsSessionCallbacks()) {
+    session_.set_publisher(&publisher_);
+  }
   ~MoqtSessionTest() {
     EXPECT_CALL(session_callbacks_.session_deleted_callback, Call());
   }
@@ -155,6 +166,7 @@
   MockSessionCallbacks session_callbacks_;
   StrictMock<webtransport::test::MockSession> mock_session_;
   MoqtSession session_;
+  MoqtKnownTrackPublisher publisher_;
 };
 
 TEST_F(MoqtSessionTest, Queries) {
@@ -314,10 +326,11 @@
   EXPECT_TRUE(correct_message);
 
   // Add the track. Now Subscribe should succeed.
-  MockLocalTrackVisitor local_track_visitor;
-  session_.AddLocalTrack(FullTrackName("foo", "bar"),
-                         MoqtForwardingPreference::kObject,
-                         &local_track_visitor);
+  auto track_publisher =
+      std::make_shared<MockTrackPublisher>(FullTrackName("foo", "bar"));
+  EXPECT_CALL(*track_publisher, GetTrackStatus())
+      .WillRepeatedly(Return(MoqtTrackStatusCode::kStatusNotAvailable));
+  publisher_.Add(track_publisher);
   correct_message = true;
   EXPECT_CALL(mock_stream, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
@@ -404,50 +417,20 @@
   EXPECT_TRUE(correct_message);
 }
 
-TEST_F(MoqtSessionTest, HasSubscribers) {
-  MockLocalTrackVisitor local_track_visitor;
-  FullTrackName ftn("foo", "bar");
-  EXPECT_FALSE(session_.HasSubscribers(ftn));
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kGroup,
-                         &local_track_visitor);
-  EXPECT_FALSE(session_.HasSubscribers(ftn));
-
-  // Peer subscribes.
-  MoqtSubscribe request = {
-      /*subscribe_id=*/1,
-      /*track_alias=*/2,
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
-      /*start_group=*/0,
-      /*start_object=*/0,
-      /*end_group=*/std::nullopt,
-      /*end_object=*/std::nullopt,
-      /*authorization_info=*/std::nullopt,
-  };
-  StrictMock<webtransport::test::MockStream> mock_stream;
-  std::unique_ptr<MoqtParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  bool correct_message = true;
-  EXPECT_CALL(mock_stream, Writev(_, _))
-      .WillOnce([&](absl::Span<const absl::string_view> data,
-                    const quiche::StreamWriteOptions& options) {
-        correct_message = true;
-        EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kSubscribeOk);
-        return absl::OkStatus();
-      });
-  stream_input->OnSubscribeMessage(request);
-  EXPECT_TRUE(correct_message);
-  EXPECT_TRUE(session_.HasSubscribers(ftn));
-}
-
 TEST_F(MoqtSessionTest, SubscribeForPast) {
-  MockLocalTrackVisitor local_track_visitor;
   FullTrackName ftn("foo", "bar");
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kObject,
-                         &local_track_visitor);
+  auto track = std::make_shared<MockTrackPublisher>(ftn);
+  EXPECT_CALL(*track, GetTrackStatus())
+      .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
+  EXPECT_CALL(*track, GetCachedObject(_)).WillRepeatedly([] {
+    return std::optional<PublishedObject>();
+  });
+  EXPECT_CALL(*track, GetCachedObjectsInRange(_, _))
+      .WillRepeatedly(Return(std::vector<FullSequence>()));
+  EXPECT_CALL(*track, GetLargestSequence())
+      .WillRepeatedly(Return(FullSequence(10, 20)));
+  publisher_.Add(track);
 
-  // Send Sequence (2, 0) so that next_sequence is set correctly.
-  session_.PublishObject(ftn, 2, 0, 0, MoqtObjectStatus::kNormal, "foo");
   // Peer subscribes to (0, 0)
   MoqtSubscribe request = {
       /*subscribe_id=*/1,
@@ -464,8 +447,6 @@
   std::unique_ptr<MoqtParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
   bool correct_message = true;
-  EXPECT_CALL(local_track_visitor, OnSubscribeForPast(_)).WillOnce(Return([] {
-  }));
   EXPECT_CALL(mock_stream, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
@@ -882,114 +863,141 @@
 }
 
 TEST_F(MoqtSessionTest, CreateIncomingDataStreamAndSend) {
-  StrictMock<webtransport::test::MockStream> mock_stream;
   FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor track_visitor;
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kObject,
-                         &track_visitor);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+  auto track = SetupPublisher(ftn, MoqtForwardingPreference::kObject,
+                              FullSequence(4, 2));
+  MoqtObjectListener* subscription =
+      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
 
-  // No subscription; this is a no-op except to update next_sequence.
-  EXPECT_CALL(mock_stream, Writev(_, _)).Times(0);
-  session_.PublishObject(ftn, 4, 1, 0, MoqtObjectStatus::kNormal, "deadbeef");
-  EXPECT_EQ(MoqtSessionPeer::next_sequence(&session_, ftn), FullSequence(4, 2));
-
-  // Publish in window.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(true));
+  bool fin = false;
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly([&] { return !fin; });
   EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
       .WillOnce(Return(&mock_stream));
-  EXPECT_CALL(mock_stream, SetVisitor(_)).Times(1);
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
+  EXPECT_CALL(mock_stream, SetVisitor(_))
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        stream_visitor = std::move(visitor);
+      });
+  EXPECT_CALL(mock_stream, visitor()).WillOnce([&] {
+    return stream_visitor.get();
+  });
   EXPECT_CALL(mock_stream, GetStreamId())
       .WillRepeatedly(Return(kOutgoingUniStreamId));
-  // Send on the stream
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillOnce(Return(&mock_stream));
-  bool correct_message = false;
+      .WillRepeatedly(Return(&mock_stream));
+
   // Verify first six message fields are sent correctly
-  uint8_t kExpectedMessage[] = {0x00, 0x00, 0x02, 0x05, 0x00, 0x00};
+  bool correct_message = false;
+  const std::string kExpectedMessage = {0x00, 0x00, 0x02, 0x05, 0x00, 0x00};
   EXPECT_CALL(mock_stream, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
-        correct_message = (0 == memcmp(data.data()->data(), kExpectedMessage,
-                                       sizeof(kExpectedMessage)));
+        correct_message = absl::StartsWith(data[0], kExpectedMessage);
+        fin |= options.send_fin();
         return absl::OkStatus();
       });
-  session_.PublishObject(ftn, 5, 0, 0, MoqtObjectStatus::kNormal, "deadbeef");
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([] {
+    return PublishedObject{FullSequence(5, 0), MoqtObjectStatus::kNormal,
+                           MemSliceFromString("deadbeef")};
+  });
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillRepeatedly([] {
+    return std::optional<PublishedObject>();
+  });
+  subscription->OnNewObjectAvailable(FullSequence(5, 0));
   EXPECT_TRUE(correct_message);
 }
 
 // TODO: Test operation with multiple streams.
 
-// Error cases
-
-TEST_F(MoqtSessionTest, CannotOpenUniStream) {
-  StrictMock<webtransport::test::MockStream> mock_stream;
+TEST_F(MoqtSessionTest, UnidirectionalStreamCannotBeOpened) {
   FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor track_visitor;
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kObject,
-                         &track_visitor);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
-  ;
+  auto track =
+      SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(4, 2));
+  MoqtObjectListener* subscription =
+      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
+
+  // Queue the outgoing stream.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(false));
-  EXPECT_FALSE(session_.PublishObject(ftn, 5, 0, 0, MoqtObjectStatus::kNormal,
-                                      "deadbeef"));
-}
+  subscription->OnNewObjectAvailable(FullSequence(5, 0));
 
-TEST_F(MoqtSessionTest, GetStreamByIdFails) {
-  StrictMock<webtransport::test::MockStream> mock_stream;
-  FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor track_visitor;
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kObject,
-                         &track_visitor);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+  // Unblock the session, and cause the queued stream to be sent.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(true));
+  bool fin = false;
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly([&] { return !fin; });
   EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
       .WillOnce(Return(&mock_stream));
-  EXPECT_CALL(mock_stream, SetVisitor(_)).Times(1);
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
+  EXPECT_CALL(mock_stream, SetVisitor(_))
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        stream_visitor = std::move(visitor);
+      });
+  EXPECT_CALL(mock_stream, visitor()).WillOnce([&] {
+    return stream_visitor.get();
+  });
   EXPECT_CALL(mock_stream, GetStreamId())
       .WillRepeatedly(Return(kOutgoingUniStreamId));
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillOnce(Return(nullptr));
-  EXPECT_FALSE(session_.PublishObject(ftn, 5, 0, 0, MoqtObjectStatus::kNormal,
-                                      "deadbeef"));
+      .WillRepeatedly(Return(&mock_stream));
+  EXPECT_CALL(mock_stream, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([] {
+    return PublishedObject{FullSequence(5, 0), MoqtObjectStatus::kNormal,
+                           MemSliceFromString("deadbeef")};
+  });
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillRepeatedly([] {
+    return std::optional<PublishedObject>();
+  });
+  session_.OnCanCreateNewOutgoingUnidirectionalStream();
 }
 
-TEST_F(MoqtSessionTest, SubscribeProposesBadTrackAlias) {
-  MockLocalTrackVisitor local_track_visitor;
+TEST_F(MoqtSessionTest, OutgoingStreamDisappears) {
   FullTrackName ftn("foo", "bar");
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kGroup,
-                         &local_track_visitor);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+  auto track =
+      SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(4, 2));
+  MoqtObjectListener* subscription =
+      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
 
-  // Peer subscribes.
-  MoqtSubscribe request = {
-      /*subscribe_id=*/1,
-      /*track_alias=*/3,  // Doesn't match 2.
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
-      /*start_group=*/0,
-      /*start_object=*/0,
-      /*end_group=*/std::nullopt,
-      /*end_object=*/std::nullopt,
-      /*authorization_info=*/std::nullopt,
-  };
+  // Set up an outgoing stream for a group.
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillOnce(Return(true));
   StrictMock<webtransport::test::MockStream> mock_stream;
-  std::unique_ptr<MoqtParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  bool correct_message = true;
-  EXPECT_CALL(mock_stream, Writev(_, _))
-      .WillOnce([&](absl::Span<const absl::string_view> data,
-                    const quiche::StreamWriteOptions& options) {
-        correct_message = true;
-        EXPECT_EQ(*ExtractMessageType(data[0]),
-                  MoqtMessageType::kSubscribeError);
-        return absl::OkStatus();
+  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly(Return(true));
+  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
+      .WillOnce(Return(&mock_stream));
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
+  EXPECT_CALL(mock_stream, SetVisitor(_))
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        stream_visitor = std::move(visitor);
       });
-  stream_input->OnSubscribeMessage(request);
-  EXPECT_TRUE(correct_message);
+  EXPECT_CALL(mock_stream, visitor()).WillRepeatedly([&] {
+    return stream_visitor.get();
+  });
+  EXPECT_CALL(mock_stream, GetStreamId())
+      .WillRepeatedly(Return(kOutgoingUniStreamId));
+  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
+      .WillRepeatedly(Return(&mock_stream));
+
+  EXPECT_CALL(mock_stream, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([] {
+    return PublishedObject{FullSequence(5, 0), MoqtObjectStatus::kNormal,
+                           MemSliceFromString("deadbeef")};
+  });
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillOnce([] {
+    return std::optional<PublishedObject>();
+  });
+  subscription->OnNewObjectAvailable(FullSequence(5, 0));
+
+  // Now that the stream exists and is recorded within subscription, make it
+  // disappear by returning nullptr.
+  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
+      .WillRepeatedly(Return(nullptr));
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).Times(0);
+  subscription->OnNewObjectAvailable(FullSequence(5, 1));
 }
 
 TEST_F(MoqtSessionTest, OneBidirectionalStreamClient) {
@@ -1081,10 +1089,9 @@
 
 TEST_F(MoqtSessionTest, ReceiveUnsubscribe) {
   FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor visitor;
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kTrack, &visitor);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 1, 3, 4);
-  EXPECT_TRUE(session_.HasSubscribers(ftn));
+  auto track =
+      SetupPublisher(ftn, MoqtForwardingPreference::kTrack, FullSequence(4, 2));
+  MoqtSessionPeer::AddSubscription(&session_, track, 0, 1, 3, 4);
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
@@ -1103,15 +1110,14 @@
       });
   stream_input->OnUnsubscribeMessage(unsubscribe);
   EXPECT_TRUE(correct_message);
-  EXPECT_FALSE(session_.HasSubscribers(ftn));
 }
 
 TEST_F(MoqtSessionTest, SendDatagram) {
   FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor track_visitor;
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kDatagram,
-                         &track_visitor);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+  std::shared_ptr<MockTrackPublisher> track_publisher = SetupPublisher(
+      ftn, MoqtForwardingPreference::kDatagram, FullSequence{4, 0});
+  MoqtObjectListener* listener =
+      MoqtSessionPeer::AddSubscription(&session_, track_publisher, 0, 2, 5, 0);
 
   // Publish in window.
   bool correct_message = false;
@@ -1128,7 +1134,12 @@
         return webtransport::DatagramStatus(
             webtransport::DatagramStatusCode::kSuccess, "");
       });
-  session_.PublishObject(ftn, 5, 0, 0, MoqtObjectStatus::kNormal, "deadbeef");
+  EXPECT_CALL(*track_publisher, GetCachedObject(FullSequence{5, 0}))
+      .WillRepeatedly([] {
+        return PublishedObject{FullSequence{5, 0}, MoqtObjectStatus::kNormal,
+                               MemSliceFromString("deadbeef")};
+      });
+  listener->OnNewObjectAvailable(FullSequence(5, 0));
   EXPECT_TRUE(correct_message);
 }
 
@@ -1240,6 +1251,8 @@
   stream_input->OnAnnounceMessage(announce);
 }
 
+// TODO: re-enable this test once this behavior is re-implemented.
+#if 0
 TEST_F(MoqtSessionTest, SubscribeUpdateClosesSubscription) {
   MoqtSessionPeer::set_peer_role(&session_, MoqtRole::kSubscriber);
   FullTrackName ftn("foo", "bar");
@@ -1275,110 +1288,7 @@
   EXPECT_TRUE(correct_message);
   EXPECT_FALSE(session_.HasSubscribers(ftn));
 }
-
-TEST_F(MoqtSessionTest, ProcessAnnounceCancelNoSubscribes) {
-  MoqtSessionPeer::set_peer_role(&session_, MoqtRole::kSubscriber);
-  FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor track_visitor;
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kTrack, &track_visitor);
-  EXPECT_NE(MoqtSessionPeer::local_track(&session_, ftn), nullptr);
-  StrictMock<webtransport::test::MockStream> mock_stream;
-  MoqtAnnounceCancel cancel = {
-      /*track_namespace=*/"foo",
-  };
-  std::unique_ptr<MoqtParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  stream_input->OnAnnounceCancelMessage(cancel);
-  EXPECT_EQ(MoqtSessionPeer::local_track(&session_, ftn), nullptr);
-}
-
-TEST_F(MoqtSessionTest, ProcessAnnounceCancelActiveSubscribes) {
-  MoqtSessionPeer::set_peer_role(&session_, MoqtRole::kSubscriber);
-  FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor track_visitor;
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kTrack, &track_visitor);
-  EXPECT_NE(MoqtSessionPeer::local_track(&session_, ftn), nullptr);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 1, 2, 7, 0);
-  StrictMock<webtransport::test::MockStream> mock_stream;
-  MoqtAnnounceCancel cancel = {
-      /*track_namespace=*/"foo",
-  };
-  std::unique_ptr<MoqtParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  stream_input->OnAnnounceCancelMessage(cancel);
-  // The track is still there because there is a subscribe.
-  EXPECT_NE(MoqtSessionPeer::local_track(&session_, ftn), nullptr);
-  // Unsubscribe from 0.
-  MoqtUnsubscribe unsubscribe = {
-      /*subscribe_id=*/0,
-  };
-  EXPECT_CALL(mock_session_, GetStreamById(4)).WillOnce(Return(&mock_stream));
-  bool correct_message = false;
-  EXPECT_CALL(mock_stream, Writev(_, _))
-      .WillOnce([&](absl::Span<const absl::string_view> data,
-                    const quiche::StreamWriteOptions& options) {
-        correct_message = true;
-        EXPECT_EQ(*ExtractMessageType(data[0]),
-                  MoqtMessageType::kSubscribeDone);
-        return absl::OkStatus();
-      });
-  stream_input->OnUnsubscribeMessage(unsubscribe);
-  EXPECT_TRUE(correct_message);
-  EXPECT_NE(MoqtSessionPeer::local_track(&session_, ftn), nullptr);
-  // Unsubscribe from 1.
-  unsubscribe.subscribe_id = 1;
-  EXPECT_CALL(mock_session_, GetStreamById(4)).WillOnce(Return(&mock_stream));
-  correct_message = false;
-  EXPECT_CALL(mock_stream, Writev(_, _))
-      .WillOnce([&](absl::Span<const absl::string_view> data,
-                    const quiche::StreamWriteOptions& options) {
-        correct_message = true;
-        EXPECT_EQ(*ExtractMessageType(data[0]),
-                  MoqtMessageType::kSubscribeDone);
-        return absl::OkStatus();
-      });
-  stream_input->OnUnsubscribeMessage(unsubscribe);
-  EXPECT_TRUE(correct_message);
-
-  EXPECT_EQ(MoqtSessionPeer::local_track(&session_, ftn), nullptr);
-}
-
-TEST_F(MoqtSessionTest, AnnounceCancelThenSubscribe) {
-  MoqtSessionPeer::set_peer_role(&session_, MoqtRole::kSubscriber);
-  FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor track_visitor;
-  session_.AddLocalTrack(ftn, MoqtForwardingPreference::kTrack, &track_visitor);
-  EXPECT_NE(MoqtSessionPeer::local_track(&session_, ftn), nullptr);
-  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
-  StrictMock<webtransport::test::MockStream> mock_stream;
-  MoqtAnnounceCancel cancel = {
-      /*track_namespace=*/"foo",
-  };
-  std::unique_ptr<MoqtParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  stream_input->OnAnnounceCancelMessage(cancel);
-  // The track is still there because there is a subscribe.
-  EXPECT_NE(MoqtSessionPeer::local_track(&session_, ftn), nullptr);
-  MoqtSubscribe subscribe = {
-      /*subscribe_id=*/1,
-      /*track_alias=*/2,
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
-      /*start_group=*/4,
-      /*start_object=*/0,
-      /*end_group=*/std::nullopt,
-      /*end_object=*/std::nullopt,
-      /*authorization_info=*/std::nullopt,
-  };
-  EXPECT_CALL(mock_session_,
-              CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
-                           "Received SUBSCRIBE for canceled track"))
-      .Times(1);
-  stream_input->OnSubscribeMessage(subscribe);
-}
-
-// TODO: Cover more error cases in the above
+#endif
 
 }  // namespace test
 
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.cc b/quiche/quic/moqt/moqt_subscribe_windows.cc
index 21f16e5..422a3fc 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows.cc
@@ -4,12 +4,12 @@
 
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
 
-#include <cstdint>
 #include <optional>
 #include <vector>
 
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/platform/api/quic_bug_tracker.h"
+#include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
@@ -21,9 +21,9 @@
   return (!end_.has_value() || seq <= *end_);
 }
 
-std::optional<webtransport::StreamId> SubscribeWindow::GetStreamForSequence(
+std::optional<webtransport::StreamId> SendStreamMap::GetStreamForSequence(
     FullSequence sequence) const {
-  FullSequence index = SequenceToIndex(sequence);
+  ReducedSequenceIndex index(sequence, forwarding_preference_);
   auto stream_it = send_streams_.find(index);
   if (stream_it == send_streams_.end()) {
     return std::nullopt;
@@ -31,59 +31,27 @@
   return stream_it->second;
 }
 
-void SubscribeWindow::AddStream(uint64_t group_id, uint64_t object_id,
-                                webtransport::StreamId stream_id) {
-  if (!InWindow(FullSequence(group_id, object_id))) {
-    return;
-  }
-  FullSequence index = SequenceToIndex(FullSequence(group_id, object_id));
+void SendStreamMap::AddStream(FullSequence sequence,
+                              webtransport::StreamId stream_id) {
+  ReducedSequenceIndex index(sequence, forwarding_preference_);
   if (forwarding_preference_ == MoqtForwardingPreference::kDatagram) {
     QUIC_BUG(quic_bug_moqt_draft_03_01) << "Adding a stream for datagram";
     return;
   }
-  auto stream_it = send_streams_.find(index);
-  if (stream_it != send_streams_.end()) {
-    QUIC_BUG(quic_bug_moqt_draft_03_02) << "Stream already added";
-    return;
-  }
-  send_streams_[index] = stream_id;
+  auto [stream_it, success] = send_streams_.emplace(index, stream_id);
+  QUIC_BUG_IF(quic_bug_moqt_draft_03_02, !success) << "Stream already added";
 }
 
-void SubscribeWindow::RemoveStream(uint64_t group_id, uint64_t object_id) {
-  FullSequence index = SequenceToIndex(FullSequence(group_id, object_id));
+void SendStreamMap::RemoveStream(FullSequence sequence,
+                                 webtransport::StreamId stream_id) {
+  ReducedSequenceIndex index(sequence, forwarding_preference_);
+  QUICHE_DCHECK(send_streams_.contains(index) &&
+                send_streams_.find(index)->second == stream_id)
+      << "Requested to remove a stream ID that does not match the one in the "
+         "map";
   send_streams_.erase(index);
 }
 
-bool SubscribeWindow::OnObjectSent(FullSequence sequence,
-                                   MoqtObjectStatus status) {
-  if (!largest_delivered_.has_value() || *largest_delivered_ < sequence) {
-    largest_delivered_ = sequence;
-  }
-  // Update next_to_backfill_
-  if (sequence < original_next_object_ && next_to_backfill_.has_value() &&
-      *next_to_backfill_ <= sequence) {
-    switch (status) {
-      case MoqtObjectStatus::kNormal:
-      case MoqtObjectStatus::kObjectDoesNotExist:
-        next_to_backfill_ = sequence.next();
-        break;
-      case MoqtObjectStatus::kEndOfGroup:
-        next_to_backfill_ = FullSequence(sequence.group + 1, 0);
-        break;
-      default:  // Includes kEndOfTrack.
-        next_to_backfill_ = std::nullopt;
-        break;
-    }
-    if (next_to_backfill_ == original_next_object_ ||
-        *next_to_backfill_ == end_) {
-      // Redelivery is complete.
-      next_to_backfill_ = std::nullopt;
-    }
-  }
-  return (!next_to_backfill_.has_value() && end_.has_value() &&
-          *end_ <= sequence);
-}
-
 bool SubscribeWindow::UpdateStartEnd(FullSequence start,
                                      std::optional<FullSequence> end) {
   // Can't make the subscription window bigger.
@@ -98,29 +66,37 @@
   return true;
 }
 
-FullSequence SubscribeWindow::SequenceToIndex(FullSequence sequence) const {
-  switch (forwarding_preference_) {
+bool SubscribeWindow::IsStreamProvokingObject(
+    FullSequence sequence, MoqtForwardingPreference preference) const {
+  if (preference == MoqtForwardingPreference::kGroup) {
+    return sequence.object == 0 || sequence == start_;
+  }
+  QUICHE_DCHECK(preference != MoqtForwardingPreference::kDatagram);
+  return true;
+}
+
+ReducedSequenceIndex::ReducedSequenceIndex(
+    FullSequence sequence, MoqtForwardingPreference preference) {
+  switch (preference) {
     case MoqtForwardingPreference::kTrack:
-      return FullSequence(0, 0);
+      sequence_ = FullSequence(0, 0);
+      break;
     case MoqtForwardingPreference::kGroup:
-      return FullSequence(sequence.group, 0);
+      sequence_ = FullSequence(sequence.group, 0);
+      break;
     case MoqtForwardingPreference::kObject:
-      return sequence;
     case MoqtForwardingPreference::kDatagram:
-      QUIC_BUG(quic_bug_moqt_draft_03_01) << "No stream for datagram";
-      return FullSequence(0, 0);
+      sequence_ = sequence;
+      break;
   }
 }
 
-std::vector<SubscribeWindow*> MoqtSubscribeWindows::SequenceIsSubscribed(
-    FullSequence sequence) {
-  std::vector<SubscribeWindow*> retval;
-  for (auto& [subscribe_id, window] : windows_) {
-    if (window.InWindow(sequence)) {
-      retval.push_back(&(window));
-    }
+std::vector<webtransport::StreamId> SendStreamMap::GetAllStreams() const {
+  std::vector<webtransport::StreamId> ids;
+  for (const auto& [index, id] : send_streams_) {
+    ids.push_back(id);
   }
-  return retval;
+  return ids;
 }
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h
index d393284..5e0307d 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.h
+++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -10,147 +10,88 @@
 #include <vector>
 
 #include "absl/container/flat_hash_map.h"
-#include "absl/container/node_hash_map.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/common/platform/api/quiche_export.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
 
-// Classes to track subscriptions to local tracks: the sequence numbers
-// subscribed, the streams involved, and the subscribe IDs.
+// SubscribeWindow represents a window of objects for which an MoQT subscription
+// can be valid.
 class QUICHE_EXPORT SubscribeWindow {
  public:
   // Creates a half-open window. |next_object| is the expected sequence number
   // of the next published object on the track.
-  SubscribeWindow(uint64_t subscribe_id,
-                  MoqtForwardingPreference forwarding_preference,
-                  FullSequence next_object, uint64_t start_group,
-                  uint64_t start_object)
-      : SubscribeWindow(subscribe_id, forwarding_preference, next_object,
-                        FullSequence(start_group, start_object), std::nullopt) {
+  SubscribeWindow(uint64_t start_group, uint64_t start_object)
+      : SubscribeWindow(FullSequence(start_group, start_object), std::nullopt) {
   }
 
   // Creates a closed window.
-  SubscribeWindow(uint64_t subscribe_id,
-                  MoqtForwardingPreference forwarding_preference,
-                  FullSequence next_object, uint64_t start_group,
-                  uint64_t start_object, uint64_t end_group,
-                  uint64_t end_object)
-      : SubscribeWindow(subscribe_id, forwarding_preference, next_object,
-                        FullSequence(start_group, start_object),
+  SubscribeWindow(uint64_t start_group, uint64_t start_object,
+                  uint64_t end_group, uint64_t end_object)
+      : SubscribeWindow(FullSequence(start_group, start_object),
                         FullSequence(end_group, end_object)) {}
 
-  SubscribeWindow(uint64_t subscribe_id,
-                  MoqtForwardingPreference forwarding_preference,
-                  FullSequence next_object, FullSequence start,
-                  std::optional<FullSequence> end)
-      : subscribe_id_(subscribe_id),
-        start_(start),
-        end_(end),
-        original_next_object_(next_object),
-        forwarding_preference_(forwarding_preference) {
-    next_to_backfill_ =
-        (start < next_object) ? start : std::optional<FullSequence>();
-  }
-
-  uint64_t subscribe_id() const { return subscribe_id_; }
+  SubscribeWindow(FullSequence start, std::optional<FullSequence> end)
+      : start_(start), end_(end) {}
 
   bool InWindow(const FullSequence& seq) const;
-
-  // Returns the stream to send |sequence| on, if already opened.
-  std::optional<webtransport::StreamId> GetStreamForSequence(
-      FullSequence sequence) const;
-
-  // Records what stream is being used for a track, group, or object depending
-  // on |forwarding_preference|. Triggers QUIC_BUG if already assigned.
-  void AddStream(uint64_t group_id, uint64_t object_id,
-                 webtransport::StreamId stream_id);
-
-  void RemoveStream(uint64_t group_id, uint64_t object_id);
-
   bool HasEnd() const { return end_.has_value(); }
-  MoqtForwardingPreference forwarding_preference() const {
-    return forwarding_preference_;
-  }
+  FullSequence start() const { return start_; }
 
-  // Returns true if the object delivery completed the subscription
-  bool OnObjectSent(FullSequence sequence, MoqtObjectStatus status);
-
-  std::optional<FullSequence>& largest_delivered() {
-    return largest_delivered_;
-  }
-
-  // Returns true if the updated values are valid.
+  // Updates the subscription window. Returns true if the update is valid (in
+  // MoQT, subscription windows are only allowed to shrink, not to expand).
   bool UpdateStartEnd(FullSequence start, std::optional<FullSequence> end);
 
- private:
-  // Converts an object sequence number into one that matches the way that
-  // stream IDs are being mapped. (See the comment for send_streams_ below.)
-  FullSequence SequenceToIndex(FullSequence sequence) const;
+  // Returns true if for a given forwarding preference, the specified sequence
+  // number might be the first object on a stream.
+  bool IsStreamProvokingObject(FullSequence sequence,
+                               MoqtForwardingPreference preference) const;
 
-  const uint64_t subscribe_id_;
+ private:
   FullSequence start_;
   std::optional<FullSequence> end_;
-  std::optional<FullSequence> largest_delivered_;
-  // The next sequence number to be redelivered, because it was published prior
-  // to the subscription. Is nullopt if no redeliveries are needed.
-  std::optional<FullSequence> next_to_backfill_;
-  // The first unpublished sequence number when the subscribe arrived.
-  const FullSequence original_next_object_;
-  // Store open streams for this subscription. If the forwarding preference is
-  // kTrack, there is one entry under sequence (0, 0). If kGroup, each entry is
-  // under (group, 0). If kObject, it's tracked under the full sequence. If
-  // kDatagram, the map is empty.
-  absl::flat_hash_map<FullSequence, webtransport::StreamId> send_streams_;
-  // The forwarding preference for this track; informs how the streams are
-  // mapped.
-  const MoqtForwardingPreference forwarding_preference_;
 };
 
-// Class to keep track of the sequence number blocks to which a peer is
-// subscribed.
-class QUICHE_EXPORT MoqtSubscribeWindows {
+// ReducedSequenceIndex represents an index object such that if two sequence
+// numbers are mapped to the same stream, they will be mapped to the same index.
+class ReducedSequenceIndex {
  public:
-  MoqtSubscribeWindows(MoqtForwardingPreference forwarding_preference)
-      : forwarding_preference_(forwarding_preference) {}
+  ReducedSequenceIndex(FullSequence sequence,
+                       MoqtForwardingPreference preference);
 
-  // Returns a vector of subscribe IDs that apply to the object. They will be in
-  // reverse order of the AddWindow calls.
-  std::vector<SubscribeWindow*> SequenceIsSubscribed(FullSequence sequence);
-
-  // |start_group| and |start_object| must be absolute sequence numbers. An
-  // optimization could consolidate overlapping subscribe windows.
-  void AddWindow(uint64_t subscribe_id, FullSequence next_object,
-                 uint64_t start_group, uint64_t start_object) {
-    windows_.emplace(subscribe_id,
-                     SubscribeWindow(subscribe_id, forwarding_preference_,
-                                     next_object, start_group, start_object));
+  bool operator==(const ReducedSequenceIndex& other) const {
+    return sequence_ == other.sequence_;
   }
-  void AddWindow(uint64_t subscribe_id, FullSequence next_object,
-                 uint64_t start_group, uint64_t start_object,
-                 uint64_t end_group, uint64_t end_object) {
-    windows_.emplace(
-        subscribe_id,
-        SubscribeWindow(subscribe_id, forwarding_preference_, next_object,
-                        start_group, start_object, end_group, end_object));
+  bool operator!=(const ReducedSequenceIndex& other) const {
+    return sequence_ != other.sequence_;
   }
-  void RemoveWindow(uint64_t subscribe_id) { windows_.erase(subscribe_id); }
 
-  bool IsEmpty() const { return windows_.empty(); }
-
-  SubscribeWindow* GetWindow(uint64_t subscribe_id) {
-    auto it = windows_.find(subscribe_id);
-    if (it == windows_.end()) {
-      return nullptr;
-    }
-    return &it->second;
+  template <typename H>
+  friend H AbslHashValue(H h, const ReducedSequenceIndex& m) {
+    return H::combine(std::move(h), m.sequence_);
   }
 
  private:
-  // Indexed by Subscribe ID.
-  absl::node_hash_map<uint64_t, SubscribeWindow> windows_;
-  const MoqtForwardingPreference forwarding_preference_;
+  FullSequence sequence_;
+};
+
+// A map of outgoing data streams indexed by object sequence numbers.
+class QUICHE_EXPORT SendStreamMap {
+ public:
+  explicit SendStreamMap(MoqtForwardingPreference forwarding_preference)
+      : forwarding_preference_(forwarding_preference) {}
+
+  std::optional<webtransport::StreamId> GetStreamForSequence(
+      FullSequence sequence) const;
+  void AddStream(FullSequence sequence, webtransport::StreamId stream_id);
+  void RemoveStream(FullSequence sequence, webtransport::StreamId stream_id);
+  std::vector<webtransport::StreamId> GetAllStreams() const;
+
+ private:
+  absl::flat_hash_map<ReducedSequenceIndex, webtransport::StreamId>
+      send_streams_;
+  MoqtForwardingPreference forwarding_preference_;
 };
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
index 0aae7c7..49213ff 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows_test.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
@@ -21,15 +21,12 @@
   SubscribeWindowTest() {}
 
   const uint64_t subscribe_id_ = 2;
-  const FullSequence right_edge_{4, 5};
   const FullSequence start_{4, 0};
   const FullSequence end_{5, 5};
 };
 
 TEST_F(SubscribeWindowTest, Queries) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
-                         right_edge_, start_, end_);
-  EXPECT_EQ(window.subscribe_id(), 2);
+  SubscribeWindow window(start_, end_);
   EXPECT_TRUE(window.InWindow(FullSequence(4, 0)));
   EXPECT_TRUE(window.InWindow(FullSequence(5, 5)));
   EXPECT_FALSE(window.InWindow(FullSequence(5, 6)));
@@ -38,98 +35,51 @@
 }
 
 TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdTrack) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kTrack,
-                         right_edge_, start_, end_);
-  window.AddStream(4, 0, 2);
-  EXPECT_QUIC_BUG(window.AddStream(5, 2, 6), "Stream already added");
-  EXPECT_EQ(*window.GetStreamForSequence(FullSequence(5, 2)), 2);
-  window.RemoveStream(7, 2);
-  EXPECT_FALSE(window.GetStreamForSequence(FullSequence(4, 0)).has_value());
+  SendStreamMap stream_map(MoqtForwardingPreference::kTrack);
+  stream_map.AddStream(FullSequence{4, 0}, 2);
+  EXPECT_QUIC_BUG(stream_map.AddStream(FullSequence{5, 2}, 6),
+                  "Stream already added");
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 2)), 2);
+  stream_map.RemoveStream(FullSequence{7, 2}, 2);
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 0)), std::nullopt);
 }
 
 TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdGroup) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kGroup,
-                         right_edge_, start_, end_);
-  window.AddStream(4, 0, 2);
-  EXPECT_FALSE(window.GetStreamForSequence(FullSequence(5, 0)).has_value());
-  window.AddStream(5, 2, 6);
-  EXPECT_QUIC_BUG(window.AddStream(5, 3, 6), "Stream already added");
-  EXPECT_EQ(*window.GetStreamForSequence(FullSequence(4, 1)), 2);
-  EXPECT_EQ(*window.GetStreamForSequence(FullSequence(5, 0)), 6);
-  window.RemoveStream(5, 1);
-  EXPECT_FALSE(window.GetStreamForSequence(FullSequence(5, 2)).has_value());
+  SendStreamMap stream_map(MoqtForwardingPreference::kGroup);
+  stream_map.AddStream(FullSequence{4, 0}, 2);
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 0)), std::nullopt);
+  stream_map.AddStream(FullSequence{5, 2}, 6);
+  EXPECT_QUIC_BUG(stream_map.AddStream(FullSequence{5, 3}, 6),
+                  "Stream already added");
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 1)), 2);
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 0)), 6);
+  stream_map.RemoveStream(FullSequence{5, 1}, 6);
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 2)), std::nullopt);
 }
 
 TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdObject) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
-                         right_edge_, start_, end_);
-  window.AddStream(4, 0, 2);
-  window.AddStream(4, 1, 6);
-  window.AddStream(4, 2, 10);
-  EXPECT_QUIC_BUG(window.AddStream(4, 2, 14), "Stream already added");
-  EXPECT_EQ(*window.GetStreamForSequence(FullSequence(4, 0)), 2);
-  EXPECT_EQ(*window.GetStreamForSequence(FullSequence(4, 2)), 10);
-  EXPECT_FALSE(window.GetStreamForSequence(FullSequence(4, 4)).has_value());
-  EXPECT_FALSE(window.GetStreamForSequence(FullSequence(5, 0)).has_value());
-  window.RemoveStream(4, 2);
-  EXPECT_FALSE(window.GetStreamForSequence(FullSequence(4, 2)).has_value());
+  SendStreamMap stream_map(MoqtForwardingPreference::kObject);
+  stream_map.AddStream(FullSequence{4, 0}, 2);
+  stream_map.AddStream(FullSequence{4, 1}, 6);
+  stream_map.AddStream(FullSequence{4, 2}, 10);
+  EXPECT_QUIC_BUG(stream_map.AddStream(FullSequence{4, 2}, 14),
+                  "Stream already added");
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 0)), 2);
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 2)), 10);
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 4)), std::nullopt);
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 0)), std::nullopt);
+  stream_map.RemoveStream(FullSequence(4, 2), 10);
+  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 2)), std::nullopt);
 }
 
 TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdDatagram) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kDatagram,
-                         right_edge_, start_, end_);
-  EXPECT_QUIC_BUG(window.AddStream(4, 0, 2), "Adding a stream for datagram");
-}
-
-TEST_F(SubscribeWindowTest, OnObjectSent) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
-                         right_edge_, start_, end_);
-  EXPECT_FALSE(window.largest_delivered().has_value());
-  EXPECT_FALSE(
-      window.OnObjectSent(FullSequence(4, 1), MoqtObjectStatus::kNormal));
-  EXPECT_TRUE(window.largest_delivered().has_value());
-  EXPECT_EQ(window.largest_delivered().value(), FullSequence(4, 1));
-  EXPECT_FALSE(
-      window.OnObjectSent(FullSequence(4, 2), MoqtObjectStatus::kNormal));
-  EXPECT_EQ(window.largest_delivered().value(), FullSequence(4, 2));
-  EXPECT_FALSE(
-      window.OnObjectSent(FullSequence(4, 0), MoqtObjectStatus::kNormal));
-  EXPECT_EQ(window.largest_delivered().value(), FullSequence(4, 2));
-}
-
-TEST_F(SubscribeWindowTest, AllObjectsUnpublishedAtStart) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
-                         FullSequence(0, 0), FullSequence(0, 0),
-                         FullSequence(0, 1));
-  EXPECT_FALSE(
-      window.OnObjectSent(FullSequence(0, 0), MoqtObjectStatus::kNormal));
-  EXPECT_TRUE(
-      window.OnObjectSent(FullSequence(0, 1), MoqtObjectStatus::kNormal));
-}
-
-TEST_F(SubscribeWindowTest, AllObjectsPublishedAtStart) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
-                         FullSequence(4, 0), FullSequence(0, 0),
-                         FullSequence(0, 1));
-  EXPECT_FALSE(
-      window.OnObjectSent(FullSequence(0, 0), MoqtObjectStatus::kNormal));
-  EXPECT_TRUE(
-      window.OnObjectSent(FullSequence(0, 1), MoqtObjectStatus::kNormal));
-}
-
-TEST_F(SubscribeWindowTest, SomeObjectsUnpublishedAtStart) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
-                         FullSequence(0, 1), FullSequence(0, 0),
-                         FullSequence(0, 1));
-  EXPECT_FALSE(
-      window.OnObjectSent(FullSequence(0, 0), MoqtObjectStatus::kNormal));
-  EXPECT_TRUE(
-      window.OnObjectSent(FullSequence(0, 1), MoqtObjectStatus::kNormal));
+  SendStreamMap stream_map(MoqtForwardingPreference::kDatagram);
+  EXPECT_QUIC_BUG(stream_map.AddStream(FullSequence{4, 0}, 2),
+                  "Adding a stream for datagram");
 }
 
 TEST_F(SubscribeWindowTest, UpdateStartEnd) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
-                         right_edge_, start_, end_);
+  SubscribeWindow window(start_, end_);
   EXPECT_TRUE(window.UpdateStartEnd(start_.next(),
                                     FullSequence(end_.group, end_.object - 1)));
   EXPECT_FALSE(window.InWindow(FullSequence(start_.group, start_.object)));
@@ -140,55 +90,12 @@
 }
 
 TEST_F(SubscribeWindowTest, UpdateStartEndOpenEnded) {
-  SubscribeWindow window(subscribe_id_, MoqtForwardingPreference::kObject,
-                         right_edge_, start_, std::nullopt);
+  SubscribeWindow window(start_, std::nullopt);
   EXPECT_TRUE(window.UpdateStartEnd(start_, end_));
   EXPECT_FALSE(window.InWindow(end_.next()));
   EXPECT_FALSE(window.UpdateStartEnd(start_, std::nullopt));
 }
 
-class QUICHE_EXPORT MoqtSubscribeWindowsTest : public quic::test::QuicTest {
- public:
-  MoqtSubscribeWindowsTest() : windows_(MoqtForwardingPreference::kObject) {}
-  MoqtSubscribeWindows windows_;
-};
-
-TEST_F(MoqtSubscribeWindowsTest, IsEmpty) {
-  EXPECT_TRUE(windows_.IsEmpty());
-  windows_.AddWindow(0, FullSequence(2, 1), 1, 3);
-  EXPECT_FALSE(windows_.IsEmpty());
-}
-
-TEST_F(MoqtSubscribeWindowsTest, IsSubscribed) {
-  EXPECT_TRUE(windows_.IsEmpty());
-  // The first two windows overlap; the third is open-ended.
-  windows_.AddWindow(0, FullSequence(0, 0), 1, 0, 3, 9);
-  windows_.AddWindow(1, FullSequence(0, 0), 2, 4, 4, 3);
-  windows_.AddWindow(2, FullSequence(0, 0), 10, 0);
-  EXPECT_FALSE(windows_.IsEmpty());
-  EXPECT_TRUE(windows_.SequenceIsSubscribed(FullSequence(0, 8)).empty());
-  auto hits = windows_.SequenceIsSubscribed(FullSequence(1, 0));
-  EXPECT_EQ(hits.size(), 1);
-  EXPECT_EQ(hits[0]->subscribe_id(), 0);
-  EXPECT_TRUE(windows_.SequenceIsSubscribed(FullSequence(4, 4)).empty());
-  EXPECT_TRUE(windows_.SequenceIsSubscribed(FullSequence(8, 3)).empty());
-  hits = windows_.SequenceIsSubscribed(FullSequence(100, 7));
-  EXPECT_EQ(hits.size(), 1);
-  EXPECT_EQ(hits[0]->subscribe_id(), 2);
-  hits = windows_.SequenceIsSubscribed(FullSequence(3, 0));
-  EXPECT_EQ(hits.size(), 2);
-  EXPECT_EQ(hits[0]->subscribe_id() + hits[1]->subscribe_id(), 1);
-}
-
-TEST_F(MoqtSubscribeWindowsTest, AddGetRemoveWindow) {
-  windows_.AddWindow(0, FullSequence(2, 5), 1, 0, 3, 9);
-  SubscribeWindow* window = windows_.GetWindow(0);
-  EXPECT_EQ(window->subscribe_id(), 0);
-  EXPECT_EQ(windows_.GetWindow(1), nullptr);
-  windows_.RemoveWindow(0);
-  EXPECT_EQ(windows_.GetWindow(0), nullptr);
-}
-
 }  // namespace test
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc
index d2644c8..ee85664 100644
--- a/quiche/quic/moqt/moqt_track.cc
+++ b/quiche/quic/moqt/moqt_track.cc
@@ -10,66 +10,6 @@
 
 namespace moqt {
 
-void LocalTrack::AddWindow(uint64_t subscribe_id, uint64_t start_group,
-                           uint64_t start_object) {
-  QUIC_BUG_IF(quic_bug_subscribe_to_canceled_track, announce_canceled_)
-      << "Canceled track got subscription";
-  windows_.AddWindow(subscribe_id, next_sequence_, start_group, start_object);
-}
-
-void LocalTrack::AddWindow(uint64_t subscribe_id, uint64_t start_group,
-                           uint64_t start_object, uint64_t end_group) {
-  QUIC_BUG_IF(quic_bug_subscribe_to_canceled_track, announce_canceled_)
-      << "Canceled track got subscription";
-  // The end object might be unknown.
-  auto it = max_object_ids_.find(end_group);
-  if (end_group >= next_sequence_.group) {
-    // Group is not fully published yet, so end object is unknown.
-    windows_.AddWindow(subscribe_id, next_sequence_, start_group, start_object,
-                       end_group, UINT64_MAX);
-    return;
-  }
-  windows_.AddWindow(subscribe_id, next_sequence_, start_group, start_object,
-                     end_group, it->second);
-}
-
-void LocalTrack::AddWindow(uint64_t subscribe_id, uint64_t start_group,
-                           uint64_t start_object, uint64_t end_group,
-                           uint64_t end_object) {
-  QUIC_BUG_IF(quic_bug_subscribe_to_canceled_track, announce_canceled_)
-      << "Canceled track got subscription";
-  windows_.AddWindow(subscribe_id, next_sequence_, start_group, start_object,
-                     end_group, end_object);
-}
-
-void LocalTrack::SentSequence(FullSequence sequence, MoqtObjectStatus status) {
-  QUICHE_DCHECK(max_object_ids_.find(sequence.group) == max_object_ids_.end() ||
-                max_object_ids_[sequence.group] < sequence.object);
-  switch (status) {
-    case MoqtObjectStatus::kNormal:
-    case MoqtObjectStatus::kObjectDoesNotExist:
-      if (next_sequence_ <= sequence) {
-        next_sequence_ = sequence.next();
-      }
-      break;
-    case MoqtObjectStatus::kGroupDoesNotExist:
-      max_object_ids_[sequence.group] = 0;
-      break;
-    case MoqtObjectStatus::kEndOfGroup:
-      max_object_ids_[sequence.group] = sequence.object;
-      if (next_sequence_ <= sequence) {
-        next_sequence_ = FullSequence(sequence.group + 1, 0);
-      }
-      break;
-    case MoqtObjectStatus::kEndOfTrack:
-      max_object_ids_[sequence.group] = sequence.object;
-      break;
-    default:
-      QUICHE_DCHECK(false);
-      return;
-  }
-}
-
 bool RemoteTrack::CheckForwardingPreference(
     MoqtForwardingPreference preference) {
   if (forwarding_preference_.has_value()) {
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index c9ece3f..3fefee5 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -7,127 +7,12 @@
 
 #include <cstdint>
 #include <optional>
-#include <vector>
 
-#include "absl/container/flat_hash_map.h"
-#include "absl/status/statusor.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_subscribe_windows.h"
-#include "quiche/quic/platform/api/quic_bug_tracker.h"
-#include "quiche/common/quiche_callbacks.h"
 
 namespace moqt {
 
-// A track to which the peer might subscribe.
-class LocalTrack {
- public:
-  class Visitor {
-   public:
-    virtual ~Visitor() = default;
-
-    using PublishPastObjectsCallback = quiche::SingleUseCallback<void()>;
-
-    // Requests that application re-publish objects from {start_group,
-    // start_object} to the latest object. If the return value is nullopt, the
-    // subscribe is valid and the application will deliver the object and
-    // the session will send SUBSCRIBE_OK. If the return has a value, the value
-    // is the error message (the session will send SUBSCRIBE_ERROR). Via this
-    // API, the application decides if a partially fulfillable
-    // SUBSCRIBE results in an error or not.
-    virtual absl::StatusOr<PublishPastObjectsCallback> OnSubscribeForPast(
-        const SubscribeWindow& window) = 0;
-  };
-  // |visitor| must not be nullptr.
-  LocalTrack(const FullTrackName& full_track_name,
-             MoqtForwardingPreference forwarding_preference, Visitor* visitor)
-      : full_track_name_(full_track_name),
-        forwarding_preference_(forwarding_preference),
-        windows_(forwarding_preference),
-        visitor_(visitor) {}
-  // Creates a LocalTrack that does not start at sequence (0,0)
-  LocalTrack(const FullTrackName& full_track_name,
-             MoqtForwardingPreference forwarding_preference, Visitor* visitor,
-             FullSequence next_sequence)
-      : full_track_name_(full_track_name),
-        forwarding_preference_(forwarding_preference),
-        windows_(forwarding_preference),
-        next_sequence_(next_sequence),
-        visitor_(visitor) {}
-
-  const FullTrackName& full_track_name() const { return full_track_name_; }
-
-  std::optional<uint64_t> track_alias() const { return track_alias_; }
-  void set_track_alias(uint64_t track_alias) { track_alias_ = track_alias; }
-
-  Visitor* visitor() { return visitor_; }
-
-  // Returns the subscribe windows that want the object defined by (|group|,
-  // |object|).
-  std::vector<SubscribeWindow*> ShouldSend(FullSequence sequence) {
-    return windows_.SequenceIsSubscribed(sequence);
-  }
-
-  void AddWindow(uint64_t subscribe_id, uint64_t start_group,
-                 uint64_t start_object);
-
-  void AddWindow(uint64_t subscribe_id, uint64_t start_group,
-                 uint64_t start_object, uint64_t end_group);
-
-  void AddWindow(uint64_t subscribe_id, uint64_t start_group,
-                 uint64_t start_object, uint64_t end_group,
-                 uint64_t end_object);
-
-  void DeleteWindow(uint64_t subscribe_id) {
-    windows_.RemoveWindow(subscribe_id);
-  }
-
-  // Returns the largest observed sequence, but increments the object sequence
-  // by one.
-  const FullSequence& next_sequence() const { return next_sequence_; }
-
-  // Updates next_sequence_ if |sequence| is larger. Updates max_object_ids_
-  // if relevant.
-  void SentSequence(FullSequence sequence, MoqtObjectStatus status);
-
-  bool HasSubscriber() const { return !windows_.IsEmpty(); }
-
-  SubscribeWindow* GetWindow(uint64_t subscribe_id) {
-    return windows_.GetWindow(subscribe_id);
-  }
-
-  MoqtForwardingPreference forwarding_preference() const {
-    return forwarding_preference_;
-  }
-
-  void set_announce_cancel() { announce_canceled_ = true; }
-  bool canceled() const { return announce_canceled_; }
-
- private:
-  // This only needs to track subscriptions to current and future objects;
-  // requests for objects in the past are forwarded to the application.
-  const FullTrackName full_track_name_;
-  // The forwarding preference for the track.
-  MoqtForwardingPreference forwarding_preference_;
-  // Let the first SUBSCRIBE determine the track alias.
-  std::optional<uint64_t> track_alias_;
-  // The sequence numbers from this track to which the peer is subscribed.
-  MoqtSubscribeWindows windows_;
-  // By recording the highest observed sequence number, MoQT can interpret
-  // relative sequence numbers in SUBSCRIBEs.
-  FullSequence next_sequence_ = {0, 0};
-  // The object ID of each EndOfGroup object received, indexed by group ID.
-  // Entry does not exist, if no kGroupDoesNotExist, EndOfGroup, or
-  // EndOfTrack has been received for that group.
-  absl::flat_hash_map<uint64_t, uint64_t> max_object_ids_;
-  Visitor* visitor_;
-
-  // If true, the session has received ANNOUNCE_CANCELED for this namespace.
-  // Additional subscribes will be a protocol error, and the track can be
-  // destroyed once all active subscribes end.
-  bool announce_canceled_ = false;
-};
-
 // A track on the peer to which the session has subscribed.
 class RemoteTrack {
  public:
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index fc0a867..4b0ff92 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -4,11 +4,6 @@
 
 #include "quiche/quic/moqt/moqt_track.h"
 
-#include <optional>
-
-#include "absl/strings/string_view.h"
-#include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_subscribe_windows.h"
 #include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
 #include "quiche/quic/platform/api/quic_test.h"
 
@@ -16,84 +11,6 @@
 
 namespace test {
 
-class LocalTrackTest : public quic::test::QuicTest {
- public:
-  LocalTrackTest()
-      : track_(FullTrackName("foo", "bar"), MoqtForwardingPreference::kTrack,
-               &visitor_, FullSequence(4, 1)) {}
-  LocalTrack track_;
-  MockLocalTrackVisitor visitor_;
-};
-
-TEST_F(LocalTrackTest, Queries) {
-  EXPECT_EQ(track_.full_track_name(), FullTrackName("foo", "bar"));
-  EXPECT_EQ(track_.track_alias(), std::nullopt);
-  EXPECT_EQ(track_.visitor(), &visitor_);
-  EXPECT_EQ(track_.next_sequence(), FullSequence(4, 1));
-  track_.SentSequence(FullSequence(4, 0), MoqtObjectStatus::kNormal);
-  EXPECT_EQ(track_.next_sequence(), FullSequence(4, 1));  // no change
-  track_.SentSequence(FullSequence(4, 1), MoqtObjectStatus::kNormal);
-  EXPECT_EQ(track_.next_sequence(), FullSequence(4, 2));
-  track_.SentSequence(FullSequence(4, 2), MoqtObjectStatus::kEndOfGroup);
-  EXPECT_EQ(track_.next_sequence(), FullSequence(5, 0));
-  EXPECT_FALSE(track_.HasSubscriber());
-  EXPECT_EQ(track_.forwarding_preference(), MoqtForwardingPreference::kTrack);
-}
-
-TEST_F(LocalTrackTest, SetTrackAlias) {
-  EXPECT_EQ(track_.track_alias(), std::nullopt);
-  track_.set_track_alias(6);
-  EXPECT_EQ(track_.track_alias(), 6);
-}
-
-TEST_F(LocalTrackTest, AddGetDeleteWindow) {
-  track_.AddWindow(0, 4, 1);
-  EXPECT_EQ(track_.GetWindow(0)->subscribe_id(), 0);
-  EXPECT_EQ(track_.GetWindow(1), nullptr);
-  track_.DeleteWindow(0);
-  EXPECT_EQ(track_.GetWindow(0), nullptr);
-}
-
-TEST_F(LocalTrackTest, GroupSubscriptionUsesMaxObjectId) {
-  // Populate max_object_ids_
-  track_.SentSequence(FullSequence(0, 0), MoqtObjectStatus::kEndOfGroup);
-  track_.SentSequence(FullSequence(1, 0), MoqtObjectStatus::kNormal);
-  track_.SentSequence(FullSequence(1, 1), MoqtObjectStatus::kEndOfGroup);
-  track_.SentSequence(FullSequence(2, 0), MoqtObjectStatus::kGroupDoesNotExist);
-  track_.SentSequence(FullSequence(3, 0), MoqtObjectStatus::kNormal);
-  track_.SentSequence(FullSequence(3, 1), MoqtObjectStatus::kNormal);
-  track_.SentSequence(FullSequence(3, 2), MoqtObjectStatus::kNormal);
-  track_.SentSequence(FullSequence(3, 3), MoqtObjectStatus::kEndOfGroup);
-  track_.SentSequence(FullSequence(4, 0), MoqtObjectStatus::kNormal);
-  track_.SentSequence(FullSequence(4, 1), MoqtObjectStatus::kNormal);
-  track_.SentSequence(FullSequence(4, 2), MoqtObjectStatus::kNormal);
-  track_.SentSequence(FullSequence(4, 3), MoqtObjectStatus::kNormal);
-  track_.SentSequence(FullSequence(4, 4), MoqtObjectStatus::kNormal);
-  EXPECT_EQ(track_.next_sequence(), FullSequence(4, 5));
-  track_.AddWindow(0, 1, 1, 3);
-  SubscribeWindow* window = track_.GetWindow(0);
-  EXPECT_TRUE(window->InWindow(FullSequence(3, 3)));
-  EXPECT_FALSE(window->InWindow(FullSequence(3, 4)));
-  // End on an empty group.
-  track_.AddWindow(1, 1, 1, 2);
-  window = track_.GetWindow(1);
-  EXPECT_TRUE(window->InWindow(FullSequence(1, 1)));
-  // End on an group in progress.
-  track_.AddWindow(2, 1, 1, 4);
-  window = track_.GetWindow(2);
-  EXPECT_TRUE(window->InWindow(FullSequence(4, 9)));
-  EXPECT_FALSE(window->InWindow(FullSequence(5, 0)));
-}
-
-TEST_F(LocalTrackTest, ShouldSend) {
-  track_.AddWindow(0, 4, 1);
-  EXPECT_TRUE(track_.HasSubscriber());
-  EXPECT_TRUE(track_.ShouldSend(FullSequence(3, 12)).empty());
-  EXPECT_TRUE(track_.ShouldSend(FullSequence(4, 0)).empty());
-  EXPECT_EQ(track_.ShouldSend(FullSequence(4, 1)).size(), 1);
-  EXPECT_EQ(track_.ShouldSend(FullSequence(12, 0)).size(), 1);
-}
-
 class RemoteTrackTest : public quic::test::QuicTest {
  public:
   RemoteTrackTest()
diff --git a/quiche/quic/moqt/tools/chat_client_bin.cc b/quiche/quic/moqt/tools/chat_client_bin.cc
index acdbf0c..a344a7d 100644
--- a/quiche/quic/moqt/tools/chat_client_bin.cc
+++ b/quiche/quic/moqt/tools/chat_client_bin.cc
@@ -26,6 +26,7 @@
 #include "quiche/quic/core/quic_server_id.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_outgoing_queue.h"
 #include "quiche/quic/moqt/moqt_session.h"
 #include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/quic/moqt/tools/moqt_client.h"
@@ -37,6 +38,9 @@
 #include "quiche/quic/tools/quic_url.h"
 #include "quiche/common/platform/api/quiche_command_line_flags.h"
 #include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/platform/api/quiche_mem_slice.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/simple_buffer_allocator.h"
 
 DEFINE_QUICHE_COMMAND_LINE_FLAG(
     bool, disable_certificate_verification, false,
@@ -105,20 +109,15 @@
       session_is_open_ = false;
       return;
     }
-    session_->PublishObject(my_track_name_, next_sequence_.group,
-                            next_sequence_.object++, /*object_send_order=*/0,
-                            moqt::MoqtObjectStatus::kNormal, input_message);
-    session_->PublishObject(my_track_name_, next_sequence_.group,
-                            next_sequence_.object, /*object_send_order=*/0,
-                            moqt::MoqtObjectStatus::kEndOfGroup, "");
-    ++next_sequence_.group;
-    next_sequence_.object = 0;
+    quiche::QuicheMemSlice message_slice(quiche::QuicheBuffer::Copy(
+        quiche::SimpleBufferAllocator::Get(), input_message));
+    queue_->AddObject(std::move(message_slice), /*key=*/true);
   }
 
   bool session_is_open() const { return session_is_open_; }
   bool is_syncing() const {
     return !catalog_group_.has_value() || subscribes_to_make_ > 0 ||
-           !session_->HasSubscribers(my_track_name_);
+           (queue_ != nullptr && queue_->HasSubscribers());
   }
 
   void RunEventLoop() {
@@ -202,10 +201,9 @@
       std::cout << "Failed to connect.\n";
       return false;
     }
-    // By not sending a visitor, the application will not fulfill subscriptions
-    // to previous objects.
-    session_->AddLocalTrack(my_track_name_,
-                            moqt::MoqtForwardingPreference::kObject, nullptr);
+    queue_ = std::make_shared<moqt::MoqtOutgoingQueue>(
+        client_->session(), my_track_name_,
+        moqt::MoqtForwardingPreference::kObject);
     moqt::MoqtOutgoingAnnounceCallback announce_callback =
         [&](absl::string_view track_namespace,
             std::optional<moqt::MoqtAnnounceErrorReason> reason) {
@@ -354,7 +352,7 @@
   std::unique_ptr<RemoteTrackVisitor> remote_track_visitor_;
 
   // Handling incoming and outgoing messages
-  moqt::FullSequence next_sequence_ = {0, 0};
+  std::shared_ptr<moqt::MoqtOutgoingQueue> queue_;
 
   // Used when chat output goes to file.
   std::ofstream output_file_;
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h
index 0ea75fa..3dc5741 100644
--- a/quiche/quic/moqt/tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -7,14 +7,16 @@
 
 #include <cstdint>
 #include <optional>
+#include <utility>
+#include <vector>
 
 #include "absl/status/statusor.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_session.h"
-#include "quiche/quic/moqt/moqt_subscribe_windows.h"
 #include "quiche/quic/moqt/moqt_track.h"
-#include "quiche/quic/platform/api/quic_test.h"
+#include "quiche/common/platform/api/quiche_test.h"
 
 namespace moqt::test {
 
@@ -39,10 +41,28 @@
   }
 };
 
-class MockLocalTrackVisitor : public LocalTrack::Visitor {
+class MockTrackPublisher : public MoqtTrackPublisher {
  public:
-  MOCK_METHOD(absl::StatusOr<PublishPastObjectsCallback>, OnSubscribeForPast,
-              (const SubscribeWindow& window), (override));
+  explicit MockTrackPublisher(FullTrackName name)
+      : track_name_(std::move(name)) {}
+  const FullTrackName& GetTrackName() const override { return track_name_; }
+
+  MOCK_METHOD(std::optional<PublishedObject>, GetCachedObject,
+              (FullSequence sequence), (const, override));
+  MOCK_METHOD(std::vector<FullSequence>, GetCachedObjectsInRange,
+              (FullSequence start, FullSequence end), (const, override));
+  MOCK_METHOD(void, AddObjectListener, (MoqtObjectListener * listener),
+              (override));
+  MOCK_METHOD(void, RemoveObjectListener, (MoqtObjectListener * listener),
+              (override));
+  MOCK_METHOD(absl::StatusOr<MoqtTrackStatusCode>, GetTrackStatus, (),
+              (const, override));
+  MOCK_METHOD(FullSequence, GetLargestSequence, (), (const, override));
+  MOCK_METHOD(MoqtForwardingPreference, GetForwardingPreference, (),
+              (const, override));
+
+ private:
+  FullTrackName track_name_;
 };
 
 class MockRemoteTrackVisitor : public RemoteTrack::Visitor {
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
index d379afc..6aa5d01 100644
--- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -9,6 +9,7 @@
 #include <cstdint>
 #include <cstring>
 #include <iostream>
+#include <memory>
 #include <optional>
 #include <string>
 #include <utility>
@@ -22,6 +23,7 @@
 #include "quiche/quic/core/quic_clock.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/core/quic_types.h"
+#include "quiche/quic/moqt/moqt_known_track_publisher.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_outgoing_queue.h"
 #include "quiche/quic/moqt/moqt_session.h"
@@ -95,7 +97,8 @@
                   int keyframe_interval, int fps, float i_to_p_ratio,
                   QuicBandwidth bitrate)
       : Actor(simulator, actor_name),
-        queue_(session, track_name),
+        queue_(std::make_shared<MoqtOutgoingQueue>(
+            session, track_name, MoqtForwardingPreference::kGroup)),
         keyframe_interval_(keyframe_interval),
         time_between_frames_(QuicTimeDelta::FromMicroseconds(1.0e6 / fps)) {
     int p_frame_count = keyframe_interval - 1;
@@ -123,18 +126,18 @@
     bool success = writer.WriteUInt64(clock_->Now().ToDebuggingValue());
     QUICHE_CHECK(success);
 
-    queue_.AddObject(QuicheMemSlice(std::move(buffer)), i_frame);
+    queue_->AddObject(QuicheMemSlice(std::move(buffer)), i_frame);
     Schedule(clock_->Now() + time_between_frames_);
   }
 
   void Start() { Schedule(clock_->Now()); }
   void Stop() { Unschedule(); }
 
-  MoqtOutgoingQueue& queue() { return queue_; }
+  std::shared_ptr<MoqtOutgoingQueue> queue() { return queue_; }
   size_t total_objects_sent() const { return frame_number_ + 1; }
 
  private:
-  MoqtOutgoingQueue queue_;
+  std::shared_ptr<MoqtOutgoingQueue> queue_;
   int keyframe_interval_;
   QuicTimeDelta time_between_frames_;
   QuicByteCount i_frame_size_;
@@ -270,8 +273,8 @@
     //   (2) The client starts immediately generating data.  At this point, the
     //       server does not yet have an active subscription, so the client has
     //       some catching up to do.
-    client_session()->AddLocalTrack(
-        TrackName(), MoqtForwardingPreference::kGroup, &generator_.queue());
+    client_session()->set_publisher(&publisher_);
+    publisher_.Add(generator_.queue());
     generator_.Start();
     server_session()->SubscribeCurrentGroup(TrackName().track_namespace,
                                             TrackName().track_name, &receiver_);
@@ -298,6 +301,7 @@
   quic::simulator::Switch switch_;
   quic::simulator::SymmetricLink client_link_;
   quic::simulator::SymmetricLink server_link_;
+  MoqtKnownTrackPublisher publisher_;
   ObjectGenerator generator_;
   ObjectReceiver receiver_;
   SimulationParameters parameters_;
