Remove unnecessary tests from MoqtSessionTest. Putting the logic in OutgoingSubgroupStreamTest and SubscriptionPublisherTest is more compact and cleaner conceptually.

Includes a small fix to allow opening several streams at once.

Here is a mapping of the deleted tests and what covers them:
OSST = OutgoingSubgroupStreamTest
SPT = SubscriptionPublisherTest

CreateOutgoingSubgroupStreamAndSend -> CreateStream via
                                       SPT::UpdatePriorityWithActiveStreams
FinDataStreamFromCache -> OSST::OnCanWriteSetsAlarm
SendFragmentedObject -> OSST::SendFragmentedObject
GroupAbandonedNoDeliveryTimeout -> SPT::OnGroupAbandonedTooFarBehind
GroupAbandondedDeliveryTimeout is the same as GroupAbandonedNoDeliveryTimeout. This is an identical test!
GroupAbandoned -> SPT::OnGroupAbandonedWithStreams
LateFinInDataStream -> OSST::Fin
SeparateFinForFutureObject -> OSST::FinForFutureObject
PublisherAbandonsSubgrpup -> SPT::OnSubgroupAbandonedWithStream
UnidirectionalStreamCannotBeOpened ->
                         SPT::OnNewObjectAvailableStreamCreationBlocked,
                         SPT::OnCanCreateNewUniStreamSuccess
QueuedStreamIsCleared -> SPT::OnCanCreateNewUniStreamSuccess,
                         SPT::OnCanCreateNewUniStreamPendingCleanup
OutgoingStreamDisappears -> SPT::OnDataStreamDestroyed
SendDatagram -> SPT::OnNewObjectAvailableDatagram
QueuedStreamsOpenedInOrder -> SPT::PendingStreamsInOrder
StreamQueuedForSubscriptionThatDoesntExist ->
                        SPT::OnCanCreateNewUniStreamPendingCleanup
QueuedStreamPriorityChanged -> MoqtSessionTest::UpdateTrackPriority,
                               SPT::PendingStreamsInOrder,
                               SPT::UpdatePriorityWithPendingStreams
DeliveryTimeoutExpiredOnArrival -> OSST::OnCanWriteTimeout
DeliveryTimeoutAfterIntegratedFin -> OSST::OnCanWriteSetsAlarm
DeliveryTimeoutAfterSeparateFin -> OSST::Fin
DeliveryTimeoutAlternateDesign -> AlternateDeliveryTimeout
IncomingRequestUpdateTruncatesSubscription ->
  MoqtSessionTest::IncomingRequestUpdateTriggersRequestOk,
  MoqtSessionTest::IncomingRequestUpdateTriggersRequestError,
  SPT::IncomingUpdateTruncatesSubscription

PiperOrigin-RevId: 918686567
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index ffa3e3d..d715290 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -763,6 +763,10 @@
               std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) {
             received_ok = std::holds_alternative<SubscribeOkData>(response);
           });
+  bool stream_reset = false;
+  EXPECT_CALL(subscribe_visitor_, OnStreamReset).WillOnce([&]() {
+    stream_reset = true;
+  });
   MessageParameters parameters(MoqtFilterType::kLargestObject);
   // Set delivery timeout to ~ 1 RTT: any loss is fatal.
   parameters.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(100);
@@ -780,15 +784,13 @@
           [&](const FullTrackName&, const PublishedObjectMetadata& metadata,
               absl::string_view object,
               uint64_t offset) { bytes_received += object.size(); });
-  queue->AddObject(Location{0, 0}, 0, data, false);
-  queue->AddObject(Location{0, 1}, 0, data, false);
-  queue->AddObject(Location{0, 2}, 0, data, false);
-  queue->AddObject(Location{0, 3}, 0, data, true);
-  success = test_harness_.RunUntilWithDefaultTimeout([&]() {
-    return MoqtSessionPeer::SubgroupHasBeenReset(
-        MoqtSessionPeer::GetSubscription(server_->session(), 0),
-        DataStreamIndex{0, 0});
-  });
+  quic::QuicTime now = test_harness_.simulator().GetClock()->Now();
+  queue->AddObject(Location{0, 0}, 0, data, false, now);
+  queue->AddObject(Location{0, 1}, 0, data, false, now);
+  queue->AddObject(Location{0, 2}, 0, data, false, now);
+  queue->AddObject(Location{0, 3}, 0, data, true, now);
+  success =
+      test_harness_.RunUntilWithDefaultTimeout([&]() { return stream_reset; });
   EXPECT_TRUE(success);
   // Stream was reset before all the bytes arrived.
   EXPECT_LT(bytes_received, 4000);
@@ -812,6 +814,10 @@
               std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) {
             received_ok = std::holds_alternative<SubscribeOkData>(response);
           });
+  bool stream_reset = false;
+  EXPECT_CALL(subscribe_visitor_, OnStreamReset).WillOnce([&]() {
+    stream_reset = true;
+  });
   MessageParameters parameters(MoqtFilterType::kLargestObject);
   // Set delivery timeout to ~ 1 RTT: any loss is fatal.
   parameters.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(100);
@@ -831,13 +837,11 @@
           [&](const FullTrackName&, const PublishedObjectMetadata& metadata,
               absl::string_view object,
               uint64_t offset) { bytes_received += object.size(); });
-  queue->AddObject(Location{0, 0}, 0, data, false);
-  queue->AddObject(Location{1, 0}, 0, data, false);
-  success = test_harness_.RunUntilWithDefaultTimeout([&]() {
-    return MoqtSessionPeer::SubgroupHasBeenReset(
-        MoqtSessionPeer::GetSubscription(server_->session(), 0),
-        DataStreamIndex{0, 0});
-  });
+  quic::QuicTime now = test_harness_.simulator().GetClock()->Now();
+  queue->AddObject(Location{0, 0}, 0, data, false, now);
+  queue->AddObject(Location{1, 0}, 0, data, false, now);
+  success =
+      test_harness_.RunUntilWithDefaultTimeout([&]() { return stream_reset; });
   EXPECT_TRUE(success);
   EXPECT_EQ(bytes_received, 2000);
 }
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 66c620e..7802cff 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -18,7 +18,6 @@
 #include "absl/base/casts.h"
 #include "absl/memory/memory.h"
 #include "absl/status/status.h"
-#include "absl/strings/match.h"
 #include "absl/strings/string_view.h"
 #include "absl/types/span.h"
 #include "quiche/quic/core/quic_data_reader.h"
@@ -124,20 +123,6 @@
   return fetch;
 }
 
-// TODO(martinduke): Eliminate MoqtSessionPeer::AddSubscription, which allows
-// this to be removed as well.
-static std::shared_ptr<MockTrackPublisher> SetupPublisher(
-    FullTrackName track_name, MoqtForwardingPreference forwarding_preference,
-    std::optional<Location> largest_sequence) {
-  auto publisher = std::make_shared<MockTrackPublisher>(std::move(track_name));
-  ON_CALL(*publisher, largest_location())
-      .WillByDefault(Return(largest_sequence));
-  ON_CALL(*publisher, extensions())
-      .WillByDefault(testing::ReturnRef(kNoExtensions));
-  ON_CALL(*publisher, expiration()).WillByDefault(Return(std::nullopt));
-  return publisher;
-}
-
 std::optional<MoqtMessageType> PeekControlMessageType(absl::string_view data) {
   quiche::QuicheDataReader reader(data);
   uint64_t varint;
@@ -549,8 +534,8 @@
   EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kSubscribeOk), _));
   listener->OnSubscribeAccepted();
-  EXPECT_NE(MoqtSessionPeer::GetSubscription(&session_, kDefaultPeerRequestId),
-            nullptr);
+  EXPECT_TRUE(MoqtSessionPeer::RequestIdIsSubscriptionPublisher(
+      &session_, kDefaultPeerRequestId));
 }
 
 TEST_F(MoqtSessionTest, AsynchronousSubscribeReturnsError) {
@@ -567,8 +552,8 @@
               Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _));
   listener->OnSubscribeRejected(MoqtRequestErrorInfo(
       RequestErrorCode::kInternalError, std::nullopt, "Test error"));
-  EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, kDefaultPeerRequestId),
-            nullptr);
+  EXPECT_FALSE(MoqtSessionPeer::RequestIdIsSubscriptionPublisher(
+      &session_, kDefaultPeerRequestId));
 }
 
 TEST_F(MoqtSessionTest, SynchronousSubscribeReturnsError) {
@@ -586,8 +571,8 @@
             RequestErrorCode::kInternalError, std::nullopt, "Test error"));
       });
   stream_input->ReceiveMessage(request);
-  EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, kDefaultPeerRequestId),
-            nullptr);
+  EXPECT_FALSE(MoqtSessionPeer::RequestIdIsSubscriptionPublisher(
+      &session_, kDefaultPeerRequestId));
 }
 
 TEST_F(MoqtSessionTest, SubscribeForPast) {
@@ -676,7 +661,7 @@
       kDefaultPeerRequestId,
   };
   stream_input->ReceiveMessage(unsubscribe);
-  EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, 1), nullptr);
+  EXPECT_FALSE(MoqtSessionPeer::RequestIdIsSubscriptionPublisher(&session_, 1));
 
   // Subscribe again, succeeds.
   request.request_id = 3;
@@ -1302,816 +1287,21 @@
   control_stream->ReceiveMessage(subscribe_ok);
 }
 
