Make response to incoming MoQT SUBSCRIBE asynchronous. Other verbs to come.

MoqtPublisher::GetTrack() can return with non-OK status, in which case MoqtSession can immediately reply with SUBSCRIBE_ERROR. Otherwise, MoqtSession will get an MoqtTrackPublisher (which might be provisional) and registers an MoqtObjectListener with it.

MoqtTrackPublisher will later call SubscribeAccepted() or SubscribeRejected() based on the result of the upstream SUBSCRIBE. The current queues have no upstream capability and immediately call SubscribeAccepted().

Since all downstream SUBSCRIBE tests in MoqtSessionTest needed revising anyway, created some helper functions to reduce toil and increase readibility.

PiperOrigin-RevId: 737004315
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 71e59a0..7707755 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -268,7 +268,6 @@
   MoqtKnownTrackPublisher known_track_publisher;
   known_track_publisher.Add(queue);
   client_->session()->set_publisher(&known_track_publisher);
-  queue->AddObject(MemSliceFromString("object data"), /*key=*/true);
   bool received_subscribe_ok = false;
   EXPECT_CALL(server_visitor, OnReply(_, _, _)).WillOnce([&]() {
     received_subscribe_ok = true;
@@ -276,7 +275,12 @@
   client_->session()->Announce(
       FullTrackName{"test"},
       [](FullTrackName, std::optional<MoqtAnnounceErrorReason>) {});
+  bool success = test_harness_.RunUntilWithDefaultTimeout(
+      [&]() { return received_subscribe_ok; });
+  EXPECT_TRUE(success);
+  success = false;
 
+  queue->AddObject(MemSliceFromString("object data"), /*key=*/true);
   bool received_object = false;
   EXPECT_CALL(server_visitor, OnObjectFragment(_, _, _, _, _, _))
       .WillOnce([&](const FullTrackName& full_track_name, FullSequence sequence,
@@ -291,9 +295,8 @@
         EXPECT_TRUE(end_of_message);
         received_object = true;
       });
-  bool success = test_harness_.RunUntilWithDefaultTimeout(
+  success = test_harness_.RunUntilWithDefaultTimeout(
       [&]() { return received_object; });
-  EXPECT_TRUE(received_subscribe_ok);
   EXPECT_TRUE(success);
 }
 
@@ -321,6 +324,7 @@
     client_->session()->SubscribeCurrentGroup(FullTrackName("test", name),
                                               &client_visitor);
     int received = 0;
+    EXPECT_CALL(client_visitor, OnReply);
     EXPECT_CALL(client_visitor,
                 OnObjectFragment(_, FullSequence{1, 0}, _,
                                  MoqtObjectStatus::kNormal, "object 4", true))
@@ -443,6 +447,10 @@
   MockSubscribeRemoteTrackVisitor client_visitor;
   std::optional<absl::string_view> expected_reason = std::nullopt;
   bool received_ok = false;
+  EXPECT_CALL(*track_publisher, AddObjectListener)
+      .WillOnce([&](MoqtObjectListener* listener) {
+        listener->OnSubscribeAccepted();
+      });
   EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason))
       .WillOnce([&]() { received_ok = true; });
   client_->session()->SubscribeAbsolute(full_track_name, 0, 0, &client_visitor);
@@ -464,6 +472,10 @@
   MockSubscribeRemoteTrackVisitor client_visitor;
   std::optional<absl::string_view> expected_reason = std::nullopt;
   bool received_ok = false;
+  EXPECT_CALL(*track_publisher, AddObjectListener)
+      .WillOnce([&](MoqtObjectListener* listener) {
+        listener->OnSubscribeAccepted();
+      });
   EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason))
       .WillOnce([&]() { received_ok = true; });
   client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor);
@@ -485,6 +497,10 @@
   MockSubscribeRemoteTrackVisitor client_visitor;
   std::optional<absl::string_view> expected_reason = std::nullopt;
   bool received_ok = false;
+  EXPECT_CALL(*track_publisher, AddObjectListener)
+      .WillOnce([&](MoqtObjectListener* listener) {
+        listener->OnSubscribeAccepted();
+      });
   EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason))
       .WillOnce([&]() { received_ok = true; });
   client_->session()->SubscribeCurrentGroup(full_track_name, &client_visitor);
@@ -573,6 +589,10 @@
       .WillOnce([&](MoqtObjectAckFunction new_ack_function) {
         ack_function = std::move(new_ack_function);
       });
+  EXPECT_CALL(*track_publisher, AddObjectListener)
+      .WillOnce([&](MoqtObjectListener* listener) {
+        listener->OnSubscribeAccepted();
+      });
   EXPECT_CALL(client_visitor, OnReply(_, _, _))
       .WillOnce([&](const FullTrackName&, std::optional<FullSequence>,
                     std::optional<absl::string_view>) {
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.h b/quiche/quic/moqt/moqt_live_relay_queue.h
index 2b19069..cea9470 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue.h
+++ b/quiche/quic/moqt/moqt_live_relay_queue.h
@@ -87,6 +87,7 @@
       FullSequence start, FullSequence end) const override;
   void AddObjectListener(MoqtObjectListener* listener) override {
     listeners_.insert(listener);
+    listener->OnSubscribeAccepted();
   }
   void RemoveObjectListener(MoqtObjectListener* listener) override {
     listeners_.erase(listener);
diff --git a/quiche/quic/moqt/moqt_live_relay_queue_test.cc b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
index 3d3a0f0..02bd47e 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue_test.cc
+++ b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
@@ -90,6 +90,11 @@
   MOCK_METHOD(void, SkipGroup, (uint64_t group_id), ());
   MOCK_METHOD(void, CloseTrack, (), ());
   MOCK_METHOD(void, OnTrackPublisherGone, (), (override));
+  MOCK_METHOD(void, OnSubscribeAccepted, (), (override));
+  MOCK_METHOD(void, OnSubscribeRejected,
+              (MoqtSubscribeErrorReason reason,
+               std::optional<uint64_t> track_alias),
+              (override));
 };
 
 // Duplicates of MoqtOutgoingQueue test cases.
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h
index 174e2f5..9715563 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.h
+++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -60,6 +60,7 @@
       FullSequence start, FullSequence end) const override;
   void AddObjectListener(MoqtObjectListener* listener) override {
     listeners_.insert(listener);
+    listener->OnSubscribeAccepted();
   }
   void RemoveObjectListener(MoqtObjectListener* listener) override {
     listeners_.erase(listener);
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
index 418a022..81a66dd 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -80,6 +80,11 @@
                absl::string_view payload),
               ());
   MOCK_METHOD(void, OnTrackPublisherGone, (), (override));
+  MOCK_METHOD(void, OnSubscribeAccepted, (), (override));
+  MOCK_METHOD(void, OnSubscribeRejected,
+              (MoqtSubscribeErrorReason reason,
+               std::optional<uint64_t> track_alias),
+              (override));
 };
 
 absl::StatusOr<std::vector<std::string>> FetchToVector(
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h
index c624ba2..c90c52a 100644
--- a/quiche/quic/moqt/moqt_publisher.h
+++ b/quiche/quic/moqt/moqt_publisher.h
@@ -38,6 +38,16 @@
  public:
   virtual ~MoqtObjectListener() = default;
 
+  // Called when the publisher is sure that it can serve the subscription. This
+  // could happen synchronously or asynchronously.Details necessary for the
+  // SUBSCRIBE_OK can be obtained from the MoqtTrackPublisher.
+  virtual void OnSubscribeAccepted() = 0;
+  // Called when the publisher is sure that it cannot serve the subscription.
+  // This could happen synchronously or asynchronously.
+  virtual void OnSubscribeRejected(
+      MoqtSubscribeErrorReason reason,
+      std::optional<uint64_t> track_alias = std::nullopt) = 0;
+
   // Notifies that an object with the given sequence number has become
   // available.  The object payload itself may be retrieved via GetCachedObject
   // method of the associated track publisher.
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 7b1f300..9acedb2 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -68,36 +68,10 @@
   return status.ok() && DoesTrackStatusImplyHavingData(*status);
 }
 
-SubscribeWindow SubscribeMessageToWindow(const MoqtSubscribe& subscribe,
-                                         MoqtTrackPublisher& publisher) {
-  const FullSequence sequence = PublisherHasData(publisher)
-                                    ? publisher.GetLargestSequence()
-                                    : FullSequence{0, 0};
-  uint64_t start_group = sequence.group;
-  uint64_t start_object =
-      sequence.object + (PublisherHasData(publisher) ? 1 : 0);
-  if (subscribe.start_group.has_value() &&
-      *subscribe.start_group >= start_group) {
-    QUICHE_DCHECK(subscribe.start_object.has_value());
-    start_group = *subscribe.start_group;
-    start_object = subscribe.start_object.value_or(0);
-  }
-  switch (GetFilterType(subscribe)) {
-    case MoqtFilterType::kLatestGroup:
-      return SubscribeWindow(start_group, 0);
-    case MoqtFilterType::kLatestObject:
-    case MoqtFilterType::kAbsoluteStart:
-      return SubscribeWindow(start_group, start_object);
-    case MoqtFilterType::kAbsoluteRange:
-      // If end_group has no value, the filter cannot be AbsoluteRange.
-      QUICHE_DCHECK(subscribe.end_group.has_value());
-      return SubscribeWindow(start_group, start_object,
-                             subscribe.end_group.value_or(UINT64_MAX),
-                             UINT64_MAX);
-    case MoqtFilterType::kNone:
-      QUICHE_BUG(MoqtSession_Subscription_invalid_filter_passed);
-      return SubscribeWindow(0, 0);
-  }
+SubscribeWindow SubscribeMessageToWindow(const MoqtSubscribe& subscribe) {
+  return SubscribeWindow(subscribe.start_group.value_or(0),
+                         subscribe.start_object.value_or(0),
+                         subscribe.end_group.value_or(UINT64_MAX), UINT64_MAX);
 }
 
 class DefaultPublisher : public MoqtPublisher {
@@ -955,10 +929,10 @@
 }
 
 void MoqtSession::ControlStream::SendSubscribeError(
-    const MoqtSubscribe& message, SubscribeErrorCode error_code,
+    uint64_t subscribe_id, SubscribeErrorCode error_code,
     absl::string_view reason_phrase, uint64_t track_alias) {
   MoqtSubscribeError subscribe_error;
-  subscribe_error.subscribe_id = message.subscribe_id;
+  subscribe_error.subscribe_id = subscribe_id;
   subscribe_error.error_code = error_code;
   subscribe_error.reason_phrase = reason_phrase;
   subscribe_error.track_alias = track_alias;
@@ -983,7 +957,17 @@
   }
   QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE for "
                   << message.full_track_name;
-
+  if (session_->sent_goaway_) {
+    QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE after GOAWAY";
+    SendSubscribeError(message.subscribe_id, SubscribeErrorCode::kUnauthorized,
+                       "SUBSCRIBE after GOAWAY", message.track_alias);
+    return;
+  }
+  if (session_->subscribed_track_names_.contains(message.full_track_name)) {
+    session_->Error(MoqtError::kProtocolViolation,
+                    "Duplicate subscribe for track");
+    return;
+  }
   const FullTrackName& track_name = message.full_track_name;
   absl::StatusOr<std::shared_ptr<MoqtTrackPublisher>> track_publisher =
       session_->publisher_->GetTrack(track_name);
@@ -991,27 +975,10 @@
     QUIC_DLOG(INFO) << ENDPOINT << "SUBSCRIBE for " << track_name
                     << " rejected by the application: "
                     << track_publisher.status();
-    SendSubscribeError(message, SubscribeErrorCode::kDoesNotExist,
+    SendSubscribeError(message.subscribe_id, SubscribeErrorCode::kDoesNotExist,
                        track_publisher.status().message(), message.track_alias);
     return;
   }
-  std::optional<FullSequence> largest_id;
-  if (PublisherHasData(**track_publisher)) {
-    largest_id = (*track_publisher)->GetLargestSequence();
-  }
-  if (message.end_group.has_value() && largest_id.has_value() &&
-      *message.end_group < largest_id->group) {
-    SendSubscribeError(message, SubscribeErrorCode::kInvalidRange,
-                       "SUBSCRIBE ends in previous group", message.track_alias);
-    return;
-  }
-  if (session_->sent_goaway_) {
-    QUIC_DLOG(INFO) << ENDPOINT << "Received a FETCH after GOAWAY";
-    SendSubscribeError(message, SubscribeErrorCode::kUnauthorized,
-                       "SUBSCRIBE after GOAWAY", message.track_alias);
-    return;
-  }
-  MoqtDeliveryOrder delivery_order = (*track_publisher)->GetDeliveryOrder();
 
   MoqtPublishingMonitorInterface* monitoring = nullptr;
   auto monitoring_it =
@@ -1022,35 +989,19 @@
     session_->monitoring_interfaces_for_published_tracks_.erase(monitoring_it);
   }
 