-// TODO(martinduke): Most of these test cases no longer need to be in
-// MoqtSessionTest. Find any useful functionality and put it in
-// SubscriptionPublisherTest or OutgoingSubgroupStreamTest.
-
-TEST_F(MoqtSessionTest, CreateOutgoingSubgroupStreamAndSend) {
-  FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true));
-  bool fin = false;
-  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream_));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream_, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
-    return stream_visitor.get();
-  });
-  EXPECT_CALL(mock_stream_, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId));
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream_));
-
-  // Verify first six message fields are sent correctly
-  bool correct_message = false;
-  const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f, 0x00, 0x0a};
-  EXPECT_CALL(mock_stream_, Writev(_, _))
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        correct_message =
-            absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
-        fin |= options.send_fin();
-        return absl::OkStatus();
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
-      .WillRepeatedly([&] {
-        return PublishedObject{
-            PublishedObjectMetadata{Location(5, 0), 0, "extensions",
-                                    MoqtObjectStatus::kNormal, 127, 8,
-                                    MoqtSessionPeer::Now(&session_)},
-            PayloadFromString("deadbeef"), false};
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
-    return std::optional<PublishedObject>();
-  });
-  subscription->OnNewObjectAvailable(Location(5, 0), 0, 127);
-  EXPECT_TRUE(correct_message);
-  EXPECT_FALSE(fin);
-  std::optional<Location> largest_sent =
-      MoqtSessionPeer::LargestSentForSubscription(&session_, 0);
-  EXPECT_TRUE(largest_sent.has_value() && *largest_sent == Location(5, 0));
-}
-
-TEST_F(MoqtSessionTest, FinDataStreamFromCache) {
-  FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true));
-  bool fin = false;
-  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream_));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream_, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
-    return stream_visitor.get();
-  });
-  EXPECT_CALL(mock_stream_, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId));
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream_));
-
-  // Verify first four message fields are sent correctly
-  bool correct_message = false;
-  const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f};
-  EXPECT_CALL(mock_stream_, Writev(_, _))
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        correct_message =
-            absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
-        fin = options.send_fin();
-        return absl::OkStatus();
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
-      .WillRepeatedly([&] {
-        return PublishedObject{
-            PublishedObjectMetadata{Location(5, 0), 0, "",
-                                    MoqtObjectStatus::kNormal, 127, 8,
-                                    MoqtSessionPeer::Now(&session_)},
-            PayloadFromString("deadbeef"), true};
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
-    return std::optional<PublishedObject>();
-  });
-  subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority);
-  EXPECT_TRUE(correct_message);
-  EXPECT_TRUE(fin);
-}
-
-TEST_F(MoqtSessionTest, SendFragmentedObject) {
-  FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
-  // 1st OnNewObjectAvailable happens when no outgoing unidirectional stream is
-  // available.
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(false));
-  subscription->OnNewObjectAvailable(Location(5, 0), 0, 128);
-
-  // 2nd OnNewObjectAvailable happens when Part 1 ("part1", 5 bytes) and
-  // Part 2 ("part2", 5 bytes) are available.
-  bool fin = false;
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true));
-  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream_));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream_, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream_, visitor()).WillRepeatedly([&] {
-    return stream_visitor.get();
-  });
-  EXPECT_CALL(mock_stream_, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId));
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream_));
-  PublishedObjectMetadata metadata = {
-      Location(5, 0),
-      /*subgroup=*/0,
-      /*extensions=*/"",     MoqtObjectStatus::kNormal,
-      /*priority=*/128,
-      /*payload_length=*/15, MoqtSessionPeer::Now(&session_)};
-  std::vector<quiche::QuicheMemSlice> payload;
-  payload.push_back(quiche::QuicheMemSlice::Copy("part1"));
-  payload.push_back(quiche::QuicheMemSlice::Copy("part2"));
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
-      .WillOnce(Return(PublishedObject{metadata, std::move(payload), false}));
-  // Expect Writev for the header and first two parts.
-  EXPECT_CALL(mock_stream_, Writev)
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        EXPECT_EQ(data.size(), 3);  // Header, part1, and part2.
-        EXPECT_EQ(data[1].AsStringView(), "part1");
-        EXPECT_EQ(data[2].AsStringView(), "part2");
-        EXPECT_FALSE(options.send_fin());
-        return absl::OkStatus();
-      });
-  subscription->OnNewObjectAvailable(Location(5, 0), 0, 128);
-
-  // 3rd OnNewObjectAvailable happens when Part 3 ("part3", 5 bytes) is
-  // available.
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 10))
-      .WillOnce(
-          Return(PublishedObject{metadata, PayloadFromString("part3"), true}));
-  EXPECT_CALL(mock_stream_, Writev)
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        EXPECT_TRUE(options.send_fin());
-        EXPECT_EQ(data.size(), 1);  // No header, just part3.
-        EXPECT_EQ(data[0].AsStringView(), "part3");
-        fin = true;
-        return absl::OkStatus();
-      });
-  subscription->OnNewObjectAvailable(Location(5, 0), 0, 128);
-}
-
-TEST_F(MoqtSessionTest, GroupAbandonedNoDeliveryTimeout) {
-  FullTrackName ftn("foo", "bar");
-  webtransport::test::MockStream control_stream;
-  std::unique_ptr<MoqtBidiStreamTestWrapper> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true));
-  bool fin = false;
-  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream_));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream_, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
-    return stream_visitor.get();
-  });
-  EXPECT_CALL(mock_stream_, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId));
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream_));
-
-  // Verify first four message fields are sent correctly
-  bool correct_message = false;
-  const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f};
-  EXPECT_CALL(mock_stream_, Writev(_, _))
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        correct_message =
-            absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
-        fin |= options.send_fin();
-        return absl::OkStatus();
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
-      .WillRepeatedly([&] {
-        return PublishedObject{
-            PublishedObjectMetadata{Location(5, 0), 0, "",
-                                    MoqtObjectStatus::kNormal, 127, 8,
-                                    MoqtSessionPeer::Now(&session_)},
-            PayloadFromString("deadbeef"), true};
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
-    return std::optional<PublishedObject>();
-  });
-  subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority);
-  EXPECT_TRUE(correct_message);
-  EXPECT_TRUE(fin);
-
-  struct MoqtPublishDone expected_publish_done = {
-      /*request_id=*/0,
-      PublishDoneCode::kTooFarBehind,
-      /*stream_count=*/1,
-      /*error_reason=*/"",
-  };
-  EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeCancelled));
-  EXPECT_CALL(control_stream,
-              Writev(SerializedControlMessage(expected_publish_done), _));
-  subscription->OnGroupAbandoned(5);
-}
-
-TEST_F(MoqtSessionTest, GroupAbandonedDeliveryTimeout) {
-  FullTrackName ftn("foo", "bar");
-  webtransport::test::MockStream control_stream;
-  std::unique_ptr<MoqtBidiStreamTestWrapper> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true));
-  bool fin = false;
-  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream_));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream_, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
-    return stream_visitor.get();
-  });
-  EXPECT_CALL(mock_stream_, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId));
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream_));
-
-  // Verify first four message fields are sent correctly
-  bool correct_message = false;
-  const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f};
-  EXPECT_CALL(mock_stream_, Writev(_, _))
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        correct_message =
-            absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
-        fin |= options.send_fin();
-        return absl::OkStatus();
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
-      .WillRepeatedly([&] {
-        return PublishedObject{
-            PublishedObjectMetadata{Location(5, 0), 0, "",
-                                    MoqtObjectStatus::kNormal, 127, 8,
-                                    MoqtSessionPeer::Now(&session_)},
-            PayloadFromString("deadbeef"), true};
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
-    return std::optional<PublishedObject>();
-  });
-  subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority);
-  EXPECT_TRUE(correct_message);
-  EXPECT_TRUE(fin);
-
-  struct MoqtPublishDone expected_publish_done = {
-      /*request_id=*/0,
-      PublishDoneCode::kTooFarBehind,
-      /*stream_count=*/1,
-      /*error_reason=*/"",
-  };
-  EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeCancelled));
-  EXPECT_CALL(control_stream,
-              Writev(SerializedControlMessage(expected_publish_done), _));
-  subscription->OnGroupAbandoned(5);
-}
-
-TEST_F(MoqtSessionTest, GroupAbandoned) {
-  FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-  MoqtSessionPeer::SetDeliveryTimeout(subscription,
-                                      quic::QuicTimeDelta::FromSeconds(1000));
-
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true));
-  bool fin = false;
-  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream_));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream_, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
-    return stream_visitor.get();
-  });
-  EXPECT_CALL(mock_stream_, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId));
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream_));
-
-  // Verify first four message fields are sent correctly
-  bool correct_message = false;
-  const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f};
-  EXPECT_CALL(mock_stream_, Writev(_, _))
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        correct_message =
-            absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
-        fin |= options.send_fin();
-        return absl::OkStatus();
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
-      .WillRepeatedly([&] {
-        return PublishedObject{
-            PublishedObjectMetadata{Location(5, 0), 0, "",
-                                    MoqtObjectStatus::kNormal, 127, 8,
-                                    MoqtSessionPeer::Now(&session_)},
-            PayloadFromString("deadbeef"), true};
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
-    return std::optional<PublishedObject>();
-  });
-  subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority);
-  EXPECT_TRUE(correct_message);
-  EXPECT_TRUE(fin);
-  EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout));
-  subscription->OnGroupAbandoned(5);
-}
-
-TEST_F(MoqtSessionTest, LateFinDataStream) {
-  FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true));
-  bool fin = false;
-  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream_));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream_, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream_, visitor()).WillRepeatedly([&] {
-    return stream_visitor.get();
-  });
-  EXPECT_CALL(mock_stream_, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId));
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream_));
-
-  // Verify first four message fields are sent correctly
-  bool correct_message = false;
-  const std::string kExpectedMessage = {0x11, 0x02, 0x05, 0x7f};
-  EXPECT_CALL(mock_stream_, Writev)
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        correct_message =
-            absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
-        fin = options.send_fin();
-        return absl::OkStatus();
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
-      .WillRepeatedly([&] {
-        return PublishedObject{
-            PublishedObjectMetadata{Location(5, 0), 0, "",
-                                    MoqtObjectStatus::kNormal, 127, 8,
-                                    MoqtSessionPeer::Now(&session_)},
-            PayloadFromString("deadbeef"), false};
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
-    return std::optional<PublishedObject>();
-  });
-  subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority);
-  EXPECT_TRUE(correct_message);
-  EXPECT_FALSE(fin);
-  fin = false;
-  EXPECT_CALL(mock_stream_, Writev)
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        EXPECT_TRUE(data.empty());
-        fin = options.send_fin();
-        return absl::OkStatus();
-      });
-  subscription->OnNewFinAvailable(Location(5, 0), 0);
-}
-
-TEST_F(MoqtSessionTest, SeparateFinForFutureObject) {
-  FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true));
-  bool fin = false;
-  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream_));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream_, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream_, visitor()).WillRepeatedly([&] {
-    return stream_visitor.get();
-  });
-  EXPECT_CALL(mock_stream_, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId));
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream_));
-
-  // Verify first six message fields are sent correctly
-  bool correct_message = false;
-  const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x7f, 0x00, 0x00};
-  EXPECT_CALL(mock_stream_, Writev(_, _))
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        correct_message =
-            absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
-        fin = options.send_fin();
-        return absl::OkStatus();
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
-      .WillRepeatedly([&] {
-        return PublishedObject{
-            PublishedObjectMetadata{Location(5, 0), 0, "",
-                                    MoqtObjectStatus::kNormal, 127, 8,
-                                    MoqtSessionPeer::Now(&session_)},
-            PayloadFromString("deadbeef"), false};
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
-    return std::optional<PublishedObject>();
-  });
-  subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority);
-  EXPECT_FALSE(fin);
-  // Try to deliver (5,1), but fail.
-  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return false; });
-  EXPECT_CALL(*track, GetCachedObject).Times(0);
-  EXPECT_CALL(mock_stream_, Writev).Times(0);
-  subscription->OnNewObjectAvailable(Location(5, 1), 0,
-                                     kDefaultPublisherPriority);
-  // Notify that FIN arrived, but do nothing with it because (5, 1) isn't sent.
-  EXPECT_CALL(mock_stream_, Writev).Times(0);
-  subscription->OnNewFinAvailable(Location(5, 1), 0);
-
-  // Reopen the window.
-  correct_message = false;
-  // object id, extensions, payload length, status.
-  const std::string kExpectedMessage2 = {0x00, 0x00, 0x00, 0x03};
-  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return true; });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0))
-      .WillRepeatedly([&] {
-        return PublishedObject{
-            PublishedObjectMetadata{Location(5, 1), 0, "",
-                                    MoqtObjectStatus::kEndOfGroup, 127, 0,
-                                    MoqtSessionPeer::Now(&session_)},
-            PayloadFromString(""), true};
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 2, 0)).WillRepeatedly([] {
-    return std::optional<PublishedObject>();
-  });
-  EXPECT_CALL(mock_stream_, Writev(_, _))
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        correct_message =
-            absl::StartsWith(data[0].AsStringView(), kExpectedMessage2);
-        fin = options.send_fin();
-        return absl::OkStatus();
-      });
-  stream_visitor->OnCanWrite();
-  EXPECT_TRUE(correct_message);
-  EXPECT_TRUE(fin);
-}
-
-TEST_F(MoqtSessionTest, PublisherAbandonsSubgroup) {
-  FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
-  // Deliver first object.
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true));
-  bool fin = false;
-  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream_));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream_, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream_, visitor()).WillRepeatedly([&] {
-    return stream_visitor.get();
-  });
-  EXPECT_CALL(mock_stream_, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId));
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream_));
-  // Verify first six message fields are sent correctly
-  bool correct_message = false;
-  const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x7f, 0x00, 0x00};
-  EXPECT_CALL(mock_stream_, Writev(_, _))
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        correct_message =
-            absl::StartsWith(data[0].AsStringView(), kExpectedMessage);
-        fin = options.send_fin();
-        return absl::OkStatus();
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0))
-      .WillRepeatedly([&] {
-        return PublishedObject{
-            PublishedObjectMetadata{Location(5, 0), 0, "",
-                                    MoqtObjectStatus::kNormal, 127, 8,
-                                    MoqtSessionPeer::Now(&session_)},
-            PayloadFromString("deadbeef"), false};
-      });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
-    return std::optional<PublishedObject>();
-  });
-  subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority);
-
-  // Abandon the subgroup.
-  EXPECT_CALL(mock_stream_, ResetWithUserCode(0x1)).Times(1);
-  subscription->OnSubgroupAbandoned(5, 0, 0x1);
-}
-
-// TODO: Test operation with multiple streams.
-
-TEST_F(MoqtSessionTest, UnidirectionalStreamCannotBeOpened) {
-  FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
-  // Queue the outgoing stream.
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(false));
-  subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority);
-
-  // Unblock the session, and cause the queued stream to be sent.
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillRepeatedly(Return(true));
-  bool fin = false;
-  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream_));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream_, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
-    return stream_visitor.get();
-  });
-  webtransport::StreamPriority expected_priority{
-      kMoqtSendGroupId,
-      SendOrderForStream(kDefaultSubscriberPriority, kDefaultPublisherPriority,
-                         5, 0, MoqtDeliveryOrder::kAscending)};
-  EXPECT_CALL(mock_stream_, SetPriority(expected_priority));
-  EXPECT_CALL(mock_stream_, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId));
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream_));
-  EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)).WillRepeatedly([] {
-    return PublishedObject{PublishedObjectMetadata{
-                               Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
-                               128, 8, quic::QuicTime::Zero()},
-                           PayloadFromString("deadbeef")};
-  });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillRepeatedly([] {
-    return std::optional<PublishedObject>();
-  });
-  session_.OnCanCreateNewOutgoingUnidirectionalStream();
-}
-
-TEST_F(MoqtSessionTest, QueuedStreamIsCleared) {
-  FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
-  // Queue the outgoing stream.
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillRepeatedly(Return(false));
-  subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority);
-  subscription->OnNewObjectAvailable(Location(6, 0), 0,
-                                     kDefaultPublisherPriority);
-  subscription->OnGroupAbandoned(5);
-
-  // Unblock the session, and cause the queued stream to be sent. There should
-  // be only one stream.
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true))
-      .WillOnce(Return(true));
-  bool fin = false;
-  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream_));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream_, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
-    return stream_visitor.get();
-  });
-  EXPECT_CALL(mock_stream_, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId));
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream_));
-  EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
-  EXPECT_CALL(*track, GetCachedObject(6, Optional(0), 0, 0))
-      .WillRepeatedly([&] {
-        return PublishedObject{
-            PublishedObjectMetadata{Location(6, 0), 0, "",
-                                    MoqtObjectStatus::kNormal, 128, 8,
-                                    MoqtSessionPeer::Now(&session_)},
-            PayloadFromString("deadbeef")};
-      });
-  EXPECT_CALL(*track, GetCachedObject(6, Optional(0), 1, 0)).WillRepeatedly([] {
-    return std::optional<PublishedObject>();
-  });
-  session_.OnCanCreateNewOutgoingUnidirectionalStream();
-}
-
-TEST_F(MoqtSessionTest, OutgoingStreamDisappears) {
-  FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
-
-  // Set up an outgoing stream for a group.
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true));
-  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly(Return(true));
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream_));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream_, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream_, visitor()).WillRepeatedly([&] {
-    return stream_visitor.get();
-  });
-  EXPECT_CALL(mock_stream_, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId));
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream_));
-
-  EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0, 0)).WillRepeatedly([] {
-    return PublishedObject{
-        PublishedObjectMetadata{Location(5, 0), 0, "",
-                                MoqtObjectStatus::kNormal, 128, 8},
-        PayloadFromString("deadbeef")};
-  });
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).WillOnce([] {
-    return std::optional<PublishedObject>();
-  });
-  subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority);
-  // Now that the stream exists and is recorded within subscription, make it
-  // disappear by returning nullptr.
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(nullptr));
-  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1, 0)).Times(0);
-  subscription->OnNewObjectAvailable(Location(5, 1), 0,
-                                     kDefaultPublisherPriority);
-}
-
 TEST_F(MoqtSessionTest, ReceiveUnsubscribe) {
-  FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtSessionPeer::AddSubscription(&session_, track, 0, 1, 3, 4);
-  MoqtUnsubscribe unsubscribe = {
-      /*request_id=*/0,
-  };
-  stream_input->ReceiveMessage(unsubscribe);
-  EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, 0), nullptr);
-}
-
-TEST_F(MoqtSessionTest, SendDatagram) {
-  FullTrackName ftn("foo", "bar");
-  std::shared_ptr<MockTrackPublisher> track_publisher =
-      SetupPublisher(ftn, MoqtForwardingPreference::kDatagram, Location{4, 0});
+  MockTrackPublisher* track = CreateTrackPublisher();
+  MoqtSubscribe request = DefaultSubscribe();
+  const MoqtPriority kLocalDefaultPriority = 0x20;
   std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
       MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* listener =
-      MoqtSessionPeer::AddSubscription(&session_, track_publisher, 0, 2, 5, 0);
-
-  // Publish in window.
-  bool correct_message = false;
-  uint8_t kExpectedMessage[] = {
-      0x05, 0x02, 0x05, 0x20, 0x03, 0x65, 0x78, 0x74,
-      0x64, 0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66,  // "deadbeef"
-  };
-  EXPECT_CALL(mock_session_, SendOrQueueDatagram)
-      .WillOnce([&](absl::string_view datagram) {
-        if (datagram.size() == sizeof(kExpectedMessage)) {
-          correct_message = (0 == memcmp(datagram.data(), kExpectedMessage,
-                                         sizeof(kExpectedMessage)));
-        }
-        return webtransport::DatagramStatus(
-            webtransport::DatagramStatusCode::kSuccess, "");
-      });
-  EXPECT_CALL(*track_publisher,
-              GetCachedObject(5, std::optional<uint64_t>(), 0, 0))
-      .WillRepeatedly([&] {
-        return PublishedObject{
-            PublishedObjectMetadata{Location{5, 0}, std::nullopt, "ext",
-                                    MoqtObjectStatus::kNormal, 32, 8,
-                                    MoqtSessionPeer::Now(&session_)},
-            PayloadFromString("deadbeef")};
-      });
-  listener->OnNewObjectAvailable(Location(5, 0), std::nullopt, 32);
-  EXPECT_TRUE(correct_message);
+  TrackExtensions extensions(std::nullopt, std::nullopt, kLocalDefaultPriority,
+                             std::nullopt, std::nullopt, std::nullopt);
+  EXPECT_CALL(*track, extensions)
+      .WillRepeatedly(testing::ReturnRef(extensions));
+  MoqtObjectListener* listener = ReceiveSubscribeSynchronousOk(
+      track, request, control_stream.get(), /*track_alias=*/0, extensions);
+  MoqtUnsubscribe unsubscribe = {/*request_id=*/1};
+  EXPECT_CALL(*track, RemoveObjectListener(listener));
+  control_stream->ReceiveMessage(unsubscribe);
 }
 
 TEST_F(MoqtSessionTest, ReceiveDatagram) {
@@ -2312,233 +1502,39 @@
   session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram)));
 }
 