-  if (session_->subscribed_track_names_.contains(track_name)) {
-    session_->Error(MoqtError::kProtocolViolation,
-                    "Duplicate subscribe for track");
-    return;
-  }
+  MoqtTrackPublisher* track_publisher_ptr = track_publisher->get();
   auto subscription = std::make_unique<MoqtSession::PublishedSubscription>(
       session_, *std::move(track_publisher), message, monitoring);
   subscription->set_delivery_timeout(
       message.parameters.delivery_timeout.value_or(
           quic::QuicTimeDelta::Infinite()));
+  MoqtSession::PublishedSubscription* subscription_ptr = subscription.get();
   auto [it, success] = session_->published_subscriptions_.emplace(
       message.subscribe_id, std::move(subscription));
   if (!success) {
-    SendSubscribeError(message, SubscribeErrorCode::kInternalError,
-                       "Duplicate subscribe ID", message.track_alias);
-    return;
+    QUICHE_NOTREACHED();  // ValidateSubscribeId() should have caught this.
   }
-
-  MoqtSubscribeOk subscribe_ok;
-  subscribe_ok.subscribe_id = message.subscribe_id;
-  subscribe_ok.group_order = delivery_order;
-  subscribe_ok.largest_id = largest_id;
-  // TODO(martinduke): Support sending DELIVERY_TIMEOUT parameter as the
-  // publisher.
-  SendOrBufferMessage(session_->framer_.SerializeSubscribeOk(subscribe_ok));
-
-  if (largest_id.has_value()) {
-    it->second->Backfill();
-  }
+  track_publisher_ptr->AddObjectListener(subscription_ptr);
 }
 
 void MoqtSession::ControlStream::OnSubscribeOkMessage(
@@ -1770,11 +1721,10 @@
       session_(session),
       track_publisher_(track_publisher),
       track_alias_(subscribe.track_alias),
-      window_(SubscribeMessageToWindow(subscribe, *track_publisher)),
+      window_(SubscribeMessageToWindow(subscribe)),
       subscriber_priority_(subscribe.subscriber_priority),
       subscriber_delivery_order_(subscribe.group_order),
       monitoring_interface_(monitoring_interface) {
-  track_publisher->AddObjectListener(this);
   if (monitoring_interface_ != nullptr) {
     monitoring_interface_->OnObjectAckSupportKnown(
         subscribe.parameters.object_ack_window.has_value());
@@ -1830,6 +1780,48 @@
                                   FinalizeSendOrder(old_send_order));
 };
 
+void MoqtSession::PublishedSubscription::OnSubscribeAccepted() {
+  std::optional<FullSequence> largest_id;
+  ControlStream* stream = session_->GetControlStream();
+  if (PublisherHasData(*track_publisher_)) {
+    largest_id = track_publisher_->GetLargestSequence();
+    if (window_.end().has_value() && *window_.end() < *largest_id) {
+      stream->SendSubscribeError(subscription_id_,
+                                 SubscribeErrorCode::kInvalidRange,
+                                 "SUBSCRIBE ends in past group", track_alias_);
+      session_->published_subscriptions_.erase(subscription_id_);
+      // No class access below this line!
+      return;
+    }
+    if (filter_type_ == MoqtFilterType::kLatestGroup) {
+      window_.UpdateStartEnd(FullSequence{largest_id->group, 0}, window_.end());
+    } else {
+      window_.UpdateStartEnd(largest_id->next(), window_.end());
+    }
+  }
+
+  MoqtSubscribeOk subscribe_ok;
+  subscribe_ok.subscribe_id = subscription_id_;
+  subscribe_ok.group_order = track_publisher_->GetDeliveryOrder();
+  subscribe_ok.largest_id = largest_id;
+  // TODO(martinduke): Support sending DELIVERY_TIMEOUT parameter as the
+  // publisher.
+  stream->SendOrBufferMessage(
+      session_->framer_.SerializeSubscribeOk(subscribe_ok));
+  if (largest_id.has_value()) {
+    Backfill();
+  }
+}
+
+void MoqtSession::PublishedSubscription::OnSubscribeRejected(
+    MoqtSubscribeErrorReason reason, std::optional<uint64_t> track_alias) {
+  session_->GetControlStream()->SendSubscribeError(
+      subscription_id_, reason.error_code, reason.reason_phrase,
+      track_alias.value_or(track_alias_));
+  session_->published_subscriptions_.erase(subscription_id_);
+  // No class access below this line!
+}
+
 void MoqtSession::PublishedSubscription::OnNewObjectAvailable(
     FullSequence sequence) {
   if (!window_.InWindow(sequence)) {
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index a62e58e..fb578f8 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -331,12 +331,13 @@
     // control credit.
     void SendOrBufferMessage(quiche::QuicheBuffer message, bool fin = false);
 
-   private:
-    friend class test::MoqtSessionPeer;
-    void SendSubscribeError(const MoqtSubscribe& message,
+    void SendSubscribeError(uint64_t subscribe_id,
                             SubscribeErrorCode error_code,
                             absl::string_view reason_phrase,
                             uint64_t track_alias);
+
+   private:
+    friend class test::MoqtSessionPeer;
     void SendFetchError(uint64_t subscribe_id, SubscribeErrorCode error_code,
                         absl::string_view reason_phrase);
 
@@ -388,11 +389,10 @@
   // being sent to the peer.
   class PublishedSubscription : public MoqtObjectListener {
    public:
-    explicit PublishedSubscription(
-        MoqtSession* session,
-        std::shared_ptr<MoqtTrackPublisher> track_publisher,
-        const MoqtSubscribe& subscribe,
-        MoqtPublishingMonitorInterface* monitoring_interface);
+    PublishedSubscription(MoqtSession* session,
+                          std::shared_ptr<MoqtTrackPublisher> track_publisher,
+                          const MoqtSubscribe& subscribe,
+                          MoqtPublishingMonitorInterface* monitoring_interface);
     // TODO(martinduke): Immediately reset all the streams.
     ~PublishedSubscription();
 
@@ -411,6 +411,11 @@
     }
     void set_subscriber_priority(MoqtPriority priority);
 
+    // MoqtObjectListener implementation.
+    void OnSubscribeAccepted() override;
+    void OnSubscribeRejected(
+        MoqtSubscribeErrorReason reason,
+        std::optional<uint64_t> track_alias = std::nullopt) override;
     // This is only called for objects that have just arrived.
     void OnNewObjectAvailable(FullSequence sequence) override;
     void OnTrackPublisherGone() override;
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index a89565d..975b31e 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -12,9 +12,9 @@
 #include <queue>
 #include <string>
 #include <utility>
-#include <vector>
 
 #include "absl/status/status.h"
+#include "absl/status/statusor.h"
 #include "absl/strings/match.h"
 #include "absl/strings/string_view.h"
 #include "absl/types/span.h"
@@ -56,11 +56,13 @@
 constexpr webtransport::StreamId kIncomingUniStreamId = 15;
 constexpr webtransport::StreamId kOutgoingUniStreamId = 14;
 
+FullTrackName kDefaultTrackName() { return FullTrackName("foo", "bar"); }
+
 MoqtSubscribe DefaultSubscribe() {
   MoqtSubscribe subscribe = {
       /*subscribe_id=*/1,
       /*track_alias=*/2,
-      /*full_track_name=*/FullTrackName("foo", "bar"),
+      kDefaultTrackName(),
       /*subscriber_priority=*/0x80,
       /*group_order=*/std::nullopt,
       /*start_group=*/0,
@@ -77,7 +79,7 @@
       /*subscriber_priority=*/0x80,
       /*group_order=*/std::nullopt,
       /*joining_fetch=*/std::nullopt,
-      /*full_track_name=*/FullTrackName("foo", "bar"),
+      kDefaultTrackName(),
       /*start=*/FullSequence(0, 0),
       /*end_group=*/1,
       /*end_object=*/std::nullopt,
@@ -86,6 +88,8 @@
   return fetch;
 }
 
+// TODO(martinduke): Eliminate MoqtSessionPeer::AddSubscription, which allows
+// this to be removed as well.
 static std::shared_ptr<MockTrackPublisher> SetupPublisher(
     FullTrackName track_name, MoqtForwardingPreference forwarding_preference,
     FullSequence largest_sequence) {
@@ -111,11 +115,60 @@
     session_.set_publisher(&publisher_);
     MoqtSessionPeer::set_peer_max_subscribe_id(&session_,
                                                kDefaultInitialMaxSubscribeId);
+    ON_CALL(mock_session_, GetStreamById).WillByDefault(Return(&mock_stream_));
   }
   ~MoqtSessionTest() {
     EXPECT_CALL(session_callbacks_.session_deleted_callback, Call());
   }
 
+  MockTrackPublisher* CreateTrackPublisher() {
+    auto publisher = std::make_shared<MockTrackPublisher>(kDefaultTrackName());
+    publisher_.Add(publisher);
+    ON_CALL(*publisher, GetTrackStatus())
+        .WillByDefault(Return(MoqtTrackStatusCode::kNotYetBegun));
+    ON_CALL(*publisher, GetForwardingPreference())
+        .WillByDefault(Return(MoqtForwardingPreference::kSubgroup));
+    ON_CALL(*publisher, GetDeliveryOrder)
+        .WillByDefault(Return(MoqtDeliveryOrder::kAscending));
+    return publisher.get();
+  }
+
+  void SetLargestId(MockTrackPublisher* publisher, FullSequence largest_id) {
+    ON_CALL(*publisher, GetTrackStatus())
+        .WillByDefault(Return(MoqtTrackStatusCode::kInProgress));
+    ON_CALL(*publisher, GetLargestSequence()).WillByDefault(Return(largest_id));
+  }
+
+  // The publisher receives SUBSCRIBE and synchronously announces it will
+  // publish objects.
+  MoqtObjectListener* ReceiveSubscribeSynchronousOk(
+      MockTrackPublisher* publisher, MoqtSubscribe& subscribe,
+      MoqtControlParserVisitor* control_parser) {
+    MoqtObjectListener* listener_ptr = nullptr;
+    EXPECT_CALL(*publisher, AddObjectListener)
+        .WillOnce([&](MoqtObjectListener* listener) {
+          listener_ptr = listener;
+          listener->OnSubscribeAccepted();
+        });
+    absl::StatusOr<MoqtTrackStatusCode> track_status =
+        publisher->GetTrackStatus();
+    if (!track_status.ok()) {
+      return nullptr;
+    }
+    MoqtSubscribeOk expected_ok = {
+        /*subscribe_id=*/subscribe.subscribe_id,
+        /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
+        /*group_order=*/MoqtDeliveryOrder::kAscending,
+        (*track_status == MoqtTrackStatusCode::kInProgress)
+            ? std::make_optional(publisher->GetLargestSequence())
+            : std::optional<FullSequence>(),
+        /*parameters=*/MoqtSubscribeParameters(),
+    };
+    EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _));
+    control_parser->OnSubscribeMessage(subscribe);
+    return listener_ptr;
+  }
+
   // If visitor == nullptr, it's the first object in the stream, and will be
   // assigned to the visitor the session creates.
   // TODO(martinduke): Support delivering object payload.
@@ -171,6 +224,7 @@
     }
   }
 
+  webtransport::test::MockStream mock_stream_;
   MockSessionCallbacks session_callbacks_;
   webtransport::test::MockSession mock_session_;
   MoqtSession session_;
@@ -183,20 +237,19 @@
 
 // Verify the session sends CLIENT_SETUP on the control stream.
 TEST_F(MoqtSessionTest, OnSessionReady) {
-  webtransport::test::MockStream mock_stream;
   EXPECT_CALL(mock_session_, OpenOutgoingBidirectionalStream())
-      .WillOnce(Return(&mock_stream));
+      .WillOnce(Return(&mock_stream_));
   std::unique_ptr<webtransport::StreamVisitor> visitor;
   // Save a reference to MoqtSession::Stream
-  EXPECT_CALL(mock_stream, SetVisitor(_))
+  EXPECT_CALL(mock_stream_, SetVisitor(_))
       .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> new_visitor) {
         visitor = std::move(new_visitor);
       });
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillOnce(Return(webtransport::StreamId(4)));
-  EXPECT_CALL(mock_session_, GetStreamById(4)).WillOnce(Return(&mock_stream));
-  EXPECT_CALL(mock_stream, visitor()).WillOnce([&] { return visitor.get(); });
-  EXPECT_CALL(mock_stream,
+  EXPECT_CALL(mock_session_, GetStreamById(4)).WillOnce(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] { return visitor.get(); });
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kClientSetup), _));
   session_.OnSessionReady();
 