-TEST_F(MoqtSessionTest, QueuedStreamsOpenedInOrder) {
-  FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(0, 0));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track, 0, 14, 0, 0);
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(false))
-      .WillOnce(Return(false))
-      .WillOnce(Return(false));
-  subscription->OnNewObjectAvailable(Location(1, 0), 0,
-                                     kDefaultPublisherPriority);
-  subscription->OnNewObjectAvailable(Location(0, 0), 0,
-                                     kDefaultPublisherPriority);
-  subscription->OnNewObjectAvailable(Location(2, 0), 0,
-                                     kDefaultPublisherPriority);
-  // These should be opened in the sequence (0, 0), (1, 0), (2, 0).
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillRepeatedly(Return(true));
-  webtransport::test::MockStream mock_stream0, mock_stream1, mock_stream2;
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream0))
-      .WillOnce(Return(&mock_stream1))
-      .WillOnce(Return(&mock_stream2));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor[3];
-  EXPECT_CALL(mock_stream0, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor[0] = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream1, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor[1] = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream2, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor[2] = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream0, GetStreamId()).WillRepeatedly(Return(0));
-  EXPECT_CALL(mock_stream1, GetStreamId()).WillRepeatedly(Return(1));
-  EXPECT_CALL(mock_stream2, GetStreamId()).WillRepeatedly(Return(2));
-  EXPECT_CALL(mock_stream0, visitor()).WillOnce([&]() {
-    return stream_visitor[0].get();
-  });
-  EXPECT_CALL(mock_stream1, visitor()).WillOnce([&]() {
-    return stream_visitor[1].get();
-  });
-  EXPECT_CALL(mock_stream2, visitor()).WillOnce([&]() {
-    return stream_visitor[2].get();
-  });
-  EXPECT_CALL(*track, GetCachedObject(0, Optional(0), 0, 0))
-      .WillOnce(Return(PublishedObject{
-          PublishedObjectMetadata{Location(0, 0), 0, "",
-                                  MoqtObjectStatus::kNormal, 128, 6,
-                                  MoqtSessionPeer::Now(&session_)},
-          PayloadFromString("foobar")}));
-  EXPECT_CALL(*track, GetCachedObject(0, Optional(0), 1, 0))
-      .WillOnce(Return(std::nullopt));
-  EXPECT_CALL(*track, GetCachedObject(1, Optional(0), 0, 0))
-      .WillOnce(Return(PublishedObject{
-          PublishedObjectMetadata{Location(1, 0), 0, "",
-                                  MoqtObjectStatus::kNormal, 127, 8,
-                                  MoqtSessionPeer::Now(&session_)},
-          PayloadFromString("deadbeef")}));
-  EXPECT_CALL(*track, GetCachedObject(1, Optional(0), 1, 0))
-      .WillOnce(Return(std::nullopt));
-  EXPECT_CALL(*track, GetCachedObject(2, Optional(0), 0, 0))
-      .WillOnce(Return(PublishedObject{
-          PublishedObjectMetadata{Location(2, 0), 0, "",
-                                  MoqtObjectStatus::kNormal, 127, 8,
-                                  MoqtSessionPeer::Now(&session_)},
-          PayloadFromString("deadbeef")}));
-  EXPECT_CALL(*track, GetCachedObject(2, Optional(0), 1, 0))
-      .WillOnce(Return(std::nullopt));
-  EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true));
-  EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true));
-  EXPECT_CALL(mock_stream2, CanWrite()).WillRepeatedly(Return(true));
-  EXPECT_CALL(mock_stream0, Writev(_, _))
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        // The Group ID is the 3rd byte of the stream.
-        EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 0);
-        return absl::OkStatus();
-      });
-  EXPECT_CALL(mock_stream1, Writev(_, _))
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        // The Group ID is the 3rd byte of the stream.
-        EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 1);
-        return absl::OkStatus();
-      });
-  EXPECT_CALL(mock_stream2, Writev(_, _))
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        // The Group ID is the 3rd byte of the stream.
-        EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 2);
-        return absl::OkStatus();
-      });
-  session_.OnCanCreateNewOutgoingUnidirectionalStream();
-}
-
-TEST_F(MoqtSessionTest, StreamQueuedForSubscriptionThatDoesntExist) {
-  FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(0, 0));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track, 0, 14, 0, 0);
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(false));
-  subscription->OnNewObjectAvailable(Location(0, 0), 0,
-                                     kDefaultPublisherPriority);
-
-  // Delete the subscription, then grant stream credit.
-  MoqtSessionPeer::DeleteSubscription(&session_, 0);
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillRepeatedly(Return(true));
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream()).Times(0);
-  session_.OnCanCreateNewOutgoingUnidirectionalStream();
-}
-
-TEST_F(MoqtSessionTest, QueuedStreamPriorityChanged) {
-  FullTrackName ftn1("foo", "bar");
-  auto track1 =
-      SetupPublisher(ftn1, MoqtForwardingPreference::kSubgroup, Location(0, 0));
-  FullTrackName ftn2("dead", "beef");
-  auto track2 =
-      SetupPublisher(ftn2, MoqtForwardingPreference::kSubgroup, Location(0, 0));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription0 =
-      MoqtSessionPeer::AddSubscription(&session_, track1, 0, 14, 0, 0);
-  MoqtObjectListener* subscription1 =
-      MoqtSessionPeer::AddSubscription(&session_, track2, 1, 15, 0, 0);
-  MoqtSessionPeer::UpdateSubscriberPriority(&session_, 0, 1);
-  MoqtSessionPeer::UpdateSubscriberPriority(&session_, 1, 2);
-
-  // Two published objects will queue four streams.
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(false))
-      .WillOnce(Return(false))
-      .WillOnce(Return(false))
-      .WillOnce(Return(false));
-  subscription0->OnNewObjectAvailable(Location(0, 0), 0,
-                                      kDefaultPublisherPriority);
-  subscription1->OnNewObjectAvailable(Location(0, 0), 0,
-                                      kDefaultPublisherPriority);
-  subscription0->OnNewObjectAvailable(Location(1, 0), 0,
-                                      kDefaultPublisherPriority);
-  subscription1->OnNewObjectAvailable(Location(1, 0), 0,
-                                      kDefaultPublisherPriority);
-
-  // Allow one stream to be opened. It will be group 0, subscription 0.
+TEST_F(MoqtSessionTest, UpdateTrackPriority) {
+  session_.UpdateTrackPriority(0, std::nullopt, MoqtTrackPriority{0x40, 0x82});
+  EXPECT_EQ(MoqtSessionPeer::NextQueuedRequestIdToServer(&session_), 0);
+  // Same track, higher priority.
+  session_.UpdateTrackPriority(0, MoqtTrackPriority{0x40, 0x82},
+                               MoqtTrackPriority{0x40, 0x80});
+  EXPECT_EQ(MoqtSessionPeer::NextQueuedRequestIdToServer(&session_), 0);
+  // New track, higher priority.
+  session_.UpdateTrackPriority(2, std::nullopt, MoqtTrackPriority{0x20, 0x82});
+  EXPECT_EQ(MoqtSessionPeer::NextQueuedRequestIdToServer(&session_), 2);
+  // Pop request ID 2 from the queue.  The subscription doesn't really exist, so
+  // nothing else happens.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(true))
-      .WillOnce(Return(true))
       .WillOnce(Return(false));
-  webtransport::test::MockStream mock_stream0;
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream0));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor0;
-  EXPECT_CALL(mock_stream0, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor0 = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream0, GetStreamId()).WillRepeatedly(Return(0));
-  EXPECT_CALL(mock_stream0, visitor()).WillOnce([&]() {
-    return stream_visitor0.get();
-  });
-  EXPECT_CALL(*track1, GetCachedObject(0, Optional(0), 0, 0))
-      .WillOnce(Return(PublishedObject{
-          PublishedObjectMetadata{Location(0, 0), 0, "",
-                                  MoqtObjectStatus::kNormal, 127, 6},
-          PayloadFromString("foobar")}));
-  EXPECT_CALL(*track1, GetCachedObject(0, Optional(0), 1, 0))
-      .WillOnce(Return(std::nullopt));
-  EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true));
-  EXPECT_CALL(mock_stream0, Writev)
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        // Check track alias is 14.
-        EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[1]), 14);
-        // Check Group ID is 0
-        EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 0);
-        return absl::OkStatus();
-      });
   session_.OnCanCreateNewOutgoingUnidirectionalStream();