@@ -217,16 +270,15 @@
       &mock_session_, MoqtSessionParameters(quic::Perspective::IS_SERVER),
       std::make_unique<quic::test::TestAlarmFactory>(),
       session_callbacks_.AsSessionCallbacks());
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&server_session, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&server_session, &mock_stream_);
   MoqtClientSetup setup = {
       /*supported_versions=*/{kDefaultMoqtVersion},
       /*path=*/std::nullopt,
   };
-  EXPECT_CALL(mock_stream,
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kServerSetup), _));
-  EXPECT_CALL(mock_stream, GetStreamId()).WillOnce(Return(0));
+  EXPECT_CALL(mock_stream_, GetStreamId()).WillOnce(Return(0));
   EXPECT_CALL(session_callbacks_.session_established_callback, Call()).Times(1);
   stream_input->OnClientSetupMessage(setup);
 }
@@ -244,12 +296,11 @@
 
 TEST_F(MoqtSessionTest, OnIncomingBidirectionalStream) {
   ::testing::InSequence seq;
-  webtransport::test::MockStream mock_stream;
   StrictMock<webtransport::test::MockStreamVisitor> mock_stream_visitor;
   EXPECT_CALL(mock_session_, AcceptIncomingBidirectionalStream())
-      .WillOnce(Return(&mock_stream));
-  EXPECT_CALL(mock_stream, SetVisitor(_)).Times(1);
-  EXPECT_CALL(mock_stream, visitor()).WillOnce(Return(&mock_stream_visitor));
+      .WillOnce(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_, SetVisitor(_)).Times(1);
+  EXPECT_CALL(mock_stream_, visitor()).WillOnce(Return(&mock_stream_visitor));
   EXPECT_CALL(mock_stream_visitor, OnCanRead()).Times(1);
   EXPECT_CALL(mock_session_, AcceptIncomingBidirectionalStream())
       .WillOnce(Return(nullptr));
@@ -258,12 +309,11 @@
 
 TEST_F(MoqtSessionTest, OnIncomingUnidirectionalStream) {
   ::testing::InSequence seq;
-  webtransport::test::MockStream mock_stream;
   StrictMock<webtransport::test::MockStreamVisitor> mock_stream_visitor;
   EXPECT_CALL(mock_session_, AcceptIncomingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream));
-  EXPECT_CALL(mock_stream, SetVisitor(_)).Times(1);
-  EXPECT_CALL(mock_stream, visitor()).WillOnce(Return(&mock_stream_visitor));
+      .WillOnce(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_, SetVisitor(_)).Times(1);
+  EXPECT_CALL(mock_stream_, visitor()).WillOnce(Return(&mock_stream_visitor));
   EXPECT_CALL(mock_stream_visitor, OnCanRead()).Times(1);
   EXPECT_CALL(mock_session_, AcceptIncomingUnidirectionalStream())
       .WillOnce(Return(nullptr));
@@ -287,25 +337,19 @@
 
 TEST_F(MoqtSessionTest, AddLocalTrack) {
   MoqtSubscribe request = DefaultSubscribe();
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   // Request for track returns SUBSCRIBE_ERROR.
   EXPECT_CALL(
-      mock_stream,
+      mock_stream_,
       Writev(ControlMessageOfType(MoqtMessageType::kSubscribeError), _));
   stream_input->OnSubscribeMessage(request);
 
   // Add the track. Now Subscribe should succeed.
-  auto track_publisher =
-      std::make_shared<MockTrackPublisher>(FullTrackName("foo", "bar"));
-  EXPECT_CALL(*track_publisher, GetTrackStatus())
-      .WillRepeatedly(Return(MoqtTrackStatusCode::kStatusNotAvailable));
-  publisher_.Add(track_publisher);
-  EXPECT_CALL(mock_stream,
-              Writev(ControlMessageOfType(MoqtMessageType::kSubscribeOk), _));
-  request.subscribe_id = 2;
-  stream_input->OnSubscribeMessage(request);
+  MockTrackPublisher* track = CreateTrackPublisher();
+  std::make_shared<MockTrackPublisher>(request.full_track_name);
+  ++request.subscribe_id;
+  ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
 }
 
 TEST_F(MoqtSessionTest, AnnounceWithOkAndCancel) {
@@ -313,11 +357,10 @@
       FullTrackName track_namespace,
       std::optional<MoqtAnnounceErrorReason> error_message)>
       announce_resolved_callback;
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream));
-  EXPECT_CALL(mock_stream,
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kAnnounce), _));
   session_.Announce(FullTrackName{"foo"},
                     announce_resolved_callback.AsStdFunction());
@@ -356,11 +399,10 @@
       FullTrackName track_namespace,
       std::optional<MoqtAnnounceErrorReason> error_message)>
       announce_resolved_callback;
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream));
-  EXPECT_CALL(mock_stream,
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kAnnounce), _));
   session_.Announce(FullTrackName{"foo"},
                     announce_resolved_callback.AsStdFunction());
@@ -376,8 +418,8 @@
       });
   stream_input->OnAnnounceOkMessage(ok);
 
-  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream));
-  EXPECT_CALL(mock_stream,
+  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kUnannounce), _));
   session_.Unannounce(FullTrackName{"foo"});
   // State is gone.
@@ -389,11 +431,10 @@
       FullTrackName track_namespace,
       std::optional<MoqtAnnounceErrorReason> error_message)>
       announce_resolved_callback;
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream));
-  EXPECT_CALL(mock_stream,
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kAnnounce), _));
   session_.Announce(FullTrackName{"foo"},
                     announce_resolved_callback.AsStdFunction());
@@ -416,100 +457,82 @@
   EXPECT_FALSE(session_.Unannounce(FullTrackName{"foo"}));
 }
 