-
-  // Raise the priority of subscription 1 and allow another stream. It will be
-  // group 0, subscription 1.
-  MoqtSessionPeer::UpdateSubscriberPriority(&session_, 1, 0);
+  EXPECT_EQ(MoqtSessionPeer::NextQueuedRequestIdToServer(&session_), 0);
+  // There's another stream for request ID 2.
+  session_.UpdateTrackPriority(2, std::nullopt, MoqtTrackPriority{0x20, 0x81});
+  EXPECT_EQ(MoqtSessionPeer::NextQueuedRequestIdToServer(&session_), 2);
+  // The subscriber demotes track 2. Track 0 is first now due to higher
+  // publisher priority.
+  session_.UpdateTrackPriority(2, MoqtTrackPriority{0x20, 0x81},
+                               MoqtTrackPriority{0x40, 0x81});
+  EXPECT_EQ(MoqtSessionPeer::NextQueuedRequestIdToServer(&session_), 0);
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(true))
-      .WillOnce(Return(true))
-      .WillRepeatedly(Return(false));
-  webtransport::test::MockStream mock_stream1;
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream1));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor1;
-  EXPECT_CALL(mock_stream1, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor1 = std::move(visitor);
-      });
-  EXPECT_CALL(mock_stream1, GetStreamId()).WillRepeatedly(Return(1));
-  EXPECT_CALL(mock_stream1, visitor()).WillOnce([&]() {
-    return stream_visitor1.get();
-  });
-  EXPECT_CALL(*track2, GetCachedObject(0, Optional(0), 0, 0))
-      .WillOnce(Return(PublishedObject{
-          PublishedObjectMetadata{Location(0, 0), 0, "",
-                                  MoqtObjectStatus::kNormal, 127, 8},
-          PayloadFromString("deadbeef")}));
-  EXPECT_CALL(*track2, GetCachedObject(0, Optional(0), 1, 0))
-      .WillOnce(Return(std::nullopt));
-  EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true));
-  EXPECT_CALL(mock_stream1, Writev(_, _))
-      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
-                    const webtransport::StreamWriteOptions& options) {
-        // Check track alias is 15.
-        EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[1]), 15);
-        // Check Group ID is 0
-        EXPECT_EQ(static_cast<const uint8_t>(data[0].AsStringView()[2]), 0);
-        return absl::OkStatus();
-      });
+      .WillOnce(Return(false));
   session_.OnCanCreateNewOutgoingUnidirectionalStream();
+  // The subscription will update with the first stream. It's lower priority
+  // than request ID 2.
+  session_.UpdateTrackPriority(0, std::nullopt, MoqtTrackPriority{0x40, 0x82});
+  EXPECT_EQ(MoqtSessionPeer::NextQueuedRequestIdToServer(&session_), 2);
 }
 
 // Helper functions to handle the many EXPECT_CALLs for FETCH processing and
@@ -2887,14 +1883,8 @@
   SetLargestId(track, Location(4, 10));
   ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get());
 
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::GetSubscription(&session_, subscribe.request_id);
-  ASSERT_NE(subscription, nullptr);
-  EXPECT_TRUE(
-      MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 11)));
-  EXPECT_FALSE(
-      MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 10)));
-
+  ASSERT_TRUE(MoqtSessionPeer::RequestIdIsSubscriptionPublisher(
+      &session_, subscribe.request_id));
   MoqtFetch fetch = DefaultFetch();
   fetch.request_id = 3;
   fetch.fetch = JoiningFetchRelative(1, 2);
@@ -2914,14 +1904,8 @@
   SetLargestId(track, Location(4, 10));
   ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get());
 
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::GetSubscription(&session_, subscribe.request_id);
-  ASSERT_NE(subscription, nullptr);
-  EXPECT_TRUE(
-      MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 11)));
-  EXPECT_FALSE(
-      MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 10)));
-
+  ASSERT_TRUE(MoqtSessionPeer::RequestIdIsSubscriptionPublisher(
+      &session_, subscribe.request_id));
   MoqtFetch fetch = DefaultFetch();
   fetch.request_id = 3;
   fetch.fetch = JoiningFetchAbsolute(1, 2);
@@ -3438,265 +2422,10 @@
       MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   MockTrackPublisher* track = CreateTrackPublisher();
   ReceiveSubscribeSynchronousOk(track, request, control_stream.get());