-TEST_F(MoqtSessionTest, SubscribeForPast) {
-  FullTrackName ftn("foo", "bar");
-  auto track = std::make_shared<MockTrackPublisher>(ftn);
-  EXPECT_CALL(*track, GetTrackStatus())
-      .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
-  EXPECT_CALL(*track, GetCachedObject(_)).WillRepeatedly([] {
-    return std::optional<PublishedObject>();
-  });
-  EXPECT_CALL(*track, GetDeliveryOrder()).WillRepeatedly([] {
-    return MoqtDeliveryOrder::kAscending;
-  });
-  EXPECT_CALL(*track, GetCachedObjectsInRange(_, _))
-      .WillRepeatedly(Return(std::vector<FullSequence>()));
-  EXPECT_CALL(*track, GetLargestSequence())
-      .WillRepeatedly(Return(FullSequence(10, 20)));
-  publisher_.Add(track);
-
-  webtransport::test::MockStream mock_stream;
+TEST_F(MoqtSessionTest, AsynchronousSubscribeReturnsOk) {
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  MoqtSubscribeOk expected_ok = {
-      /*subscribe_id=*/1,
-      /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
-      /*group_order=*/MoqtDeliveryOrder::kAscending,
-      /*largest_id=*/FullSequence(10, 20),
-  };
-  EXPECT_CALL(mock_stream, Writev(SerializedControlMessage(expected_ok), _));
-  stream_input->OnSubscribeMessage(DefaultSubscribe());
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MoqtSubscribe request = DefaultSubscribe();
+  MockTrackPublisher* track = CreateTrackPublisher();
+  MoqtObjectListener* listener;
+  EXPECT_CALL(*track, AddObjectListener)
+      .WillOnce(
+          [&](MoqtObjectListener* listener_ptr) { listener = listener_ptr; });
+  stream_input->OnSubscribeMessage(request);
+
+  EXPECT_CALL(mock_stream_,
+              Writev(ControlMessageOfType(MoqtMessageType::kSubscribeOk), _));
+  listener->OnSubscribeAccepted();
+  EXPECT_NE(MoqtSessionPeer::GetSubscription(&session_, 1), nullptr);
+}
+
+TEST_F(MoqtSessionTest, AsynchronousSubscribeReturnsError) {
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MoqtSubscribe request = DefaultSubscribe();
+  MockTrackPublisher* track = CreateTrackPublisher();
+  MoqtObjectListener* listener;
+  EXPECT_CALL(*track, AddObjectListener)
+      .WillOnce(
+          [&](MoqtObjectListener* listener_ptr) { listener = listener_ptr; });
+  stream_input->OnSubscribeMessage(request);
+  EXPECT_CALL(
+      mock_stream_,
+      Writev(ControlMessageOfType(MoqtMessageType::kSubscribeError), _));
+  listener->OnSubscribeRejected(
+      MoqtSubscribeErrorReason(SubscribeErrorCode::kInternalError,
+                               "Test error"),
+      request.track_alias);
+  EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, 1), nullptr);
+}
+
+TEST_F(MoqtSessionTest, SubscribeForPast) {
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MockTrackPublisher* track = CreateTrackPublisher();
+  SetLargestId(track, FullSequence(10, 20));
+  MoqtSubscribe request = DefaultSubscribe();
+  ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
 }
 
 TEST_F(MoqtSessionTest, SubscribeEntirelyInPast) {
-  FullTrackName ftn("foo", "bar");
-  auto track = std::make_shared<MockTrackPublisher>(ftn);
-  EXPECT_CALL(*track, GetTrackStatus())
-      .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
-  EXPECT_CALL(*track, GetCachedObject(_)).WillRepeatedly([] {
-    return std::optional<PublishedObject>();
-  });
-  EXPECT_CALL(*track, GetDeliveryOrder()).WillRepeatedly([] {
-    return MoqtDeliveryOrder::kAscending;
-  });
-  EXPECT_CALL(*track, GetCachedObjectsInRange(_, _))
-      .WillRepeatedly(Return(std::vector<FullSequence>()));
-  EXPECT_CALL(*track, GetLargestSequence())
-      .WillRepeatedly(Return(FullSequence(10, 20)));
-  publisher_.Add(track);
-
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MockTrackPublisher* track = CreateTrackPublisher();
+  SetLargestId(track, FullSequence(10, 20));
+
+  MoqtSubscribe request = DefaultSubscribe();
+  request.end_group = 9;
+  EXPECT_CALL(*track, AddObjectListener)
+      .WillOnce([&](MoqtObjectListener* listener) {
+        listener->OnSubscribeAccepted();
+      });
   MoqtSubscribeError expected_error = {
-      /*subscribe_id=*/1,
+      /*subscribe_id=*/request.subscribe_id,
       /*error_code=*/SubscribeErrorCode::kInvalidRange,
-      /*reason_phrase=*/"SUBSCRIBE ends in previous group",
-      /*track_alias=*/2,
+      /*reason_phrase=*/"SUBSCRIBE ends in past group",
+      /*track_alias=*/request.track_alias,
   };
-  EXPECT_CALL(mock_stream, Writev(SerializedControlMessage(expected_error), _));
-  MoqtSubscribe subscribe = DefaultSubscribe();
-  subscribe.end_group = 9;
-  stream_input->OnSubscribeMessage(subscribe);
+  EXPECT_CALL(mock_stream_,
+              Writev(SerializedControlMessage(expected_error), _));
+  stream_input->OnSubscribeMessage(request);
+  EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, 1), nullptr);
 }
 
 TEST_F(MoqtSessionTest, TwoSubscribesForTrack) {
-  FullTrackName ftn("foo", "bar");
-  auto track = std::make_shared<MockTrackPublisher>(ftn);
-  EXPECT_CALL(*track, GetTrackStatus())
-      .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
-  EXPECT_CALL(*track, GetCachedObject(_)).WillRepeatedly([] {
-    return std::optional<PublishedObject>();
-  });
-  EXPECT_CALL(*track, GetCachedObjectsInRange(_, _))
-      .WillRepeatedly(Return(std::vector<FullSequence>()));
-  EXPECT_CALL(*track, GetLargestSequence())
-      .WillRepeatedly(Return(FullSequence(10, 20)));
-  publisher_.Add(track);
-
-  // Peer subscribes to (11, 0)
-  MoqtSubscribe request = {
-      /*subscribe_id=*/1,
-      /*track_alias=*/2,
-      /*full_track_name=*/FullTrackName({"foo", "bar"}),
-      /*subscriber_priority=*/0x80,
-      /*group_order=*/std::nullopt,
-      /*start_group=*/11,
-      /*start_object=*/0,
-      /*end_group=*/std::nullopt,
-      /*parameters=*/MoqtSubscribeParameters(),
-  };
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  EXPECT_CALL(mock_stream,
-              Writev(ControlMessageOfType(MoqtMessageType::kSubscribeOk), _));
-  stream_input->OnSubscribeMessage(request);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MockTrackPublisher* track = CreateTrackPublisher();
+  MoqtSubscribe request = DefaultSubscribe();
+  ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
 
   request.subscribe_id = 2;
   request.start_group = 12;
@@ -518,41 +541,14 @@
                            "Duplicate subscribe for track"))
       .Times(1);
   stream_input->OnSubscribeMessage(request);
-  ;
 }
 
 TEST_F(MoqtSessionTest, UnsubscribeAllowsSecondSubscribe) {
-  FullTrackName ftn("foo", "bar");
-  auto track = std::make_shared<MockTrackPublisher>(ftn);
-  EXPECT_CALL(*track, GetTrackStatus())
-      .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
-  EXPECT_CALL(*track, GetCachedObject(_)).WillRepeatedly([] {
-    return std::optional<PublishedObject>();
-  });
-  EXPECT_CALL(*track, GetCachedObjectsInRange(_, _))
-      .WillRepeatedly(Return(std::vector<FullSequence>()));
-  EXPECT_CALL(*track, GetLargestSequence())
-      .WillRepeatedly(Return(FullSequence(10, 20)));
-  publisher_.Add(track);
-
-  // Peer subscribes to (11, 0)
-  MoqtSubscribe request = {
-      /*subscribe_id=*/1,
-      /*track_alias=*/2,
-      /*full_track_name=*/FullTrackName({"foo", "bar"}),
-      /*subscriber_priority=*/0x80,
-      /*group_order=*/std::nullopt,
-      /*start_group=*/11,
-      /*start_object=*/0,
-      /*end_group=*/std::nullopt,
-      /*parameters=*/MoqtSubscribeParameters(),
-  };
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  EXPECT_CALL(mock_stream,
-              Writev(ControlMessageOfType(MoqtMessageType::kSubscribeOk), _));
-  stream_input->OnSubscribeMessage(request);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MockTrackPublisher* track = CreateTrackPublisher();
+  MoqtSubscribe request = DefaultSubscribe();
+  ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
 
   // Peer unsubscribes.
   MoqtUnsubscribe unsubscribe = {
@@ -564,53 +560,38 @@
   // Subscribe again, succeeds.
   request.subscribe_id = 2;
   request.start_group = 12;
-  EXPECT_CALL(mock_stream,
-              Writev(ControlMessageOfType(MoqtMessageType::kSubscribeOk), _));
-  stream_input->OnSubscribeMessage(request);
+  ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
 }
 
 TEST_F(MoqtSessionTest, SubscribeIdTooHigh) {
   // Peer subscribes to (0, 0)
-  MoqtSubscribe request = {
-      /*subscribe_id=*/kDefaultInitialMaxSubscribeId + 1,
-      /*track_alias=*/2,
-      /*full_track_name=*/FullTrackName({"foo", "bar"}),
-      /*subscriber_priority=*/0x80,
-      /*group_order=*/std::nullopt,
-      /*start_group=*/0,
-      /*start_object=*/0,
-      /*end_group=*/std::nullopt,
-      /*parameters=*/MoqtSubscribeParameters(),
-  };
-  webtransport::test::MockStream mock_stream;
+  MoqtSubscribe request = DefaultSubscribe();
+  request.subscribe_id = kDefaultInitialMaxSubscribeId + 1;
+
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   EXPECT_CALL(mock_session_,
               CloseSession(static_cast<uint64_t>(MoqtError::kTooManySubscribes),
-                           "Received SUBSCRIBE with too large ID"))
-      .Times(1);
+                           "Received SUBSCRIBE with too large ID"));
   stream_input->OnSubscribeMessage(request);
 }
 
 TEST_F(MoqtSessionTest, SubscribeIdNotIncreasing) {
   MoqtSubscribe request = DefaultSubscribe();
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   // Request for track returns SUBSCRIBE_ERROR.
   EXPECT_CALL(
-      mock_stream,
+      mock_stream_,
       Writev(ControlMessageOfType(MoqtMessageType::kSubscribeError), _));
   stream_input->OnSubscribeMessage(request);
 
   // Second request is a protocol violation.
-  request.subscribe_id = 0;
-  request.track_alias = 3;
+  ++request.track_alias;
   request.full_track_name = FullTrackName({"dead", "beef"});
   EXPECT_CALL(mock_session_,
               CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
-                           "Subscribe ID not monotonically increasing"))
-      .Times(1);
+                           "Subscribe ID not monotonically increasing"));
   stream_input->OnSubscribeMessage(request);
 }
 
@@ -618,17 +599,16 @@
   MoqtSessionPeer::set_next_subscribe_id(&session_,
                                          kDefaultInitialMaxSubscribeId - 1);
   MockSubscribeRemoteTrackVisitor remote_track_visitor;
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   EXPECT_CALL(mock_session_, GetStreamById(_))
-      .WillRepeatedly(Return(&mock_stream));
-  EXPECT_CALL(mock_stream,
+      .WillRepeatedly(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
   EXPECT_TRUE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
                                              &remote_track_visitor));
   EXPECT_CALL(
-      mock_stream,
+      mock_stream_,
       Writev(ControlMessageOfType(MoqtMessageType::kSubscribesBlocked), _))
       .Times(1);
   EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo2", "bar2"),
@@ -640,12 +620,11 @@
 
 TEST_F(MoqtSessionTest, SubscribeDuplicateTrackName) {
   MockSubscribeRemoteTrackVisitor remote_track_visitor;
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   EXPECT_CALL(mock_session_, GetStreamById(_))
-      .WillRepeatedly(Return(&mock_stream));
-  EXPECT_CALL(mock_stream,
+      .WillRepeatedly(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
   EXPECT_TRUE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
                                              &remote_track_visitor));
@@ -654,12 +633,11 @@
 }
 
 TEST_F(MoqtSessionTest, SubscribeWithOk) {
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   MockSubscribeRemoteTrackVisitor remote_track_visitor;
-  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream));
-  EXPECT_CALL(mock_stream,
+  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
   session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
                                  &remote_track_visitor);
@@ -682,13 +660,12 @@
   MoqtSessionPeer::set_next_subscribe_id(&session_,
                                          kDefaultInitialMaxSubscribeId);
   MockSubscribeRemoteTrackVisitor remote_track_visitor;
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   EXPECT_CALL(mock_session_, GetStreamById(_))
-      .WillRepeatedly(Return(&mock_stream));
+      .WillRepeatedly(Return(&mock_stream_));
   EXPECT_CALL(
-      mock_stream,
+      mock_stream_,
       Writev(ControlMessageOfType(MoqtMessageType::kSubscribesBlocked), _));
   EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
                                               &remote_track_visitor));
@@ -697,7 +674,7 @@
   };
   stream_input->OnMaxSubscribeIdMessage(max_subscribe_id);
 
-  EXPECT_CALL(mock_stream,
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
   EXPECT_TRUE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
                                              &remote_track_visitor));
@@ -707,9 +684,8 @@
   MoqtMaxSubscribeId max_subscribe_id = {
       /*max_subscribe_id=*/kDefaultInitialMaxSubscribeId - 1,
   };
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   EXPECT_CALL(
       mock_session_,
       CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
@@ -719,50 +695,25 @@
 }
 
 TEST_F(MoqtSessionTest, GrantMoreSubscribes) {
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream));
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   EXPECT_CALL(
-      mock_stream,
+      mock_stream_,
       Writev(ControlMessageOfType(MoqtMessageType::kMaxSubscribeId), _));
   session_.GrantMoreSubscribes(1);
   // Peer subscribes to (0, 0)
-  MoqtSubscribe request = {
-      /*subscribe_id=*/kDefaultInitialMaxSubscribeId,
-      /*track_alias=*/2,
-      /*full_track_name=*/FullTrackName({"foo", "bar"}),
-      /*subscriber_priority=*/0x80,
-      /*group_order=*/std::nullopt,
-      /*start_group=*/10,
-      /*start_object=*/0,
-      /*end_group=*/std::nullopt,
-      /*parameters=*/MoqtSubscribeParameters(),
-  };
-  FullTrackName ftn("foo", "bar");
-  auto track = std::make_shared<MockTrackPublisher>(ftn);
-  EXPECT_CALL(*track, GetTrackStatus())
-      .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
-  EXPECT_CALL(*track, GetCachedObject(_)).WillRepeatedly([] {
-    return std::optional<PublishedObject>();
-  });
-  EXPECT_CALL(*track, GetCachedObjectsInRange(_, _))
-      .WillRepeatedly(Return(std::vector<FullSequence>()));
-  EXPECT_CALL(*track, GetLargestSequence())
-      .WillRepeatedly(Return(FullSequence(10, 20)));
-  publisher_.Add(track);
-  EXPECT_CALL(mock_stream,
-              Writev(ControlMessageOfType(MoqtMessageType::kSubscribeOk), _));
-  stream_input->OnSubscribeMessage(request);
+  MoqtSubscribe request = DefaultSubscribe();
+  request.subscribe_id = kDefaultInitialMaxSubscribeId;
+  MockTrackPublisher* track = CreateTrackPublisher();
+  ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
 }
 
 TEST_F(MoqtSessionTest, SubscribeWithError) {
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   MockSubscribeRemoteTrackVisitor remote_track_visitor;
-  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream));
-  EXPECT_CALL(mock_stream,
+  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
   session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
                                  &remote_track_visitor);
@@ -784,13 +735,12 @@
 }
 
 TEST_F(MoqtSessionTest, Unsubscribe) {
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   MockSubscribeRemoteTrackVisitor remote_track_visitor;
   MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultSubscribe(),
                                      &remote_track_visitor);
-  EXPECT_CALL(mock_stream,
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kUnsubscribe), _));
   EXPECT_NE(MoqtSessionPeer::remote_track(&session_, 2), nullptr);
   session_.Unsubscribe(FullTrackName("foo", "bar"));
@@ -800,9 +750,8 @@
 
 TEST_F(MoqtSessionTest, ReplyToAnnounceWithOkThenUnannounce) {
   FullTrackName track_namespace{"foo"};
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   MoqtAnnounce announce = {
       track_namespace,
   };
@@ -810,7 +759,7 @@
               Call(track_namespace, AnnounceEvent::kAnnounce))
       .WillOnce(Return(std::nullopt));
   EXPECT_CALL(
-      mock_stream,
+      mock_stream_,
       Writev(SerializedControlMessage(MoqtAnnounceOk{track_namespace}), _));
   stream_input->OnAnnounceMessage(announce);
   MoqtUnannounce unannounce = {
@@ -824,9 +773,9 @@
 
 TEST_F(MoqtSessionTest, ReplyToAnnounceWithOkThenAnnounceCancel) {
   FullTrackName track_namespace{"foo"};
-  webtransport::test::MockStream mock_stream;
+
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   MoqtAnnounce announce = {
       track_namespace,
   };
@@ -834,10 +783,10 @@
               Call(track_namespace, AnnounceEvent::kAnnounce))
       .WillOnce(Return(std::nullopt));
   EXPECT_CALL(
-      mock_stream,
+      mock_stream_,
       Writev(SerializedControlMessage(MoqtAnnounceOk{track_namespace}), _));
   stream_input->OnAnnounceMessage(announce);
-  EXPECT_CALL(mock_stream,
+  EXPECT_CALL(mock_stream_,
               Writev(SerializedControlMessage(MoqtAnnounceCancel{
                          track_namespace, SubscribeErrorCode::kInternalError,
                          "deadbeef"}),
@@ -848,9 +797,9 @@
 
 TEST_F(MoqtSessionTest, ReplyToAnnounceWithError) {
   FullTrackName track_namespace{"foo"};
-  webtransport::test::MockStream mock_stream;
+
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   MoqtAnnounce announce = {
       track_namespace,
   };
@@ -862,7 +811,7 @@
               Call(track_namespace, AnnounceEvent::kAnnounce))
       .WillOnce(Return(error));
   EXPECT_CALL(
-      mock_stream,
+      mock_stream_,
       Writev(SerializedControlMessage(MoqtAnnounceError{
                  track_namespace, error.error_code, error.reason_phrase}),
              _));
@@ -870,14 +819,13 @@
 }
 
 TEST_F(MoqtSessionTest, SubscribeAnnouncesLifeCycle) {
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   FullTrackName track_namespace("foo", "bar");
   track_namespace.NameToNamespace();
   bool got_callback = false;
   EXPECT_CALL(
-      mock_stream,
+      mock_stream_,
       Writev(ControlMessageOfType(MoqtMessageType::kSubscribeAnnounces), _));
   session_.SubscribeAnnounces(
       track_namespace,
@@ -894,21 +842,20 @@
   stream_input->OnSubscribeAnnouncesOkMessage(ok);
   EXPECT_TRUE(got_callback);
   EXPECT_CALL(
-      mock_stream,
+      mock_stream_,
       Writev(ControlMessageOfType(MoqtMessageType::kUnsubscribeAnnounces), _));
   EXPECT_TRUE(session_.UnsubscribeAnnounces(track_namespace));
   EXPECT_FALSE(session_.UnsubscribeAnnounces(track_namespace));
 }
 
 TEST_F(MoqtSessionTest, SubscribeAnnouncesError) {
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   FullTrackName track_namespace("foo", "bar");
   track_namespace.NameToNamespace();
   bool got_callback = false;
   EXPECT_CALL(
-      mock_stream,
+      mock_stream_,
       Writev(ControlMessageOfType(MoqtMessageType::kSubscribeAnnounces), _));
   session_.SubscribeAnnounces(
       track_namespace,
@@ -946,13 +893,12 @@
       /*subgroup_id=*/0,
       /*payload_length=*/8,
   };
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtDataParserVisitor> object_stream =
       MoqtSessionPeer::CreateIncomingDataStream(
-          &session_, &mock_stream, MoqtDataStreamType::kStreamHeaderSubgroup);
+          &session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup);
 
   EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(1);
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kIncomingUniStreamId));
   object_stream->OnObjectMessage(object, payload, true);
 }
@@ -972,13 +918,12 @@
       /*subgroup_id=*/0,
       /*payload_length=*/16,
   };
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtDataParserVisitor> object_stream =
       MoqtSessionPeer::CreateIncomingDataStream(
-          &session_, &mock_stream, MoqtDataStreamType::kStreamHeaderSubgroup);
+          &session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup);
 
   EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(1);
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kIncomingUniStreamId));
   object_stream->OnObjectMessage(object, payload, false);
   object_stream->OnObjectMessage(object, payload, true);  // complete the object
@@ -1004,13 +949,12 @@
       /*subgroup_id=*/0,
       /*payload_length=*/16,
   };
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtDataParserVisitor> object_stream =
       MoqtSessionPeer::CreateIncomingDataStream(
-          &session, &mock_stream, MoqtDataStreamType::kStreamHeaderSubgroup);
+          &session, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup);
 
   EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(2);
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kIncomingUniStreamId));
   object_stream->OnObjectMessage(object, payload, false);
   object_stream->OnObjectMessage(object, payload, true);  // complete the object