-
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::GetSubscription(&session_, 1);
-  ASSERT_NE(subscription, nullptr);
-  EXPECT_EQ(MoqtSessionPeer::GetDeliveryTimeout(subscription),
-            quic::QuicTimeDelta::FromSeconds(1));
-}
-
-TEST_F(MoqtSessionTest, DeliveryTimeoutExpiredOnArrival) {
-  auto track_publisher =
-      std::make_shared<MockTrackPublisher>(FullTrackName("foo", "bar"));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track_publisher, 1, 2, 0, 0);
-  ASSERT_NE(subscription, nullptr);
-  MoqtSessionPeer::SetDeliveryTimeout(subscription,
-                                      quic::QuicTimeDelta::FromSeconds(1));
-
-  webtransport::test::MockStream data_mock;
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true));
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&data_mock));
-  EXPECT_CALL(data_mock, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(data_mock, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor = std::move(visitor);
-      });
-  EXPECT_CALL(data_mock, CanWrite()).WillRepeatedly(Return(true));
-  EXPECT_CALL(data_mock, visitor()).WillRepeatedly([&]() {
-    return stream_visitor.get();
-  });
-  EXPECT_CALL(*track_publisher, GetCachedObject)
-      .WillOnce(Return(
-          PublishedObject{PublishedObjectMetadata{
-                              Location(0, 0),
-                              0,
-                              "",
-                              MoqtObjectStatus::kObjectDoesNotExist,
-                              0,
-                              0,
-                              MoqtSessionPeer::Now(&session_) -
-                                  quic::QuicTimeDelta::FromSeconds(1),
-                          },
-                          std::vector<quiche::QuicheMemSlice>(), false}));
-  EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeDeliveryTimeout))
-      .WillOnce([&](webtransport::StreamErrorCode /*error*/) {
-        stream_visitor.reset();
-      });
-  // Arrival time is very old; reset immediately.
-  ON_CALL(*track_publisher, largest_location)
-      .WillByDefault(Return(Location(0, 0)));
-  subscription->OnNewObjectAvailable(Location(0, 0), 0,
-                                     kDefaultPublisherPriority);
-  // Subsequent objects for that subgroup are ignored.
-  EXPECT_CALL(*track_publisher, GetCachedObject).Times(0);
-  EXPECT_CALL(mock_session_, GetStreamById(_)).Times(0);
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .Times(0);
-  ON_CALL(*track_publisher, largest_location)
-      .WillByDefault(Return(Location(0, 1)));
-  subscription->OnNewObjectAvailable(Location(0, 1), 0,
-                                     kDefaultPublisherPriority);
-  // Check that reset_subgroups_ is pruned.
-  EXPECT_TRUE(MoqtSessionPeer::SubgroupHasBeenReset(subscription,
-                                                    DataStreamIndex(0, 0)));
-  subscription->OnGroupAbandoned(0);
-  EXPECT_FALSE(MoqtSessionPeer::SubgroupHasBeenReset(subscription,
-                                                     DataStreamIndex(0, 0)));
-}
-
-TEST_F(MoqtSessionTest, DeliveryTimeoutAfterIntegratedFin) {
-  auto track_publisher =
-      std::make_shared<MockTrackPublisher>(FullTrackName("foo", "bar"));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track_publisher, 1, 2, 0, 0);
-  ASSERT_NE(subscription, nullptr);
-  MoqtSessionPeer::SetDeliveryTimeout(subscription,
-                                      quic::QuicTimeDelta::FromSeconds(1));
-
-  webtransport::test::MockStream data_mock;
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true));
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&data_mock));
-  EXPECT_CALL(data_mock, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(data_mock, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor = std::move(visitor);
-      });
-  EXPECT_CALL(data_mock, CanWrite()).WillRepeatedly(Return(true));
-  EXPECT_CALL(data_mock, visitor()).WillRepeatedly([&]() {
-    return stream_visitor.get();
-  });
-  EXPECT_CALL(*track_publisher, GetCachedObject)
-      .WillOnce(Return(PublishedObject{
-          PublishedObjectMetadata{Location(0, 0), 0, "",
-                                  MoqtObjectStatus::kObjectDoesNotExist, 0, 0,
-                                  MoqtSessionPeer::Now(&session_)},
-          PayloadFromString(""), true}))
-      .WillOnce(Return(std::nullopt));
-  EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
-  EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeDeliveryTimeout)).Times(0);
-  ON_CALL(*track_publisher, largest_location)
-      .WillByDefault(Return(Location(0, 0)));
-  subscription->OnNewObjectAvailable(Location(0, 0), 0,
-                                     kDefaultPublisherPriority);
-  auto* delivery_alarm =
-      absl::down_cast<quic::test::MockAlarmFactory::TestAlarm*>(
-          MoqtSessionPeer::GetAlarm(stream_visitor.get()));
-  EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeDeliveryTimeout))
-      .WillOnce([&](webtransport::StreamErrorCode /*error*/) {
-        stream_visitor.reset();
-      });
-  delivery_alarm->Fire();
-}
-
-TEST_F(MoqtSessionTest, DeliveryTimeoutAfterSeparateFin) {
-  auto track_publisher =
-      std::make_shared<MockTrackPublisher>(FullTrackName("foo", "bar"));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track_publisher, 1, 2, 0, 0);
-  ASSERT_NE(subscription, nullptr);
-  MoqtSessionPeer::SetDeliveryTimeout(subscription,
-                                      quic::QuicTimeDelta::FromSeconds(1));
-
-  webtransport::test::MockStream data_mock;
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true));
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&data_mock));
-  EXPECT_CALL(data_mock, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId));
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&data_mock));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(data_mock, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor = std::move(visitor);
-      });
-  EXPECT_CALL(data_mock, CanWrite()).WillRepeatedly(Return(true));
-  EXPECT_CALL(data_mock, visitor()).WillRepeatedly([&]() {
-    return stream_visitor.get();
-  });
-  EXPECT_CALL(*track_publisher, GetCachedObject)
-      .WillOnce(Return(PublishedObject{
-          PublishedObjectMetadata{Location(0, 0), 0, "",
-                                  MoqtObjectStatus::kObjectDoesNotExist, 0, 0,
-                                  MoqtSessionPeer::Now(&session_)},
-          PayloadFromString(""), false}))
-      .WillOnce(Return(std::nullopt));
-  EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
-  ON_CALL(*track_publisher, largest_location())
-      .WillByDefault(Return(Location(0, 0)));
-  subscription->OnNewObjectAvailable(Location(0, 0), 0,
-                                     kDefaultPublisherPriority);
-
-  EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
-  subscription->OnNewFinAvailable(Location(0, 0), 0);
-  auto* delivery_alarm =
-      absl::down_cast<quic::test::MockAlarmFactory::TestAlarm*>(
-          MoqtSessionPeer::GetAlarm(stream_visitor.get()));
-  EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeDeliveryTimeout))
-      .WillOnce([&](webtransport::StreamErrorCode /*error*/) {
-        stream_visitor.reset();
-      });
-  delivery_alarm->Fire();
-}
-
-TEST_F(MoqtSessionTest, DeliveryTimeoutAlternateDesign) {
-  session_.UseAlternateDeliveryTimeout();
-  auto track_publisher =
-      std::make_shared<MockTrackPublisher>(FullTrackName("foo", "bar"));
-  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* subscription =
-      MoqtSessionPeer::AddSubscription(&session_, track_publisher, 1, 2, 0, 0);
-  ASSERT_NE(subscription, nullptr);
-  MoqtSessionPeer::SetDeliveryTimeout(subscription,
-                                      quic::QuicTimeDelta::FromSeconds(1));
-
-  webtransport::test::MockStream data_mock1;
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillRepeatedly(Return(true));
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&data_mock1));
-  EXPECT_CALL(data_mock1, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId));
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&data_mock1));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor1;
-  EXPECT_CALL(data_mock1, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor1 = std::move(visitor);
-      });
-  EXPECT_CALL(data_mock1, CanWrite()).WillRepeatedly(Return(true));
-  EXPECT_CALL(data_mock1, visitor()).WillRepeatedly([&]() {
-    return stream_visitor1.get();
-  });
-  EXPECT_CALL(*track_publisher, GetCachedObject)
-      .WillOnce(Return(PublishedObject{
-          PublishedObjectMetadata{Location(0, 0), 0, "",
-                                  MoqtObjectStatus::kObjectDoesNotExist, 0, 0,
-                                  MoqtSessionPeer::Now(&session_)},
-          PayloadFromString(""), false}))
-      .WillOnce(Return(std::nullopt));
-  EXPECT_CALL(data_mock1, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
-  ON_CALL(*track_publisher, largest_location)
-      .WillByDefault(Return(Location(0, 0)));
-  subscription->OnNewObjectAvailable(Location(0, 0), 0,
-                                     kDefaultPublisherPriority);
-
-  webtransport::test::MockStream data_mock2;
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&data_mock2));
-  EXPECT_CALL(data_mock2, GetStreamId())
-      .WillRepeatedly(Return(kOutgoingUniStreamId + 4));
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId + 4))
-      .WillRepeatedly(Return(&data_mock2));
-  std::unique_ptr<webtransport::StreamVisitor> stream_visitor2;
-  EXPECT_CALL(data_mock2, SetVisitor(_))
-      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
-        stream_visitor2 = std::move(visitor);
-      });
-  EXPECT_CALL(data_mock2, CanWrite()).WillRepeatedly(Return(true));
-  EXPECT_CALL(data_mock2, visitor()).WillRepeatedly([&]() {
-    return stream_visitor2.get();
-  });
-  EXPECT_CALL(*track_publisher, GetCachedObject)
-      .WillOnce(Return(PublishedObject{
-          PublishedObjectMetadata{Location(1, 0), 0, "",
-                                  MoqtObjectStatus::kObjectDoesNotExist, 0, 0,
-                                  MoqtSessionPeer::Now(&session_)},
-          PayloadFromString(""), false}))
-      .WillOnce(Return(std::nullopt));
-  EXPECT_CALL(data_mock2, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
-  ON_CALL(*track_publisher, largest_location)
-      .WillByDefault(Return(Location(1, 0)));
-  subscription->OnNewObjectAvailable(Location(1, 0), 0,
-                                     kDefaultPublisherPriority);
-
-  // Group 1 should start the timer on the Group 0 stream.
-  auto* delivery_alarm =
-      absl::down_cast<quic::test::MockAlarmFactory::TestAlarm*>(
-          MoqtSessionPeer::GetAlarm(stream_visitor1.get()));
-  EXPECT_CALL(data_mock1, ResetWithUserCode(kResetCodeDeliveryTimeout))
-      .WillOnce([&](webtransport::StreamErrorCode /*error*/) {
-        stream_visitor1.reset();
-      });
-  delivery_alarm->Fire();
+  std::optional<quic::QuicTimeDelta> delivery_timeout =
+      MoqtSessionPeer::GetDeliveryTimeout(&session_, request.request_id);
+  EXPECT_TRUE(delivery_timeout.has_value() &&
+              *delivery_timeout == quic::QuicTimeDelta::FromSeconds(1));
 }
 
 TEST_F(MoqtSessionTest, ReceiveGoAwayEnforcement) {
@@ -4402,45 +3131,23 @@
   control_stream->ReceiveMessage(MoqtNamespaceDone());
 }
 
-TEST_F(MoqtSessionTest, IncomingRequestUpdateTruncatesSubscription) {
-  FullTrackName ftn("foo", "bar");
-  std::shared_ptr<MoqtTrackPublisher> publisher =
-      SetupPublisher(ftn, MoqtForwardingPreference::kDatagram, Location(8, 0));
-  MockTrackPublisher* mock_publisher =
-      absl::down_cast<MockTrackPublisher*>(publisher.get());
+TEST_F(MoqtSessionTest, IncomingRequestUpdateTriggersRequestOk) {
+  MoqtSubscribe subscribe = DefaultSubscribe();
+  MockTrackPublisher* track = CreateTrackPublisher();
   std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
       MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
-  MoqtObjectListener* listener =
-      MoqtSessionPeer::AddSubscription(&session_, publisher, /*request_id=*/1,
-                                       /*track_alias=*/2, /*start_group=*/4,
-                                       /*start_object=*/0);
-
-  // Send a datagram in window.
-  EXPECT_CALL(*mock_publisher,
-              GetCachedObject(8, std::optional<uint64_t>(), 0, 0))
-      .WillOnce([&] {
-        return PublishedObject{
-            PublishedObjectMetadata{Location(8, 0), std::nullopt, "extensions",
-                                    MoqtObjectStatus::kNormal, 128, 8,
-                                    MoqtSessionPeer::Now(&session_)},
-            PayloadFromString("deadbeef"), false};
-      });
-  EXPECT_CALL(mock_session_, SendOrQueueDatagram)
-      .WillOnce(Return(webtransport::DatagramStatus(
-          webtransport::DatagramStatusCode::kSuccess, "")));
-
-  listener->OnNewObjectAvailable(Location(8, 0), std::nullopt, 0x80);
-
-  // Update the filter to exclude the live edge. The next object is out of
-  // window.
-  MessageParameters parameters;
-  parameters.subscription_filter.emplace(Location(4, 0), 7);
+  ReceiveSubscribeSynchronousOk(track, subscribe, control_stream.get(), 0);
   EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kRequestOk), _));
-  control_stream->ReceiveMessage(MoqtRequestUpdate{3, 1, parameters});
-  EXPECT_CALL(*mock_publisher, GetCachedObject).Times(0);
-  EXPECT_CALL(mock_session_, SendOrQueueDatagram).Times(0);
-  listener->OnNewObjectAvailable(Location(8, 1), 0, 0x80);
+  control_stream->ReceiveMessage(MoqtRequestUpdate{3, 1, MessageParameters()});
+}
+
+TEST_F(MoqtSessionTest, IncomingRequestUpdateTriggersRequestError) {
+  std::unique_ptr<MoqtBidiStreamTestWrapper> control_stream =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  EXPECT_CALL(mock_stream_,
+              Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _));
+  control_stream->ReceiveMessage(MoqtRequestUpdate{3, 1, MessageParameters()});
 }
 
 TEST_F(MoqtSessionTest, StopSendingBlocksSubgroup) {
diff --git a/quiche/quic/moqt/moqt_subscription.cc b/quiche/quic/moqt/moqt_subscription.cc
index 29b7e7f..c62d3e0 100644
--- a/quiche/quic/moqt/moqt_subscription.cc
+++ b/quiche/quic/moqt/moqt_subscription.cc
@@ -75,6 +75,8 @@
 void SubscriptionPublisher::Update(const MessageParameters& parameters) {
   // TODO(martinduke): If there are auth tokens, this probably has to go to the
   // application.
+  // TODO(martinduke): If the subscribe window has shrunk, close any streams
+  // that are now outside the window. Also send PUBLISH_DONE if now done.
   MoqtPriority old_priority =
       parameters_.subscriber_priority.value_or(kDefaultSubscriberPriority);
   parameters_.Update(parameters);
@@ -402,27 +404,30 @@
 }
 
 void SubscriptionPublisher::OnCanCreateNewUniStream() {
-  auto it = pending_streams_.rbegin();
-  while (it != pending_streams_.rend() &&
-         (it->second.index.group < first_active_group_ ||
-          reset_subgroups_.contains(it->second.index))) {
+  while (visitor_->session() != nullptr &&
+         visitor_->session()->CanOpenNextOutgoingUnidirectionalStream()) {
+    auto it = pending_streams_.rbegin();
+    while (it != pending_streams_.rend() &&
+           (it->second.index.group < first_active_group_ ||
+            reset_subgroups_.contains(it->second.index))) {
+      pending_streams_.erase(--(it.base()));
+      it = pending_streams_.rbegin();
+    }
+    if (it == pending_streams_.rend()) {
+      return;
+    }
+    if (OpenDataStream(it->second) == nullptr) {
+      return;
+    }
     pending_streams_.erase(--(it.base()));
-    it = pending_streams_.rbegin();
-  }
-  if (it == pending_streams_.rend()) {
-    return;
-  }
-  if (OpenDataStream(it->second) == nullptr) {
-    return;
-  }
-  pending_streams_.erase(--(it.base()));
-  if (!pending_streams_.empty()) {
-    visitor_->UpdateTrackPriority(
-        request_id_, std::nullopt,
-        MoqtTrackPriority{
-            subscriber_priority(),
-            pending_streams_.rbegin()->second.publisher_priority.value_or(
-                default_publisher_priority())});
+    if (!pending_streams_.empty()) {
+      visitor_->UpdateTrackPriority(
+          request_id_, std::nullopt,
+          MoqtTrackPriority{
+              subscriber_priority(),
+              pending_streams_.rbegin()->second.publisher_priority.value_or(
+                  default_publisher_priority())});
+    }
   }
 }
 
diff --git a/quiche/quic/moqt/moqt_subscription.h b/quiche/quic/moqt/moqt_subscription.h
index 37dee22..38a314a 100644
--- a/quiche/quic/moqt/moqt_subscription.h
+++ b/quiche/quic/moqt/moqt_subscription.h
@@ -35,7 +35,7 @@
 
 namespace test {
 class SubscriptionPublisherPeer;
-}
+}  // namespace test
 
 // This is the part of the send order useful for ranking streams within the
 // subscription. It sets the subscriber_priority to kDefaultSubscriberPriority
diff --git a/quiche/quic/moqt/moqt_subscription_test.cc b/quiche/quic/moqt/moqt_subscription_test.cc
index 520e709..89045f3 100644
--- a/quiche/quic/moqt/moqt_subscription_test.cc
+++ b/quiche/quic/moqt/moqt_subscription_test.cc
@@ -11,7 +11,9 @@
 #include <string>
 #include <utility>
 
+#include "absl/base/casts.h"
 #include "absl/base/nullability.h"
+#include "absl/container/flat_hash_set.h"
 #include "absl/status/status.h"
 #include "absl/strings/match.h"
 #include "absl/strings/string_view.h"
@@ -31,6 +33,7 @@
 #include "quiche/quic/moqt/moqt_session_interface.h"
 #include "quiche/quic/moqt/moqt_trace_recorder.h"
 #include "quiche/quic/moqt/moqt_types.h"
+#include "quiche/quic/moqt/moqt_uni_stream.h"
 #include "quiche/quic/moqt/test_tools/moqt_framer_utils.h"
 #include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h"
 #include "quiche/quic/moqt/test_tools/moqt_session_peer.h"
@@ -43,6 +46,21 @@
 
 namespace moqt::test {
 
+class SubscriptionPublisherPeer {
+ public:
+  static size_t num_open_streams(SubscriptionPublisher* publisher) {
+    return publisher->stream_map_.GetAllStreams().size();
+  }
+  static std::optional<Location> largest_sent(
+      const SubscriptionPublisher* publisher) {
+    return publisher->largest_sent_;
+  }
+  static const absl::flat_hash_set<DataStreamIndex>& reset_subgroups(
+      const SubscriptionPublisher* publisher) {
+    return publisher->reset_subgroups_;
+  }
+};
+
 namespace {
 
 using ::testing::_;
@@ -111,7 +129,7 @@
     parameters_.group_order = MoqtDeliveryOrder::kAscending;
     EXPECT_CALL(monitoring_interface_, OnObjectAckSupportKnown)
         .Times(AtLeast(0));
-    ON_CALL(visitor_, session).WillByDefault(Return(&webtrans_));
+    EXPECT_CALL(visitor_, session).WillRepeatedly(Return(&webtrans_));
     publisher_ = std::make_unique<SubscriptionPublisher>(
         framer_, track_publisher_, &bidi_stream_, kRequestId, kTrackAlias,
         parameters_, &visitor_, &monitoring_interface_, &mock_clock_,
@@ -119,6 +137,7 @@
     ON_CALL(visitor_, alternate_delivery_timeout).WillByDefault(Return(false));
     ON_CALL(webtrans_, GetStreamById(kStreamId))
         .WillByDefault(Return(&mock_uni_stream_));
+    ON_CALL(visitor_, alarm_factory).WillByDefault(Return(&alarm_factory_));
   }
 
   ~SubscriptionPublisherTest() override {
@@ -164,7 +183,7 @@
           uni_stream_ = std::move(visitor);
         });
     EXPECT_CALL(mock_uni_stream_, SetPriority);
-    EXPECT_CALL(mock_uni_stream_, visitor()).WillRepeatedly([&]() {
+    ON_CALL(mock_uni_stream_, visitor()).WillByDefault([&]() {
       return uni_stream_.get();
     });
     EXPECT_CALL(mock_uni_stream_, CanWrite()).WillRepeatedly(Return(true));
@@ -192,7 +211,7 @@
     EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream())
         .WillOnce(Return(false));
     EXPECT_CALL(visitor_,
-                UpdateTrackPriority(1, std::optional<MoqtTrackPriority>(),
+                UpdateTrackPriority(1, _,
                                     MoqtTrackPriority{subscriber_priority(),
                                                       publisher_priority}));
     publisher_->OnNewObjectAvailable(location, subgroup, publisher_priority);
@@ -206,7 +225,7 @@
   MoqtControlMessageParser message_parser_{kDefaultMoqtVersion, true};
   webtransport::test::MockSession webtrans_;
   StrictMock<webtransport::test::MockStream> mock_bidi_stream_;
-  StrictMock<webtransport::test::MockStream> mock_uni_stream_;
+  webtransport::test::MockStream mock_uni_stream_;
   std::shared_ptr<MockTrackPublisher> track_publisher_;
   TestMoqtBidiStream bidi_stream_;
   std::unique_ptr<webtransport::StreamVisitor> uni_stream_;
@@ -218,7 +237,7 @@
   const TrackExtensions extensions_;
   quic::MockClock mock_clock_;
   MoqtSessionCallbacks callbacks_;
-  quic::test::MockAlarmFactory alarm_factory_;
+  quic::test::TestAlarmFactory alarm_factory_;
   int open_streams_ = 0;
 };
 
@@ -395,7 +414,7 @@
   publisher_->OnNewFinAvailable(Location(1, 0), 0);
 }
 
-TEST_F(SubscriptionPublisherTest, OnSubgroupAbandoned) {
+TEST_F(SubscriptionPublisherTest, OnSubgroupAbandonedNoEffect) {
   // Not in window
   MessageParameters params;
   params.subscription_filter = SubscriptionFilter(Location(10, 0), 10);
@@ -426,14 +445,16 @@
 }
 
 TEST_F(SubscriptionPublisherTest, OnGroupAbandonedWithStreams) {
-  // Create a stream
+  // The delivery timeout is not infinite, so it will not send a PUBLISH_DONE
+  // with kTooFarBehind.
   CreateStream(Location(1, 0), 0, 128);
   EXPECT_CALL(mock_uni_stream_, ResetWithUserCode);
+  EXPECT_CALL(mock_bidi_stream_, Writev).Times(0);  // No PUBLISH_DONE.
   publisher_->OnGroupAbandoned(1);
 }
 
 TEST_F(SubscriptionPublisherTest, OnGroupAbandonedTooFarBehind) {
-  // Create a pending stream
+  // Set the delivery timeout to infinite so that TooFarBehind is possible.
   parameters_.delivery_timeout = quic::QuicTimeDelta::Infinite();
   publisher_->Update(parameters_);
   CreateStream(Location(5, 0), 0, 128);
@@ -455,7 +476,8 @@
   // Abandon the group.
   publisher_->OnGroupAbandoned(1);
   // OnCanCreateNewUniStream should clean it up; no attempt to create a stream.
-  EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream()).Times(0);
+  EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream)
+      .WillOnce(Return(true));
   EXPECT_CALL(webtrans_, OpenOutgoingUnidirectionalStream).Times(0);
   publisher_->OnCanCreateNewUniStream();
 }
@@ -501,7 +523,7 @@
   EXPECT_CALL(mock_uni_stream_, GetStreamId())
       .WillRepeatedly(Return(kStreamId));
   EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true));