@@ -1031,10 +975,9 @@
       /*subgroup_id=*/0,
       /*payload_length=*/8,
   };
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtDataParserVisitor> object_stream =
       MoqtSessionPeer::CreateIncomingDataStream(
-          &session_, &mock_stream, MoqtDataStreamType::kStreamHeaderSubgroup);
+          &session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup);
 
   EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _))
       .WillOnce([&](const FullTrackName& full_track_name, FullSequence sequence,
@@ -1044,7 +987,7 @@
         EXPECT_EQ(sequence.group, object.group_id);
         EXPECT_EQ(sequence.object, object.object_id);
       });
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kIncomingUniStreamId));
   object_stream->OnObjectMessage(object, payload, true);
 
@@ -1077,10 +1020,9 @@
       /*subgroup_id=*/0,
       /*payload_length=*/8,
   };
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtDataParserVisitor> object_stream =
       MoqtSessionPeer::CreateIncomingDataStream(
-          &session_, &mock_stream, MoqtDataStreamType::kStreamHeaderSubgroup);
+          &session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup);
 
   EXPECT_CALL(visitor, OnObjectFragment(_, _, _, _, _, _))
       .WillOnce([&](const FullTrackName& full_track_name, FullSequence sequence,
@@ -1090,7 +1032,7 @@
         EXPECT_EQ(sequence.group, object.group_id);
         EXPECT_EQ(sequence.object, object.object_id);
       });
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kIncomingUniStreamId));
   object_stream->OnObjectMessage(object, payload, true);
 
@@ -1163,27 +1105,26 @@
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(true));
   bool fin = false;
-  webtransport::test::MockStream mock_stream;
-  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly([&] { return !fin; });
+  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
   EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream));
+      .WillOnce(Return(&mock_stream_));
   std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream, SetVisitor(_))
+  EXPECT_CALL(mock_stream_, SetVisitor(_))
       .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
         stream_visitor = std::move(visitor);
       });
-  EXPECT_CALL(mock_stream, visitor()).WillOnce([&] {
+  EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
     return stream_visitor.get();
   });
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kOutgoingUniStreamId));
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream));
+      .WillRepeatedly(Return(&mock_stream_));
 
   // Verify first six message fields are sent correctly
   bool correct_message = false;
   const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x00, 0x7f};
-  EXPECT_CALL(mock_stream, Writev(_, _))
+  EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         correct_message = absl::StartsWith(data[0], kExpectedMessage);
@@ -1218,27 +1159,26 @@
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(true));
   bool fin = false;
-  webtransport::test::MockStream mock_stream;
-  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly([&] { return !fin; });
+  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
   EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream));
+      .WillOnce(Return(&mock_stream_));
   std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream, SetVisitor(_))
+  EXPECT_CALL(mock_stream_, SetVisitor(_))
       .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
         stream_visitor = std::move(visitor);
       });
-  EXPECT_CALL(mock_stream, visitor()).WillOnce([&] {
+  EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
     return stream_visitor.get();
   });
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kOutgoingUniStreamId));
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream));
+      .WillRepeatedly(Return(&mock_stream_));
 
   // Verify first five message fields are sent correctly
   bool correct_message = false;
   const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x00, 0x7f};
-  EXPECT_CALL(mock_stream, Writev(_, _))
+  EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         correct_message = absl::StartsWith(data[0], kExpectedMessage);
@@ -1271,27 +1211,26 @@
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(true));
   bool fin = false;
-  webtransport::test::MockStream mock_stream;
-  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly([&] { return !fin; });
+  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
   EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream));
+      .WillOnce(Return(&mock_stream_));
   std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream, SetVisitor(_))
+  EXPECT_CALL(mock_stream_, SetVisitor(_))
       .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
         stream_visitor = std::move(visitor);
       });
-  EXPECT_CALL(mock_stream, visitor()).WillOnce([&] {
+  EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
     return stream_visitor.get();
   });
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kOutgoingUniStreamId));
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream));
+      .WillRepeatedly(Return(&mock_stream_));
 
   // Verify first six message fields are sent correctly
   bool correct_message = false;
   const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x00, 0x7f};
-  EXPECT_CALL(mock_stream, Writev(_, _))
+  EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         correct_message = absl::StartsWith(data[0], kExpectedMessage);
@@ -1313,7 +1252,7 @@
   EXPECT_TRUE(correct_message);
   EXPECT_TRUE(fin);
 
-  EXPECT_CALL(mock_stream, ResetWithUserCode(kResetCodeTimedOut));
+  EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeTimedOut));
   subscription->OnGroupAbandoned(5);
 }
 
@@ -1327,27 +1266,26 @@
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(true));
   bool fin = false;
-  webtransport::test::MockStream mock_stream;
-  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly([&] { return !fin; });
+  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
   EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream));
+      .WillOnce(Return(&mock_stream_));
   std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream, SetVisitor(_))
+  EXPECT_CALL(mock_stream_, SetVisitor(_))
       .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
         stream_visitor = std::move(visitor);
       });
-  EXPECT_CALL(mock_stream, visitor()).WillRepeatedly([&] {
+  EXPECT_CALL(mock_stream_, visitor()).WillRepeatedly([&] {
     return stream_visitor.get();
   });
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kOutgoingUniStreamId));
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream));
+      .WillRepeatedly(Return(&mock_stream_));
 
   // Verify first six message fields are sent correctly
   bool correct_message = false;
   const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x00, 0x7f};
-  EXPECT_CALL(mock_stream, Writev(_, _))
+  EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         correct_message = absl::StartsWith(data[0], kExpectedMessage);
@@ -1369,7 +1307,7 @@
   EXPECT_TRUE(correct_message);
   EXPECT_FALSE(fin);
   fin = false;
-  EXPECT_CALL(mock_stream, Writev(_, _))
+  EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         EXPECT_TRUE(data.empty());
@@ -1389,27 +1327,26 @@
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(true));
   bool fin = false;
-  webtransport::test::MockStream mock_stream;
-  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly([&] { return !fin; });
+  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
   EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream));
+      .WillOnce(Return(&mock_stream_));
   std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream, SetVisitor(_))
+  EXPECT_CALL(mock_stream_, SetVisitor(_))
       .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
         stream_visitor = std::move(visitor);
       });
-  EXPECT_CALL(mock_stream, visitor()).WillRepeatedly([&] {
+  EXPECT_CALL(mock_stream_, visitor()).WillRepeatedly([&] {
     return stream_visitor.get();
   });
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kOutgoingUniStreamId));
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream));
+      .WillRepeatedly(Return(&mock_stream_));
 
   // Verify first six message fields are sent correctly
   bool correct_message = false;
   const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x7f, 0x00, 0x00};
-  EXPECT_CALL(mock_stream, Writev(_, _))
+  EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         correct_message = absl::StartsWith(data[0], kExpectedMessage);
@@ -1430,19 +1367,19 @@
   subscription->OnNewObjectAvailable(FullSequence(5, 0));
   EXPECT_FALSE(fin);
   // Try to deliver (5,1), but fail.
-  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly([&] { return false; });
+  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return false; });
   EXPECT_CALL(*track, GetCachedObject(_)).Times(0);
-  EXPECT_CALL(mock_stream, Writev(_, _)).Times(0);
+  EXPECT_CALL(mock_stream_, Writev(_, _)).Times(0);
   subscription->OnNewObjectAvailable(FullSequence(5, 1));
   // Notify that FIN arrived, but do nothing with it because (5, 1) isn't sent.
-  EXPECT_CALL(mock_stream, Writev(_, _)).Times(0);
+  EXPECT_CALL(mock_stream_, Writev(_, _)).Times(0);
   subscription->OnNewFinAvailable(FullSequence(5, 1));
 
   // Reopen the window.
   correct_message = false;
   // object id, extensions, payload length, status.
   const std::string kExpectedMessage2 = {0x01, 0x00, 0x00, 0x03};
-  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly([&] { return true; });
+  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return true; });
   EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillRepeatedly([&] {
     return PublishedObject{
         FullSequence(5, 1),     MoqtObjectStatus::kEndOfGroup,   127,
@@ -1451,7 +1388,7 @@
   EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 2))).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
-  EXPECT_CALL(mock_stream, Writev(_, _))
+  EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         correct_message = absl::StartsWith(data[0], kExpectedMessage2);
@@ -1474,26 +1411,25 @@
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(true));
   bool fin = false;
-  webtransport::test::MockStream mock_stream;
-  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly([&] { return !fin; });
+  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
   EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream));
+      .WillOnce(Return(&mock_stream_));
   std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream, SetVisitor(_))
+  EXPECT_CALL(mock_stream_, SetVisitor(_))
       .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
         stream_visitor = std::move(visitor);
       });
-  EXPECT_CALL(mock_stream, visitor()).WillRepeatedly([&] {
+  EXPECT_CALL(mock_stream_, visitor()).WillRepeatedly([&] {
     return stream_visitor.get();
   });
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kOutgoingUniStreamId));
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream));
+      .WillRepeatedly(Return(&mock_stream_));
   // Verify first six message fields are sent correctly
   bool correct_message = false;
   const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x7f, 0x00, 0x00};
-  EXPECT_CALL(mock_stream, Writev(_, _))
+  EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         correct_message = absl::StartsWith(data[0], kExpectedMessage);
@@ -1514,7 +1450,7 @@
   subscription->OnNewObjectAvailable(FullSequence(5, 0));
 
   // Abandon the subgroup.
-  EXPECT_CALL(mock_stream, ResetWithUserCode(0x1)).Times(1);
+  EXPECT_CALL(mock_stream_, ResetWithUserCode(0x1)).Times(1);
   subscription->OnSubgroupAbandoned(FullSequence(5, 0), 0x1);
 }
 
@@ -1536,23 +1472,22 @@
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(true));
   bool fin = false;
-  webtransport::test::MockStream mock_stream;
-  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly([&] { return !fin; });
+  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
   EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream));
+      .WillOnce(Return(&mock_stream_));
   std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream, SetVisitor(_))
+  EXPECT_CALL(mock_stream_, SetVisitor(_))
       .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
         stream_visitor = std::move(visitor);
       });
-  EXPECT_CALL(mock_stream, visitor()).WillOnce([&] {
+  EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
     return stream_visitor.get();
   });
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kOutgoingUniStreamId));
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream));
-  EXPECT_CALL(mock_stream, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
+      .WillRepeatedly(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
   EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([] {
     return PublishedObject{FullSequence(5, 0), MoqtObjectStatus::kNormal, 128,
                            MemSliceFromString("deadbeef")};
@@ -1583,23 +1518,22 @@
       .WillOnce(Return(true))
       .WillOnce(Return(true));
   bool fin = false;
-  webtransport::test::MockStream mock_stream;
-  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly([&] { return !fin; });
+  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return !fin; });
   EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream));
+      .WillOnce(Return(&mock_stream_));
   std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream, SetVisitor(_))
+  EXPECT_CALL(mock_stream_, SetVisitor(_))
       .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
         stream_visitor = std::move(visitor);
       });