+      .WillRepeatedly(Return(true));
   EXPECT_CALL(webtrans_, OpenOutgoingUnidirectionalStream)
       .WillOnce(Return(&mock_uni_stream_));
   EXPECT_CALL(mock_uni_stream_, SetVisitor)
@@ -523,6 +545,94 @@
   publisher_->OnCanCreateNewUniStream();
 }
 
+TEST_F(SubscriptionPublisherTest, PendingStreamsInOrder) {
+  CreatePendingStream(Location(1, 0), 0, 128);
+  CreatePendingStream(Location(0, 0), 0, 128);
+  CreatePendingStream(Location(2, 0), 0, 127);
+  // Should be opened in the order (2, 0), (0, 0), (1, 0),
+  // Open stream and send (2, 0).
+  EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillOnce(Return(true))
+      .WillOnce(Return(true))
+      .WillOnce(Return(false));
+  EXPECT_CALL(webtrans_, OpenOutgoingUnidirectionalStream)
+      .WillOnce(Return(&mock_uni_stream_));
+  EXPECT_CALL(mock_uni_stream_, GetStreamId).WillRepeatedly(Return(kStreamId));
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor2;
+  EXPECT_CALL(mock_uni_stream_, SetVisitor)
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        stream_visitor2 = std::move(visitor);
+      });
+  EXPECT_CALL(mock_uni_stream_, visitor()).WillRepeatedly([&]() {
+    return stream_visitor2.get();
+  });
+  EXPECT_CALL(mock_uni_stream_, SetPriority);
+  EXPECT_CALL(*track_publisher_,
+              GetCachedObject(2, std::optional<uint64_t>(0), 0, 0))
+      .WillOnce(Return(DefaultPublishedObject(Location(2, 0), 0, 127)));
+  EXPECT_CALL(*track_publisher_,
+              GetCachedObject(2, std::optional<uint64_t>(0), 1, 0))
+      .WillOnce(Return(std::nullopt));
+  EXPECT_CALL(mock_uni_stream_, CanWrite).WillRepeatedly(Return(true));
+  EXPECT_CALL(mock_uni_stream_, Writev).WillOnce(Return(absl::OkStatus()));
+  EXPECT_CALL(visitor_, UpdateTrackPriority);
+  publisher_->OnCanCreateNewUniStream();
+  // Open (0, 0)
+  EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillOnce(Return(true))
+      .WillOnce(Return(true))
+      .WillOnce(Return(false));
+  EXPECT_CALL(webtrans_, OpenOutgoingUnidirectionalStream)
+      .WillOnce(Return(&mock_uni_stream_));
+  EXPECT_CALL(mock_uni_stream_, GetStreamId)
+      .WillRepeatedly(Return(kStreamId + 4));
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor0;
+  EXPECT_CALL(mock_uni_stream_, SetVisitor)
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        stream_visitor0 = std::move(visitor);
+      });
+  EXPECT_CALL(mock_uni_stream_, visitor()).WillRepeatedly([&]() {
+    return stream_visitor0.get();
+  });
+  EXPECT_CALL(mock_uni_stream_, SetPriority);
+  EXPECT_CALL(*track_publisher_,
+              GetCachedObject(0, std::optional<uint64_t>(0), 0, 0))
+      .WillOnce(Return(DefaultPublishedObject(Location(0, 0), 0, 128)));
+  EXPECT_CALL(*track_publisher_,
+              GetCachedObject(0, std::optional<uint64_t>(0), 1, 0))
+      .WillOnce(Return(std::nullopt));
+  EXPECT_CALL(mock_uni_stream_, CanWrite).WillRepeatedly(Return(true));
+  EXPECT_CALL(mock_uni_stream_, Writev).WillOnce(Return(absl::OkStatus()));
+  EXPECT_CALL(visitor_, UpdateTrackPriority);
+  publisher_->OnCanCreateNewUniStream();
+  // Open (1, 0)
+  EXPECT_CALL(webtrans_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillRepeatedly(Return(true));  // Unlimited credit but only one stream.
+  EXPECT_CALL(webtrans_, OpenOutgoingUnidirectionalStream)
+      .WillOnce(Return(&mock_uni_stream_));
+  EXPECT_CALL(mock_uni_stream_, GetStreamId)
+      .WillRepeatedly(Return(kStreamId + 8));
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor1;
+  EXPECT_CALL(mock_uni_stream_, SetVisitor)
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        stream_visitor1 = std::move(visitor);
+      });
+  EXPECT_CALL(mock_uni_stream_, visitor()).WillRepeatedly([&]() {
+    return stream_visitor1.get();
+  });
+  EXPECT_CALL(mock_uni_stream_, SetPriority);
+  EXPECT_CALL(*track_publisher_,
+              GetCachedObject(1, std::optional<uint64_t>(0), 0, 0))
+      .WillOnce(Return(DefaultPublishedObject(Location(1, 0), 0, 128)));
+  EXPECT_CALL(*track_publisher_,
+              GetCachedObject(1, std::optional<uint64_t>(0), 1, 0))
+      .WillOnce(Return(std::nullopt));
+  EXPECT_CALL(mock_uni_stream_, CanWrite).WillRepeatedly(Return(true));
+  EXPECT_CALL(mock_uni_stream_, Writev).WillOnce(Return(absl::OkStatus()));
+  EXPECT_CALL(visitor_, UpdateTrackPriority).Times(0);
+  publisher_->OnCanCreateNewUniStream();
+}
+
 TEST_F(SubscriptionPublisherTest, OnDataStreamDestroyed) {
   CreateStream(Location(1, 0), 0, 128);
   DataStreamIndex index(1, 0);
@@ -541,6 +651,51 @@
           Location(1, 0));
 }
 
+TEST_F(SubscriptionPublisherTest, AlternateDeliveryTimeout) {
+  EXPECT_CALL(visitor_, alternate_delivery_timeout)
+      .WillRepeatedly(Return(true));
+  CreateStream(Location(0, 0), 0, 128);
+  // Save the visitor before it's overwritten.
+  std::unique_ptr<webtransport::StreamVisitor> uni_stream =
+      std::move(uni_stream_);
+  CreateStream(Location(0, 1), 1, 200);
+  std::unique_ptr<webtransport::StreamVisitor> uni_stream1 =
+      std::move(uni_stream_);
+  // Timers aren't running.
+  EXPECT_EQ(OutgoingSubgroupStreamPeer::GetAlarm(
+                absl::down_cast<OutgoingSubgroupStream*>(uni_stream.get())),
+            nullptr);
+  EXPECT_EQ(OutgoingSubgroupStreamPeer::GetAlarm(
+                absl::down_cast<OutgoingSubgroupStream*>(uni_stream1.get())),
+            nullptr);
+  // Second group starts the timer.
+  EXPECT_CALL(mock_uni_stream_, visitor)
+      .WillOnce(Return(uni_stream.get()))
+      .WillOnce(Return(uni_stream1.get()))
+      .WillRepeatedly([&]() { return uni_stream_.get(); });
+  CreateStream(Location(1, 0), 0, 128);
+  // Group 0 streams now have a timer running.
+  EXPECT_NE(OutgoingSubgroupStreamPeer::GetAlarm(
+                absl::down_cast<OutgoingSubgroupStream*>(uni_stream.get())),
+            nullptr);
+  EXPECT_NE(OutgoingSubgroupStreamPeer::GetAlarm(
+                absl::down_cast<OutgoingSubgroupStream*>(uni_stream1.get())),
+            nullptr);
+  // No timer on group 1.
+  EXPECT_EQ(OutgoingSubgroupStreamPeer::GetAlarm(
+                absl::down_cast<OutgoingSubgroupStream*>(uni_stream_.get())),
+            nullptr);
+}
+
+TEST_F(SubscriptionPublisherTest, IncomingUpdateTruncatesSubscription) {
+  // Track gets to Group 5.
+  CreateStream(Location(5, 0), 0, 128);
+  parameters_.subscription_filter = SubscriptionFilter(Location(0, 0), 4);
+  publisher_->Update(parameters_);
+  EXPECT_CALL(*track_publisher_, GetCachedObject).Times(0);
+  publisher_->OnNewObjectAvailable(Location(5, 1), 0, 128);
+}
+
 }  // namespace
 
 }  // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_uni_stream.h b/quiche/quic/moqt/moqt_uni_stream.h
index c2921c3..fd0d7d8 100644
--- a/quiche/quic/moqt/moqt_uni_stream.h
+++ b/quiche/quic/moqt/moqt_uni_stream.h
@@ -30,7 +30,7 @@
 namespace moqt {
 
 namespace test {
-class MoqtSessionPeer;
+class OutgoingSubgroupStreamPeer;
 }
 
 // A base class for locally initiated unidirectional streams, which can serve
@@ -140,7 +140,7 @@
 
  private:
   friend class DeliveryTimeoutDelegate;
-  friend class test::MoqtSessionPeer;
+  friend class test::OutgoingSubgroupStreamPeer;
 
   // Sends objects on the stream, starting with `next_object_`, until the
   // stream becomes write-blocked or closed. Can reset the stream, destroying
diff --git a/quiche/quic/moqt/moqt_uni_stream_test.cc b/quiche/quic/moqt/moqt_uni_stream_test.cc
index 7a9cc11..c021460 100644
--- a/quiche/quic/moqt/moqt_uni_stream_test.cc
+++ b/quiche/quic/moqt/moqt_uni_stream_test.cc
@@ -24,6 +24,7 @@
 #include "quiche/quic/moqt/moqt_trace_recorder.h"
 #include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h"
+#include "quiche/quic/moqt/test_tools/moqt_session_peer.h"
 #include "quiche/quic/platform/api/quic_test.h"
 #include "quiche/quic/test_tools/mock_clock.h"
 #include "quiche/quic/test_tools/quic_test_utils.h"
@@ -93,8 +94,10 @@
     EXPECT_CALL(visitor_, OnDataStreamDestroyed(index_));
   }
 
-  void CreateStream(uint64_t next_object = 0) {
+  void CreateStream(uint64_t next_object = 0) { CreateStream(0, next_object); }
+  void CreateStream(uint64_t subgroup, uint64_t next_object) {
     EXPECT_CALL(mock_stream_, SetPriority);
+    index_ = DataStreamIndex(0, subgroup);
     stream_ = std::make_unique<OutgoingSubgroupStream>(
         framer_, &mock_stream_, index_, next_object, visitor_.GetWeakPtr(),
         track_publisher_, webtransport::StreamPriority(), 0, &trace_recorder_);
@@ -236,6 +239,9 @@
   EXPECT_CALL(visitor_, OnObjectSent(Location(0, 0)));
   ExpectAlarm();
   stream_->OnCanWrite();
+  EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout));
+  EXPECT_CALL(visitor_, OnStreamTimeout(index_));
+  alarm_factory_.FireAlarm(OutgoingSubgroupStreamPeer::GetAlarm(stream_.get()));
 }
 
 TEST_F(OutgoingSubgroupStreamTest, Fin) {
@@ -249,8 +255,19 @@
   EXPECT_CALL(visitor_, clock()).WillOnce(Return(&mock_clock_));
   ExpectAlarm();
   stream_->Fin(Location(0, 0));
-  // last_object.object >= next_object: does nothing
-  stream_->Fin(Location(0, 1));
+  EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout));
+  EXPECT_CALL(visitor_, OnStreamTimeout(index_));
+  alarm_factory_.FireAlarm(OutgoingSubgroupStreamPeer::GetAlarm(stream_.get()));
+}
+
+TEST_F(OutgoingSubgroupStreamTest, FinForFutureObject) {
+  // Delivery is blocked.
+  EXPECT_CALL(mock_stream_, CanWrite).WillOnce(Return(false));
+  stream_->OnCanWrite();
+  // FIN does nothing because last object hasn't been sent. Rely on the cache
+  // to set object.fin_after_this.
+  EXPECT_CALL(mock_stream_, Writev).Times(0);
+  stream_->Fin(Location(0, 0));
 }
 
 TEST_F(OutgoingSubgroupStreamTest, UpdatePriority) {
diff --git a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
index 4b87f5e..d106fb2 100644
--- a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
@@ -147,7 +147,8 @@
         absl::UnimplementedError("Fetch not implemented"));
   }
   void AddObject(Location location, uint64_t subgroup,
-                 absl::string_view payload, bool fin) {
+                 absl::string_view payload, bool fin,
+                 quic::QuicTime arrival_time = quic::QuicTime::Zero()) {
     PublishedObjectMetadata metadata;
     metadata.location = location;
     metadata.subgroup = subgroup;
@@ -155,6 +156,7 @@
     metadata.status = MoqtObjectStatus::kNormal;
     metadata.publisher_priority = 128;
     metadata.payload_length = payload.length();
+    metadata.arrival_time = arrival_time;
     auto it = objects_.find(location);
     if (it != objects_.end()) {
       it->second.Append(it->second.payload_received(), payload);
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index b3c9e86..a4ba325 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -52,25 +52,6 @@
   }
 };
 
-// TODO(martinduke): When subscription-specific tests are removed from,
-// MoqtSessionTest, much of this file can be deleted (including
-// SubscriptionPublisherPeer).
-
-class SubscriptionPublisherPeer {
- public:
-  static size_t num_open_streams(SubscriptionPublisher* publisher) {
-    return publisher->stream_map_.GetAllStreams().size();
-  }
-  static std::optional<Location> largest_sent(
-      const SubscriptionPublisher* publisher) {
-    return publisher->largest_sent_;
-  }
-  static const absl::flat_hash_set<DataStreamIndex>& reset_subgroups(
-      const SubscriptionPublisher* publisher) {
-    return publisher->reset_subgroups_;
-  }
-};
-
 // Helper class to interact with MOQT bidi streams in tests.
 class MoqtBidiStreamTestWrapper {
  public:
@@ -97,6 +78,13 @@
   std::unique_ptr<MoqtBidiStreamBase> absl_nonnull stream_;
 };
 
+class OutgoingSubgroupStreamPeer {
+ public:
+  static quic::QuicAlarm* GetAlarm(OutgoingSubgroupStream* stream) {
+    return stream->delivery_timeout_alarm_.get();
+  }
+};
+
 class MoqtSessionPeer {
  public:
   static constexpr webtransport::StreamId kControlStreamId = 4;
@@ -129,6 +117,11 @@
     return new_stream;
   }
 
+  static bool RequestIdIsSubscriptionPublisher(MoqtSession* session,
+                                               uint64_t request_id) {
+    return session->published_subscriptions_.contains(request_id);
+  }
+
   // In the test OnSessionReady, the session creates a stream and then passes
   // its unique_ptr to the mock webtransport stream. This function casts
   // that unique_ptr into a MoqtSession::Stream*, which is a private class of
@@ -158,59 +151,6 @@
                                          std::move(track));
   }
 
-  static MoqtObjectListener* AddSubscription(
-      MoqtSession* session, std::shared_ptr<MoqtTrackPublisher> publisher,
-      uint64_t subscribe_id, uint64_t track_alias, uint64_t start_group,
-      uint64_t start_object) {
-    MessageParameters parameters;
-    parameters.subscription_filter.emplace(Location(start_group, start_object));
-    MoqtSubscribe subscribe(subscribe_id, publisher->GetTrackName(),
-                            parameters);
-    subscribe.parameters.set_forward(true);
-    subscribe.parameters.subscription_filter.emplace(
-        Location(start_group, start_object));
-    subscribe.parameters.subscriber_priority = 0x80;
-    subscribe.parameters.group_order = MoqtDeliveryOrder::kAscending;
-    session->published_subscriptions_.emplace(
-        subscribe_id,
-        std::make_unique<SubscriptionPublisher>(
-            session->framer_, std::move(publisher), session->GetControlStream(),
-            subscribe_id, track_alias, subscribe.parameters, session,
-            /*monitoring_interface=*/nullptr, session->callbacks_.clock,
-            session->trace_recorder_));
-    return session->published_subscriptions_[subscribe_id].get();
-  }
-
-  static bool InSubscriptionWindow(MoqtObjectListener* subscription,
-                                   Location sequence) {
-    std::optional<SubscriptionFilter> filter =
-        absl::down_cast<SubscriptionPublisher*>(subscription)
-            ->parameters()
-            .subscription_filter;
-    return (!filter.has_value() || filter->InWindow(sequence));
-  }
-
-  static MoqtObjectListener* GetSubscription(MoqtSession* session,
-                                             uint64_t subscribe_id) {
-    auto it = session->published_subscriptions_.find(subscribe_id);
-    if (it == session->published_subscriptions_.end()) {
-      return nullptr;
-    }
-    return it->second.get();
-  }
-
-  static void DeleteSubscription(MoqtSession* session, uint64_t subscribe_id) {
-    session->published_subscriptions_.erase(subscribe_id);
-  }
-
-  static void UpdateSubscriberPriority(MoqtSession* session,
-                                       uint64_t subscribe_id,
-                                       MoqtPriority priority) {
-    MessageParameters parameters;
-    parameters.subscriber_priority = priority;
-    session->published_subscriptions_[subscribe_id]->Update(parameters);
-  }
-
   static SubscribeRemoteTrack* remote_track(MoqtSession* session,
                                             uint64_t track_alias) {
     return session->RemoteTrackByAlias(track_alias);
@@ -237,12 +177,6 @@
     session->ValidateRequestId(id);
   }
 
-  static std::optional<Location> LargestSentForSubscription(
-      MoqtSession* session, uint64_t subscribe_id) {
-    return SubscriptionPublisherPeer::largest_sent(
-        session->published_subscriptions_[subscribe_id].get());
-  }
-
   // Adds an upstream fetch and a stream ready to receive data.
   static std::unique_ptr<MoqtFetchTask> CreateUpstreamFetch(
       MoqtSession* session, webtransport::Stream* stream) {
@@ -285,11 +219,6 @@
     return session->callbacks_.clock->ApproximateNow();
   }
 
-  static quic::QuicAlarm* GetAlarm(webtransport::StreamVisitor* visitor) {
-    return absl::down_cast<OutgoingSubgroupStream*>(visitor)
-        ->delivery_timeout_alarm_.get();
-  }
-
   static quic::QuicAlarm* GetPublishDoneAlarm(
       SubscribeRemoteTrack* subscription) {
     return subscription->publish_done_alarm_.get();
@@ -299,23 +228,13 @@
     return session->goaway_timeout_alarm_.get();
   }
 
-  static quic::QuicTimeDelta GetDeliveryTimeout(
-      MoqtObjectListener* subscription) {
-    return absl::down_cast<SubscriptionPublisher*>(subscription)
-        ->delivery_timeout();
-  }
-  static void SetDeliveryTimeout(MoqtObjectListener* subscription,
-                                 quic::QuicTimeDelta timeout) {
-    absl::down_cast<SubscriptionPublisher*>(subscription)
-        ->parameters()
-        .delivery_timeout = timeout;
-  }
-
-  static bool SubgroupHasBeenReset(MoqtObjectListener* subscription,
-                                   DataStreamIndex index) {
-    return SubscriptionPublisherPeer::reset_subgroups(
-               absl::down_cast<SubscriptionPublisher*>(subscription))
-        .contains(index);
+  static quic::QuicTimeDelta GetDeliveryTimeout(MoqtSession* session,
+                                                uint64_t request_id) {
+    auto it = session->published_subscriptions_.find(request_id);
+    if (it == session->published_subscriptions_.end()) {
+      return quic::QuicTimeDelta::Zero();
+    }
+    return it->second->delivery_timeout();
   }
 
   static absl::string_view GetImplementationString(MoqtSession* session) {
@@ -329,6 +248,13 @@
   static const MoqtSessionParameters& GetParameters(MoqtSession* session) {
     return session->parameters_;
   }
+
+  static std::optional<uint64_t> NextQueuedRequestIdToServer(
+      MoqtSession* session) {
+    return session->subscriptions_with_queued_streams_.empty()
+               ? std::optional<uint64_t>()
+               : session->subscriptions_with_queued_streams_.begin()->second;
+  }
 };
 
 }  // namespace moqt::test