-  EXPECT_CALL(mock_stream, visitor()).WillOnce([&] {
+  EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
     return stream_visitor.get();
   });
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kOutgoingUniStreamId));
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream));
-  EXPECT_CALL(mock_stream, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
+      .WillRepeatedly(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
   EXPECT_CALL(*track, GetCachedObject(FullSequence(6, 0))).WillRepeatedly([] {
     return PublishedObject{FullSequence(6, 0), MoqtObjectStatus::kNormal, 128,
                            MemSliceFromString("deadbeef")};
@@ -1620,24 +1554,23 @@
   // Set up an outgoing stream for a group.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(true));
-  webtransport::test::MockStream mock_stream;
-  EXPECT_CALL(mock_stream, CanWrite()).WillRepeatedly(Return(true));
+  EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly(Return(true));
   EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(&mock_stream));
+      .WillOnce(Return(&mock_stream_));
   std::unique_ptr<webtransport::StreamVisitor> stream_visitor;
-  EXPECT_CALL(mock_stream, SetVisitor(_))
+  EXPECT_CALL(mock_stream_, SetVisitor(_))
       .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
         stream_visitor = std::move(visitor);
       });
-  EXPECT_CALL(mock_stream, visitor()).WillRepeatedly([&] {
+  EXPECT_CALL(mock_stream_, visitor()).WillRepeatedly([&] {
     return stream_visitor.get();
   });
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kOutgoingUniStreamId));
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillRepeatedly(Return(&mock_stream));
+      .WillRepeatedly(Return(&mock_stream_));
 
-  EXPECT_CALL(mock_stream, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
+  EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
   EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([] {
     return PublishedObject{FullSequence(5, 0), MoqtObjectStatus::kNormal, 128,
                            MemSliceFromString("deadbeef")};
@@ -1656,27 +1589,26 @@
 }
 
 TEST_F(MoqtSessionTest, OneBidirectionalStreamClient) {
-  webtransport::test::MockStream mock_stream;
   EXPECT_CALL(mock_session_, OpenOutgoingBidirectionalStream())
-      .WillOnce(Return(&mock_stream));
+      .WillOnce(Return(&mock_stream_));
   std::unique_ptr<webtransport::StreamVisitor> visitor;
   // Save a reference to MoqtSession::Stream
-  EXPECT_CALL(mock_stream, SetVisitor(_))
+  EXPECT_CALL(mock_stream_, SetVisitor(_))
       .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> new_visitor) {
         visitor = std::move(new_visitor);
       });
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillOnce(Return(webtransport::StreamId(4)));
-  EXPECT_CALL(mock_session_, GetStreamById(4)).WillOnce(Return(&mock_stream));
-  EXPECT_CALL(mock_stream, visitor()).WillOnce([&] { return visitor.get(); });
-  EXPECT_CALL(mock_stream,
+  EXPECT_CALL(mock_session_, GetStreamById(4)).WillOnce(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] { return visitor.get(); });
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kClientSetup), _));
   session_.OnSessionReady();
 
   // Peer tries to open a bidi stream.
   bool reported_error = false;
   EXPECT_CALL(mock_session_, AcceptIncomingBidirectionalStream())
-      .WillOnce(Return(&mock_stream));
+      .WillOnce(Return(&mock_stream_));
   EXPECT_CALL(mock_session_,
               CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
                            "Bidirectional stream already open"))
@@ -1694,23 +1626,22 @@
       &mock_session_, MoqtSessionParameters(quic::Perspective::IS_SERVER),
       std::make_unique<quic::test::TestAlarmFactory>(),
       session_callbacks_.AsSessionCallbacks());
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&server_session, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&server_session, &mock_stream_);
   MoqtClientSetup setup = {
       /*supported_versions*/ {kDefaultMoqtVersion},
       /*path=*/std::nullopt,
   };
-  EXPECT_CALL(mock_stream,
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kServerSetup), _));
-  EXPECT_CALL(mock_stream, GetStreamId()).WillOnce(Return(0));
+  EXPECT_CALL(mock_stream_, GetStreamId()).WillOnce(Return(0));
   EXPECT_CALL(session_callbacks_.session_established_callback, Call()).Times(1);
   stream_input->OnClientSetupMessage(setup);
 
   // Peer tries to open a bidi stream.
   bool reported_error = false;
   EXPECT_CALL(mock_session_, AcceptIncomingBidirectionalStream())
-      .WillOnce(Return(&mock_stream));
+      .WillOnce(Return(&mock_stream_));
   EXPECT_CALL(mock_session_,
               CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
                            "Bidirectional stream already open"))
@@ -1728,9 +1659,8 @@
   auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
                               FullSequence(4, 2));
   MoqtSessionPeer::AddSubscription(&session_, track, 0, 1, 3, 4);
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   MoqtUnsubscribe unsubscribe = {
       /*subscribe_id=*/0,
   };
@@ -1809,13 +1739,12 @@
       /*subgroup_id=*/0,
       /*payload_length=*/8,
   };
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtDataParserVisitor> object_stream =
       MoqtSessionPeer::CreateIncomingDataStream(
-          &session_, &mock_stream, MoqtDataStreamType::kStreamHeaderSubgroup);
+          &session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup);
 
   EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(1);
-  EXPECT_CALL(mock_stream, GetStreamId())
+  EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kIncomingUniStreamId));
   object_stream->OnObjectMessage(object, payload, true);
   char datagram[] = {0x01, 0x02, 0x00, 0x10, 0x00, 0x00, 0x08, 0x64,
@@ -1843,10 +1772,9 @@
       /*subgroup_id=*/0,
       /*payload_length=*/8,
   };
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtDataParserVisitor> object_stream =
       MoqtSessionPeer::CreateIncomingDataStream(
-          &session_, &mock_stream, MoqtDataStreamType::kStreamHeaderSubgroup);
+          &session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup);
   EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(0);
   object_stream->OnObjectMessage(object, payload, true);
 }
@@ -2085,22 +2013,16 @@
 }
 
 TEST_F(MoqtSessionTest, FetchReturnsOk) {
-  webtransport::test::MockStream control_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   MoqtFetch fetch = DefaultFetch();
-  auto track = std::make_shared<MockTrackPublisher>(fetch.full_track_name);
-  publisher_.Add(track);
+  MockTrackPublisher* track = CreateTrackPublisher();
+  SetLargestId(track, FullSequence(0, 0));
 
   auto fetch_task_ptr = std::make_unique<MockFetchTask>();
-  MockFetchTask* fetch_task = fetch_task_ptr.get();
-  EXPECT_CALL(*track, Fetch(_, _, _, _))
-      .WillOnce(Return(std::move(fetch_task_ptr)));
+  EXPECT_CALL(*track, Fetch).WillOnce(Return(std::move(fetch_task_ptr)));
   // Compose and send the FETCH_OK.
-  EXPECT_CALL(*track, GetDeliveryOrder())
-      .WillRepeatedly(Return(MoqtDeliveryOrder::kAscending));
-  EXPECT_CALL(*fetch_task, GetLargestId()).WillOnce(Return(FullSequence(0, 0)));
-  EXPECT_CALL(control_stream,
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kFetchOk), _));
   // Stream can't open yet.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream)
@@ -2113,17 +2035,14 @@
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
   MoqtFetch fetch = DefaultFetch();
-  auto track = std::make_shared<MockTrackPublisher>(fetch.full_track_name);
-  publisher_.Add(track);
+  MockTrackPublisher* track = CreateTrackPublisher();
+  SetLargestId(track, FullSequence(0, 0));
 
   auto fetch_task_ptr = std::make_unique<MockFetchTask>();
   MockFetchTask* fetch_task = fetch_task_ptr.get();
   EXPECT_CALL(*track, Fetch(_, _, _, _))
       .WillOnce(Return(std::move(fetch_task_ptr)));
   // Compose and send the FETCH_OK.
-  EXPECT_CALL(*track, GetDeliveryOrder())
-      .WillRepeatedly(Return(MoqtDeliveryOrder::kAscending));
-  EXPECT_CALL(*fetch_task, GetLargestId()).WillOnce(Return(FullSequence(0, 0)));
   EXPECT_CALL(control_stream,
               Writev(ControlMessageOfType(MoqtMessageType::kFetchOk), _));
   // Open stream immediately.
@@ -2190,20 +2109,17 @@
 }
 
 TEST_F(MoqtSessionTest, FetchFails) {
-  webtransport::test::MockStream control_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   MoqtFetch fetch = DefaultFetch();
-  auto track = std::make_shared<MockTrackPublisher>(fetch.full_track_name);
-  publisher_.Add(track);
+  MockTrackPublisher* track = CreateTrackPublisher();
 
   auto fetch_task_ptr = std::make_unique<MockFetchTask>();
   MockFetchTask* fetch_task = fetch_task_ptr.get();
-  EXPECT_CALL(*track, Fetch(_, _, _, _))
-      .WillOnce(Return(std::move(fetch_task_ptr)));
+  EXPECT_CALL(*track, Fetch).WillOnce(Return(std::move(fetch_task_ptr)));
   EXPECT_CALL(*fetch_task, GetStatus())
       .WillRepeatedly(Return(absl::Status(absl::StatusCode::kInternal, "foo")));
-  EXPECT_CALL(control_stream,
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kFetchError), _));
   stream_input->OnFetchMessage(fetch);
 }
@@ -2340,19 +2256,12 @@
   subscribe.start_group = std::nullopt;
   subscribe.start_object = std::nullopt;
   subscribe.end_group = std::nullopt;
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  auto track_publisher =
-      std::make_shared<MockTrackPublisher>(subscribe.full_track_name);
-  EXPECT_CALL(*track_publisher, GetTrackStatus())
-      .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
-  EXPECT_CALL(*track_publisher, GetLargestSequence())
-      .WillRepeatedly(Return(FullSequence(4, 0, 10)));
-  publisher_.Add(track_publisher);
-  EXPECT_CALL(mock_stream,
-              Writev(ControlMessageOfType(MoqtMessageType::kSubscribeOk), _));
-  stream_input->OnSubscribeMessage(subscribe);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MockTrackPublisher* track = CreateTrackPublisher();
+  SetLargestId(track, FullSequence(4, 0, 10));
+  ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get());
+
   MoqtObjectListener* subscription =
       MoqtSessionPeer::GetSubscription(&session_, subscribe.subscribe_id);
   ASSERT_NE(subscription, nullptr);
@@ -2364,18 +2273,17 @@
   // Joining FETCH arrives. The resulting Fetch should begin at (2, 0).
   MoqtFetch fetch = DefaultFetch();
   fetch.joining_fetch = {1, 2};
-  EXPECT_CALL(*track_publisher,
+  EXPECT_CALL(*track,
               Fetch(FullSequence(2, 0), 4, std::optional<uint64_t>(10), _))
       .WillOnce(Return(std::make_unique<MockFetchTask>()));
-  EXPECT_CALL(mock_stream,
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kFetchOk), _));
   stream_input->OnFetchMessage(fetch);
 }
 
 TEST_F(MoqtSessionTest, IncomingJoiningFetchBadSubscribeId) {
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   MoqtFetch fetch = DefaultFetch();
   fetch.joining_fetch = {1, 2};
   MoqtFetchError expected_error = {
@@ -2383,27 +2291,18 @@
       /*error_code=*/SubscribeErrorCode::kDoesNotExist,
       /*reason_phrase=*/"Joining Fetch for non-existent subscribe",
   };
-  EXPECT_CALL(mock_stream, Writev(SerializedControlMessage(expected_error), _));
+  EXPECT_CALL(mock_stream_,
+              Writev(SerializedControlMessage(expected_error), _));
   stream_input->OnFetchMessage(fetch);
 }
 
 TEST_F(MoqtSessionTest, IncomingJoiningFetchNonLatestObject) {
   MoqtSubscribe subscribe = DefaultSubscribe();
-  // DefaultSubscribe is AbsoluteStart. Because it uses Object ID 0, it will be
-  // misindentified as LatestGroup, but it will still trigger a Protocol Error.
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  auto track_publisher =
-      std::make_shared<MockTrackPublisher>(subscribe.full_track_name);
-  EXPECT_CALL(*track_publisher, GetTrackStatus())
-      .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
-  EXPECT_CALL(*track_publisher, GetLargestSequence())
-      .WillRepeatedly(Return(FullSequence(0, 0, 10)));
-  publisher_.Add(track_publisher);
-  EXPECT_CALL(mock_stream,
-              Writev(ControlMessageOfType(MoqtMessageType::kSubscribeOk), _));
-  stream_input->OnSubscribeMessage(subscribe);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MockTrackPublisher* track = CreateTrackPublisher();
+  SetLargestId(track, FullSequence(2, 0, 10));
+  ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get());
 
   MoqtFetch fetch = DefaultFetch();
   fetch.joining_fetch = {1, 2};
@@ -2416,11 +2315,10 @@
 
 TEST_F(MoqtSessionTest, SendJoiningFetch) {
   MockSubscribeRemoteTrackVisitor remote_track_visitor;
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   EXPECT_CALL(mock_session_, GetStreamById(_))
-      .WillRepeatedly(Return(&mock_stream));
+      .WillRepeatedly(Return(&mock_stream_));
   MoqtSubscribe expected_subscribe = {
       /*subscribe_id=*/0,
       /*track_alias=*/0,
@@ -2437,9 +2335,10 @@
       /*group_order=*/MoqtDeliveryOrder::kAscending,
       /*joining_fetch=*/JoiningFetch(0, 1),
   };
-  EXPECT_CALL(mock_stream,
+  EXPECT_CALL(mock_stream_,
               Writev(SerializedControlMessage(expected_subscribe), _));
-  EXPECT_CALL(mock_stream, Writev(SerializedControlMessage(expected_fetch), _));
+  EXPECT_CALL(mock_stream_,
+              Writev(SerializedControlMessage(expected_fetch), _));
   EXPECT_TRUE(session_.JoiningFetch(
       expected_subscribe.full_track_name, &remote_track_visitor, nullptr, 1,
       0x80, MoqtDeliveryOrder::kAscending, MoqtSubscribeParameters()));
@@ -2447,14 +2346,13 @@
 
 TEST_F(MoqtSessionTest, SendJoiningFetchNoFlowControl) {
   MockSubscribeRemoteTrackVisitor remote_track_visitor;
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   EXPECT_CALL(mock_session_, GetStreamById(_))
-      .WillRepeatedly(Return(&mock_stream));
-  EXPECT_CALL(mock_stream,
+      .WillRepeatedly(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
-  EXPECT_CALL(mock_stream,
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kFetch), _));
   EXPECT_TRUE(session_.JoiningFetch(FullTrackName("foo", "bar"),
                                     &remote_track_visitor, 0));
@@ -2537,9 +2435,8 @@
 }
 
 TEST_F(MoqtSessionTest, FetchThenOkThenCancel) {
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   std::unique_ptr<MoqtFetchTask> fetch_task;
   session_.Fetch(
       FullTrackName("foo", "bar"),
@@ -2562,15 +2459,14 @@
   EXPECT_EQ(fetch_task->GetNextObject(object),
             MoqtFetchTask::GetNextObjectResult::kPending);
   // Cancel the fetch.
-  EXPECT_CALL(mock_stream,
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kFetchCancel), _));
   fetch_task.reset();
 }
 
 TEST_F(MoqtSessionTest, FetchThenError) {
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   std::unique_ptr<MoqtFetchTask> fetch_task;
   session_.Fetch(
       FullTrackName("foo", "bar"),
@@ -2592,9 +2488,8 @@
 
 // The application takes objects as they arrive.
 TEST_F(MoqtSessionTest, IncomingFetchObjectsGreedyApp) {
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   std::unique_ptr<MoqtFetchTask> fetch_task;
   uint64_t expected_object_id = 0;
   session_.Fetch(
@@ -2671,9 +2566,8 @@
 }
 
 TEST_F(MoqtSessionTest, IncomingFetchObjectsSlowApp) {
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   std::unique_ptr<MoqtFetchTask> fetch_task;
   uint64_t expected_object_id = 0;
   bool objects_available = false;
@@ -2809,17 +2703,11 @@
 TEST_F(MoqtSessionTest, DeliveryTimeoutParameter) {
   MoqtSubscribe request = DefaultSubscribe();
   request.parameters.delivery_timeout = quic::QuicTimeDelta::FromSeconds(1);
-  webtransport::test::MockStream control_mock;
   std::unique_ptr<MoqtControlParserVisitor> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &control_mock);
-  auto track_publisher =
-      std::make_shared<MockTrackPublisher>(FullTrackName("foo", "bar"));
-  EXPECT_CALL(*track_publisher, GetTrackStatus())
-      .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
-  publisher_.Add(track_publisher);
-  EXPECT_CALL(control_mock,
-              Writev(ControlMessageOfType(MoqtMessageType::kSubscribeOk), _));
-  control_stream->OnSubscribeMessage(request);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MockTrackPublisher* track = CreateTrackPublisher();
+  ReceiveSubscribeSynchronousOk(track, request, control_stream.get());
+
   MoqtObjectListener* subscription =
       MoqtSessionPeer::GetSubscription(&session_, 1);
   ASSERT_NE(subscription, nullptr);
@@ -3058,13 +2946,12 @@
 }
 
 TEST_F(MoqtSessionTest, ReceiveGoAwayEnforcement) {
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   EXPECT_CALL(session_callbacks_.goaway_received_callback, Call("foo"));
   stream_input->OnGoAwayMessage(MoqtGoAway("foo"));
   // New requests not allowed.
-  EXPECT_CALL(mock_stream, Writev).Times(0);
+  EXPECT_CALL(mock_stream_, Writev).Times(0);
   MockSubscribeRemoteTrackVisitor remote_track_visitor;
   EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
                                               &remote_track_visitor));
@@ -3095,37 +2982,32 @@
 }
 
 TEST_F(MoqtSessionTest, SendGoAwayEnforcement) {
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  auto track_publisher =
-      std::make_shared<MockTrackPublisher>(FullTrackName("foo", "bar"));
-  EXPECT_CALL(*track_publisher, GetTrackStatus())
-      .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
-  publisher_.Add(track_publisher);
-  EXPECT_CALL(mock_stream,
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  CreateTrackPublisher();
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kGoAway), _));
   session_.GoAway("");
   EXPECT_CALL(
-      mock_stream,
+      mock_stream_,
       Writev(ControlMessageOfType(MoqtMessageType::kSubscribeError), _));
   stream_input->OnSubscribeMessage(DefaultSubscribe());
-  EXPECT_CALL(mock_stream,
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kAnnounceError), _));
   stream_input->OnAnnounceMessage(
       MoqtAnnounce(FullTrackName("foo", "bar"), MoqtSubscribeParameters()));
-  EXPECT_CALL(mock_stream,
+  EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kFetchError), _));
   MoqtFetch fetch = DefaultFetch();
   stream_input->OnFetchMessage(fetch);
   EXPECT_CALL(
-      mock_stream,
+      mock_stream_,
       Writev(ControlMessageOfType(MoqtMessageType::kSubscribeAnnouncesError),
              _));
   stream_input->OnSubscribeAnnouncesMessage(
       MoqtSubscribeAnnounces(FullTrackName("foo", "bar")));
   // Block all outgoing SUBSCRIBE, ANNOUNCE, GOAWAY,etc.
-  EXPECT_CALL(mock_stream, Writev).Times(0);
+  EXPECT_CALL(mock_stream_, Writev).Times(0);
   MockSubscribeRemoteTrackVisitor remote_track_visitor;
   EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
                                               &remote_track_visitor));
@@ -3154,24 +3036,22 @@
 
 TEST_F(MoqtSessionTest, ClientCannotSendNewSessionUri) {
   // session_ is a client session.
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   // Client GOAWAY not sent.
-  EXPECT_CALL(mock_stream, Writev).Times(0);
+  EXPECT_CALL(mock_stream_, Writev).Times(0);
   session_.GoAway("foo");
 }
 
 TEST_F(MoqtSessionTest, ServerCannotReceiveNewSessionUri) {
   webtransport::test::MockSession mock_session;
-  webtransport::test::MockStream mock_stream;
   MoqtSession session(&mock_session,
                       MoqtSessionParameters(quic::Perspective::IS_SERVER),
                       std::make_unique<quic::test::TestAlarmFactory>(),
                       session_callbacks_.AsSessionCallbacks());
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session, &mock_stream);
-  MoqtSessionPeer::CreateControlStream(&session, &mock_stream);
+      MoqtSessionPeer::CreateControlStream(&session, &mock_stream_);
+  MoqtSessionPeer::CreateControlStream(&session, &mock_stream_);
   EXPECT_CALL(
       mock_session,
       CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
@@ -3379,12 +3259,11 @@
       /*start_object=*/0,
       /*end_group=*/7,
   };
-  webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  EXPECT_CALL(mock_session_, GetStreamById(4)).WillOnce(Return(&mock_stream));
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  EXPECT_CALL(mock_session_, GetStreamById(4)).WillOnce(Return(&mock_stream_));
   bool correct_message = false;
-  EXPECT_CALL(mock_stream, Writev(_, _))
+  EXPECT_CALL(mock_stream_, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         correct_message = true;