Refactor: Move OutgoingDataStream to a separate file and make the interfaces with PublishedSubscription explicit.

This shortens moqt_session.h and is part of separating out subscription state into something common to SUBSCRIBE and PUBLISH.

Is mostly a no-op for tests.

PiperOrigin-RevId: 915657635
diff --git a/build/source_list.bzl b/build/source_list.bzl
index b7ca694..8a79fac 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1608,6 +1608,7 @@
     "quic/moqt/moqt_trace_recorder.h",
     "quic/moqt/moqt_track.h",
     "quic/moqt/moqt_types.h",
+    "quic/moqt/moqt_uni_stream.h",
     "quic/moqt/relay_namespace_tree.h",
     "quic/moqt/session_namespace_tree.h",
     "quic/moqt/tools/chat_client.h",
@@ -1639,6 +1640,7 @@
     "quic/moqt/moqt_stream_map.cc",
     "quic/moqt/moqt_trace_recorder.cc",
     "quic/moqt/moqt_track.cc",
+    "quic/moqt/moqt_uni_stream.cc",
     "quic/moqt/relay_namespace_tree.cc",
     "quic/moqt/tools/chat_client.cc",
     "quic/moqt/tools/moq_chat.cc",
@@ -1669,6 +1671,7 @@
     "quic/moqt/moqt_session_test.cc",
     "quic/moqt/moqt_stream_map_test.cc",
     "quic/moqt/moqt_track_test.cc",
+    "quic/moqt/moqt_uni_stream_test.cc",
     "quic/moqt/relay_namespace_tree_test.cc",
     "quic/moqt/session_namespace_tree_test.cc",
     "quic/moqt/test_tools/moqt_simulator_test.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index 2de8cbf..7529969 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1612,6 +1612,7 @@
     "src/quiche/quic/moqt/moqt_trace_recorder.h",
     "src/quiche/quic/moqt/moqt_track.h",
     "src/quiche/quic/moqt/moqt_types.h",
+    "src/quiche/quic/moqt/moqt_uni_stream.h",
     "src/quiche/quic/moqt/relay_namespace_tree.h",
     "src/quiche/quic/moqt/session_namespace_tree.h",
     "src/quiche/quic/moqt/tools/chat_client.h",
@@ -1643,6 +1644,7 @@
     "src/quiche/quic/moqt/moqt_stream_map.cc",
     "src/quiche/quic/moqt/moqt_trace_recorder.cc",
     "src/quiche/quic/moqt/moqt_track.cc",
+    "src/quiche/quic/moqt/moqt_uni_stream.cc",
     "src/quiche/quic/moqt/relay_namespace_tree.cc",
     "src/quiche/quic/moqt/tools/chat_client.cc",
     "src/quiche/quic/moqt/tools/moq_chat.cc",
@@ -1674,6 +1676,7 @@
     "src/quiche/quic/moqt/moqt_session_test.cc",
     "src/quiche/quic/moqt/moqt_stream_map_test.cc",
     "src/quiche/quic/moqt/moqt_track_test.cc",
+    "src/quiche/quic/moqt/moqt_uni_stream_test.cc",
     "src/quiche/quic/moqt/relay_namespace_tree_test.cc",
     "src/quiche/quic/moqt/session_namespace_tree_test.cc",
     "src/quiche/quic/moqt/test_tools/moqt_simulator_test.cc",
diff --git a/build/source_list.json b/build/source_list.json
index e159716..4a17ef1 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1611,6 +1611,7 @@
     "quiche/quic/moqt/moqt_trace_recorder.h",
     "quiche/quic/moqt/moqt_track.h",
     "quiche/quic/moqt/moqt_types.h",
+    "quiche/quic/moqt/moqt_uni_stream.h",
     "quiche/quic/moqt/relay_namespace_tree.h",
     "quiche/quic/moqt/session_namespace_tree.h",
     "quiche/quic/moqt/tools/chat_client.h",
@@ -1642,6 +1643,7 @@
     "quiche/quic/moqt/moqt_stream_map.cc",
     "quiche/quic/moqt/moqt_trace_recorder.cc",
     "quiche/quic/moqt/moqt_track.cc",
+    "quiche/quic/moqt/moqt_uni_stream.cc",
     "quiche/quic/moqt/relay_namespace_tree.cc",
     "quiche/quic/moqt/tools/chat_client.cc",
     "quiche/quic/moqt/tools/moq_chat.cc",
@@ -1673,6 +1675,7 @@
     "quiche/quic/moqt/moqt_session_test.cc",
     "quiche/quic/moqt/moqt_stream_map_test.cc",
     "quiche/quic/moqt/moqt_track_test.cc",
+    "quiche/quic/moqt/moqt_uni_stream_test.cc",
     "quiche/quic/moqt/relay_namespace_tree_test.cc",
     "quiche/quic/moqt/session_namespace_tree_test.cc",
     "quiche/quic/moqt/test_tools/moqt_simulator_test.cc",
diff --git a/quiche/quic/moqt/moqt_priority.h b/quiche/quic/moqt/moqt_priority.h
index a763934..04561d5 100644
--- a/quiche/quic/moqt/moqt_priority.h
+++ b/quiche/quic/moqt/moqt_priority.h
@@ -12,6 +12,11 @@
 
 namespace moqt {
 
+// WebTransport lets applications split a session into multiple send groups
+// that have equal weight for scheduling. We don't have a use for that, so the
+// send group is always the same.
+constexpr webtransport::SendGroupId kMoqtSendGroupId = 0;
+
 // Priority that can be assigned to a track or individual streams associated
 // with the track by either the publisher or the subscriber.
 using MoqtPriority = uint8_t;
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 35d6555..87e6796 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -30,7 +30,6 @@
 #include "quiche/quic/core/quic_alarm_factory.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/core/quic_types.h"
-#include "quiche/quic/core/quic_utils.h"
 #include "quiche/quic/moqt/moqt_bidi_stream.h"
 #include "quiche/quic/moqt/moqt_error.h"
 #include "quiche/quic/moqt/moqt_fetch_task.h"
@@ -48,6 +47,7 @@
 #include "quiche/quic/moqt/moqt_stream_map.h"
 #include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/quic/moqt/moqt_types.h"
+#include "quiche/quic/moqt/moqt_uni_stream.h"
 #include "quiche/quic/platform/api/quic_logging.h"
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/platform/api/quiche_logging.h"
@@ -68,11 +68,6 @@
 
 using ::quic::Perspective;
 
-// WebTransport lets applications split a session into multiple send groups
-// that have equal weight for scheduling. We don't have a use for that, so the
-// send group is always the same.
-constexpr webtransport::SendGroupId kMoqtSendGroupId = 0;
-
 class DefaultPublisher : public MoqtPublisher {
  public:
   static DefaultPublisher* GetInstance() {
@@ -779,7 +774,7 @@
   }
   PublishedSubscription& subscription = *it->second;
   if (!session_->CanOpenNextOutgoingUnidirectionalStream()) {
-    subscription.AddQueuedOutgoingDataStream(parameters);
+    subscription.AddQueuedOutgoingSubgroupStream(parameters);
     // The subscription will notify the session about how to update the
     // session's queue.
     // TODO: limit the number of streams in the queue.
@@ -798,8 +793,17 @@
         << "OpenDataStream called when creation of new streams is blocked.";
     return nullptr;
   }
-  new_stream->SetVisitor(std::make_unique<OutgoingDataStream>(
-      this, new_stream, subscription, parameters));
+  webtransport::StreamPriority priority{
+      kMoqtSendGroupId,
+      subscription.GetSendOrder(
+          Location(parameters.index.group, parameters.first_object),
+          parameters.index.subgroup,
+          parameters.publisher_priority.value_or(
+              subscription.default_publisher_priority()))};
+  new_stream->SetVisitor(std::make_unique<OutgoingSubgroupStream>(
+      framer_, new_stream, parameters.index, parameters.first_object,
+      subscription.GetWeakPtr(), subscription.publisher_shared_ptr(), priority,
+      subscription.track_alias(), &trace_recorder_));
   subscription.OnDataStreamCreated(new_stream->GetStreamId(), parameters.index);
   return new_stream;
 }
@@ -871,7 +875,7 @@
     // Pop the item from the subscription's queue, which might update
     // subscribes_with_queued_outgoing_data_streams_.
     NewStreamParameters next_queued_stream =
-        subscription->second->NextQueuedOutgoingDataStream();
+        subscription->second->NextQueuedOutgoingSubgroupStream();
     // Check if Group is too old.
     if (next_queued_stream.index.group <
         subscription->second->first_active_group()) {
@@ -1933,7 +1937,8 @@
       can_have_joining_fetch_(subscribe.parameters.forward()),
       track_alias_(track_alias),
       parameters_(subscribe.parameters),
-      monitoring_interface_(monitoring_interface) {
+      monitoring_interface_(monitoring_interface),
+      weak_ptr_factory_(this) {
   if (monitoring_interface_ != nullptr) {
     monitoring_interface_->OnObjectAckSupportKnown(
         subscribe.parameters.oack_window_size);
@@ -2084,8 +2089,8 @@
         if (raw_stream == nullptr) {
           continue;
         }
-        OutgoingDataStream* stream =
-            absl::down_cast<OutgoingDataStream*>(raw_stream->visitor());
+        OutgoingSubgroupStream* stream =
+            absl::down_cast<OutgoingSubgroupStream*>(raw_stream->visitor());
         stream->CreateAndSetAlarm(session_->callbacks_.clock->ApproximateNow() +
                                   delivery_timeout());
       }
@@ -2111,10 +2116,9 @@
   if (raw_stream == nullptr) {
     return;
   }
-
-  OutgoingDataStream* stream =
-      absl::down_cast<OutgoingDataStream*>(raw_stream->visitor());
-  stream->SendObjects(*this);
+  OutgoingSubgroupStream* stream =
+      absl::down_cast<OutgoingSubgroupStream*>(raw_stream->visitor());
+  stream->SendObjects();
 }
 
 void MoqtSession::PublishedSubscription::OnTrackPublisherGone() {
@@ -2144,8 +2148,8 @@
   if (raw_stream == nullptr) {
     return;
   }
-  OutgoingDataStream* stream =
-      absl::down_cast<OutgoingDataStream*>(raw_stream->visitor());
+  OutgoingSubgroupStream* stream =
+      absl::down_cast<OutgoingSubgroupStream*>(raw_stream->visitor());
   stream->Fin(location);
 }
 
@@ -2201,8 +2205,8 @@
       continue;
     }
     raw_stream->ResetWithUserCode(kResetCodeDeliveryTimeout);
-    // Sending the Reset will call the destructor for OutgoingDataStream, which
-    // will erase it from the SendStreamMap.
+    // Sending the Reset will call the destructor for OutgoingSubgroupStream,
+    // which will erase it from the SendStreamMap.
   }
   first_active_group_ = std::max(first_active_group_, group_id + 1);
   absl::erase_if(reset_subgroups_, [&](const DataStreamIndex& index) {
@@ -2238,7 +2242,7 @@
 }
 
 // Returns the highest send order in the subscription.
-void MoqtSession::PublishedSubscription::AddQueuedOutgoingDataStream(
+void MoqtSession::PublishedSubscription::AddQueuedOutgoingSubgroupStream(
     const NewStreamParameters& parameters) {
   std::optional<webtransport::SendOrder> start_send_order =
       queued_outgoing_data_streams_.empty()
@@ -2261,11 +2265,11 @@
 }
 
 MoqtSession::NewStreamParameters
-MoqtSession::PublishedSubscription::NextQueuedOutgoingDataStream() {
+MoqtSession::PublishedSubscription::NextQueuedOutgoingSubgroupStream() {
   QUICHE_DCHECK(!queued_outgoing_data_streams_.empty());
   if (queued_outgoing_data_streams_.empty()) {
-    QUICHE_BUG(NextQueuedOutgoingDataStream_no_stream)
-        << "NextQueuedOutgoingDataStream called when there are no streams "
+    QUICHE_BUG(NextQueuedOutgoingSubgroupStream_no_stream)
+        << "NextQueuedOutgoingSubgroupStream called when there are no streams "
            "pending.";
     return NewStreamParameters(0, 0, 0, 0);
   }
@@ -2294,7 +2298,7 @@
   stream_map().AddStream(start_sequence, id);
 }
 void MoqtSession::PublishedSubscription::OnDataStreamDestroyed(
-    webtransport::StreamId id, DataStreamIndex end_sequence) {
+    DataStreamIndex end_sequence) {
   stream_map().RemoveStream(end_sequence);
 }
 
@@ -2307,200 +2311,6 @@
   // TODO: send PUBLISH_DONE if the subscription is done.
 }
 
-MoqtSession::OutgoingDataStream::OutgoingDataStream(
-    MoqtSession* session, webtransport::Stream* stream,
-    PublishedSubscription& subscription, const NewStreamParameters& parameters)
-    : session_(session),
-      stream_(stream),
-      subscription_id_(subscription.request_id()),
-      index_(parameters.index),
-      publisher_priority_(parameters.publisher_priority.value_or(
-          subscription.default_publisher_priority())),
-      // Always include extension header length, because it's difficult to know
-      // a priori if they're going to appear on a stream.
-      stream_type_(MoqtDataStreamType::Subgroup(
-          index_.subgroup, parameters.first_object, false,
-          !parameters.publisher_priority.has_value())),
-      next_object_(parameters.first_object),
-      session_liveness_(session->liveness_token_) {
-  UpdateSendOrder(subscription);
-  session->trace_recorder_.RecordSubgroupStreamCreated(
-      stream->GetStreamId(), subscription.track_alias(), parameters.index);
-}
-
-MoqtSession::OutgoingDataStream::~OutgoingDataStream() {
-  // Though it might seem intuitive that the session object has to outlive the
-  // connection object (and this is indeed how something like QuicSession and
-  // QuicStream works), this is not the true for WebTransport visitors: the
-  // session getting destroyed will inevitably lead to all related streams being
-  // destroyed, but the actual order of destruction is not guaranteed.  Thus, we
-  // need to check if the session still exists while accessing it in a stream
-  // destructor.
-  if (session_liveness_.expired()) {
-    return;
-  }
-  if (delivery_timeout_alarm_ != nullptr) {
-    delivery_timeout_alarm_->PermanentCancel();
-  }
-  auto it = session_->published_subscriptions_.find(subscription_id_);
-  if (it != session_->published_subscriptions_.end()) {
-    it->second->OnDataStreamDestroyed(stream_->GetStreamId(), index_);
-  }
-}
-
-void MoqtSession::OutgoingDataStream::OnCanWrite() {
-  PublishedSubscription* subscription = GetSubscriptionIfValid();
-  if (subscription == nullptr) {
-    return;
-  }
-  SendObjects(*subscription);
-}
-
-void MoqtSession::OutgoingDataStream::OnStopSendingReceived(
-    webtransport::StreamErrorCode error_code) {
-  PublishedSubscription* subscription = GetSubscriptionIfValid();
-  if (subscription == nullptr) {
-    return;
-  }
-  subscription->OnSubgroupAbandoned(index_.group, index_.subgroup, error_code);
-}
-
-void MoqtSession::OutgoingDataStream::DeliveryTimeoutDelegate::OnAlarm() {
-  auto it = stream_->session_->published_subscriptions_.find(
-      stream_->subscription_id_);
-  if (it != stream_->session_->published_subscriptions_.end()) {
-    it->second->OnStreamTimeout(stream_->index());
-  }
-  stream_->stream_->ResetWithUserCode(kResetCodeDeliveryTimeout);
-}
-
-MoqtSession::PublishedSubscription*
-MoqtSession::OutgoingDataStream::GetSubscriptionIfValid() {
-  auto it = session_->published_subscriptions_.find(subscription_id_);
-  if (it == session_->published_subscriptions_.end()) {
-    stream_->ResetWithUserCode(kResetCodeCancelled);
-    return nullptr;
-  }
-
-  PublishedSubscription* subscription = it->second.get();
-  if (!subscription->publisher().largest_location().has_value()) {
-    QUICHE_BUG(GetSubscriptionIfValid_InvalidTrackStatusOk)
-        << "The track publisher returned a status indicating that no objects "
-           "are available, but a stream for those objects exists.";
-    session_->Error(MoqtError::kInternalError,
-                    "Invalid track state provided by application");
-    return nullptr;
-  }
-  return subscription;
-}
-
-void MoqtSession::OutgoingDataStream::SendObjects(
-    PublishedSubscription& subscription) {
-  while (stream_->CanWrite()) {
-    std::optional<PublishedObject> object =
-        subscription.publisher().GetCachedObject(
-            index_.group, index_.subgroup, next_object_, already_delivered_);
-    if (!object.has_value()) {
-      break;
-    }
-    if (object->metadata.payload_length > 0 && object->payload.empty()) {
-      QUICHE_BUG(OutgoingDataStream_empty_payload)
-          << "Received non-empty object with no payload";
-      return;
-    }
-
-    QUICHE_DCHECK_EQ(object->metadata.location.group, index_.group);
-    QUICHE_DCHECK(object->metadata.subgroup == index_.subgroup);
-    if (!subscription.InWindow(object->metadata.location)) {
-      // It is possible that the next object became irrelevant due to a
-      // REQUEST_UPDATE.  Close the stream if so.
-      bool success = stream_->SendFin();
-      QUICHE_BUG_IF(OutgoingDataStream_fin_due_to_update, !success)
-          << "Writing FIN failed despite CanWrite() being true.";
-      return;
-    }
-
-    quic::QuicTimeDelta delivery_timeout = subscription.delivery_timeout();
-    if (!session_->alternate_delivery_timeout_ &&
-        session_->callbacks_.clock->ApproximateNow() -
-                object->metadata.arrival_time >
-            delivery_timeout) {
-      subscription.OnStreamTimeout(index_);
-      stream_->ResetWithUserCode(kResetCodeDeliveryTimeout);
-      return;
-    }
-    uint64_t start_offset = already_delivered_;
-    already_delivered_ +=
-        quic::MemSliceSpanTotalSize(absl::MakeSpan(object->payload));
-    bool fin_after_this = object->fin_after_this &&
-                          already_delivered_ == object->metadata.payload_length;
-    if (start_offset > 0) {  // Just send payload.
-      if (already_delivered_ == start_offset) {
-        // Partial delivery of an object but the payload is empty. This would
-        // result in an infinite loop.
-        QUICHE_BUG(OutgoingDataStream_empty_payload)
-            << "Empty payload for partial object " << object->metadata.location;
-        return;
-      }
-      webtransport::StreamWriteOptions options;
-      options.set_send_fin(fin_after_this);
-      absl::Status write_status =
-          stream_->Writev(absl::MakeSpan(object->payload), options);
-      if (!write_status.ok()) {
-        QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed)
-            << "Writing into MoQT stream failed despite CanWrite() being true "
-               "before; status: "
-            << write_status;
-        session_->Error(MoqtError::kInternalError, "Data stream write error");
-        return;
-      }
-    } else {
-      if (!session_->WriteObjectToStream(
-              stream_, subscription.track_alias(), object->metadata,
-              std::move(object->payload), stream_type_, last_object_,
-              fin_after_this)) {
-        // WriteObjectToStream() closes the connection on error, meaning that
-        // there is no need to process the stream any further.
-        return;
-      }
-      last_object_ = object->metadata;
-      subscription.OnObjectSent(object->metadata.location);
-      next_object_ = last_object_->location.object;
-    }
-    if (already_delivered_ == last_object_->payload_length) {
-      ++next_object_;
-      already_delivered_ = 0;
-    } else {
-      return;
-    }
-    if (object->fin_after_this && !delivery_timeout.IsInfinite() &&
-        !session_->alternate_delivery_timeout_) {
-      CreateAndSetAlarm(object->metadata.arrival_time + delivery_timeout);
-    }
-  }
-}
-
-void MoqtSession::OutgoingDataStream::Fin(Location last_object) {
-  QUICHE_DCHECK_EQ(last_object.group, index_.group);
-  if (next_object_ <= last_object.object) {
-    // There is still data to send, do nothing.
-    return;
-  }
-  // All data has already been sent; send a pure FIN.
-  bool success = stream_->SendFin();
-  QUICHE_BUG_IF(OutgoingDataStream_fin_failed, !success)
-      << "Writing pure FIN failed.";
-  auto it = session_->published_subscriptions_.find(subscription_id_);
-  if (it == session_->published_subscriptions_.end()) {
-    return;
-  }
-  quic::QuicTimeDelta delivery_timeout = it->second->delivery_timeout();
-  if (!delivery_timeout.IsInfinite()) {
-    CreateAndSetAlarm(session_->callbacks_.clock->ApproximateNow() +
-                      delivery_timeout);
-  }
-}
-
 bool MoqtSession::WriteObjectToStream(
     webtransport::Stream* stream, uint64_t id,
     const PublishedObjectMetadata& metadata,
@@ -2635,24 +2445,6 @@
   OnObjectSent(object->metadata.location);
 }
 
-void MoqtSession::OutgoingDataStream::UpdateSendOrder(
-    PublishedSubscription& subscription) {
-  stream_->SetPriority(webtransport::StreamPriority{
-      /*send_group_id=*/kMoqtSendGroupId,
-      subscription.GetSendOrder(Location(index_.group, next_object_),
-                                index_.subgroup, publisher_priority_)});
-}
-
-void MoqtSession::OutgoingDataStream::CreateAndSetAlarm(
-    quic::QuicTime deadline) {
-  if (delivery_timeout_alarm_ != nullptr) {
-    return;
-  }
-  delivery_timeout_alarm_ = absl::WrapUnique(
-      session_->alarm_factory_->CreateAlarm(new DeliveryTimeoutDelegate(this)));
-  delivery_timeout_alarm_->Set(deadline);
-}
-
 MoqtSession::PublishedFetch::FetchStreamVisitor::FetchStreamVisitor(
     std::shared_ptr<PublishedFetch> fetch, webtransport::Stream* stream)
     : fetch_(fetch), stream_(stream) {
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index cb00379..f927a8b 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -41,6 +41,7 @@
 #include "quiche/quic/moqt/moqt_trace_recorder.h"
 #include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/quic/moqt/moqt_types.h"
+#include "quiche/quic/moqt/moqt_uni_stream.h"
 #include "quiche/quic/moqt/session_namespace_tree.h"
 #include "quiche/common/platform/api/quiche_export.h"
 #include "quiche/common/platform/api/quiche_logging.h"
@@ -367,7 +368,8 @@
   };
   // Represents a record for a single subscription to a local track that is
   // being sent to the peer.
-  class PublishedSubscription : public MoqtObjectListener {
+  class PublishedSubscription : public MoqtObjectListener,
+                                public SubscriptionPublisherInterface {
    public:
     PublishedSubscription(MoqtSession* session,
                           std::shared_ptr<MoqtTrackPublisher> track_publisher,
@@ -383,6 +385,9 @@
 
     uint64_t request_id() const { return request_id_; }
     MoqtTrackPublisher& publisher() { return *track_publisher_; }
+    std::shared_ptr<MoqtTrackPublisher> publisher_shared_ptr() {
+      return track_publisher_;
+    }
     uint64_t track_alias() const { return track_alias_; }
     MessageParameters& parameters() { return parameters_; }
     std::optional<Location> largest_sent() const { return largest_sent_; }
@@ -397,21 +402,47 @@
                               MoqtPriority publisher_priority) override;
     void OnTrackPublisherGone() override;
     void OnNewFinAvailable(Location location, uint64_t subgroup) override;
+    // also a part of SubscriptionPublisherInterface.
     void OnSubgroupAbandoned(uint64_t group, uint64_t subgroup,
                              webtransport::StreamErrorCode error_code) override;
     void OnGroupAbandoned(uint64_t group_id) override;
     void ProcessObjectAck(const MoqtObjectAck& message);
 
-    // Updates the window and other properties of the subscription in question.
-    void Update(const MessageParameters& parameters);
-    // Checks if a given Location or Group should be forwarded to the
-    // subscriber.
-    bool InWindow(Location location) {
+    // SubscriptionPublisherInterface implementation.
+    bool InWindow(Location location) override {
       return parameters_.forward() &&
              (!parameters_.subscription_filter.has_value() ||
               (parameters_.subscription_filter->WindowKnown() &&
                parameters_.subscription_filter->InWindow(location)));
+    };
+    bool alternate_delivery_timeout() override {
+      return session_->alternate_delivery_timeout_;
     }
+    const quic::QuicClock* clock() override {
+      return session_->callbacks_.clock;
+    }
+    quic::QuicTimeDelta delivery_timeout() override {
+      return std::min(
+          parameters_.delivery_timeout.value_or(kDefaultDeliveryTimeout),
+          publisher_delivery_timeout_.value_or(kDefaultDeliveryTimeout));
+    }
+    quic::QuicAlarmFactory* alarm_factory() override {
+      return session_->alarm_factory_.get();
+    }
+    void OnObjectSent(Location sequence) override;
+    void OnStreamTimeout(DataStreamIndex index) override {
+      reset_subgroups_.insert(index);
+      if (session_->alternate_delivery_timeout_) {
+        first_active_group_ = std::max(first_active_group_, index.group + 1);
+      }
+    }
+    // OnSubgroupAbandoned() is declared above with MoqtObjectListener.
+    void OnDataStreamDestroyed(DataStreamIndex) override;
+
+    // Updates the window and other properties of the subscription in question.
+    void Update(const MessageParameters& parameters);
+    // Checks if a given Location or Group should be forwarded to the
+    // subscriber.
     bool InWindow(uint64_t group) {
       return parameters_.forward() &&
              (!parameters_.subscription_filter.has_value() ||
@@ -421,9 +452,6 @@
 
     void OnDataStreamCreated(webtransport::StreamId id,
                              DataStreamIndex start_sequence);
-    void OnDataStreamDestroyed(webtransport::StreamId id,
-                               DataStreamIndex end_sequence);
-    void OnObjectSent(Location sequence);
 
     std::vector<webtransport::StreamId> GetAllStreams() const;
 
@@ -432,18 +460,13 @@
                                          std::optional<uint64_t> subgroup,
                                          MoqtPriority publisher_priority) const;
 
-    void AddQueuedOutgoingDataStream(const NewStreamParameters& parameters);
+    void AddQueuedOutgoingSubgroupStream(const NewStreamParameters& parameters);
     // Pops the pending outgoing data stream, with the highest send order.
     // The session keeps track of which subscribes have pending streams. This
     // function will trigger a QUICHE_DCHECK if called when there are no pending
     // streams.
-    NewStreamParameters NextQueuedOutgoingDataStream();
+    NewStreamParameters NextQueuedOutgoingSubgroupStream();
 
-    quic::QuicTimeDelta delivery_timeout() const {
-      return std::min(
-          parameters_.delivery_timeout.value_or(kDefaultDeliveryTimeout),
-          publisher_delivery_timeout_.value_or(kDefaultDeliveryTimeout));
-    }
     void set_subscriber_delivery_timeout(quic::QuicTimeDelta timeout) {
       parameters_.delivery_timeout = timeout;
     }
@@ -451,13 +474,6 @@
       publisher_delivery_timeout_ = timeout;
     }
 
-    void OnStreamTimeout(DataStreamIndex index) {
-      reset_subgroups_.insert(index);
-      if (session_->alternate_delivery_timeout_) {
-        first_active_group_ = std::max(first_active_group_, index.group + 1);
-      }
-    }
-
     uint64_t first_active_group() const { return first_active_group_; }
 
     absl::flat_hash_set<DataStreamIndex>& reset_subgroups() {
@@ -474,6 +490,10 @@
 
     bool established() const { return established_; }
 
+    quiche::QuicheWeakPtr<SubscriptionPublisherInterface> GetWeakPtr() {
+      return weak_ptr_factory_.Create();
+    }
+
    private:
     friend class test::MoqtSessionPeer;
     SendStreamMap& stream_map();
@@ -521,82 +541,9 @@
     // FinalizeSendOrder() whenever delivering it to the MoqtSession.
     absl::btree_multimap<webtransport::SendOrder, NewStreamParameters>
         queued_outgoing_data_streams_;
-  };
-  class QUICHE_EXPORT OutgoingDataStream : public webtransport::StreamVisitor {
-   public:
-    OutgoingDataStream(MoqtSession* session, webtransport::Stream* stream,
-                       PublishedSubscription& subscription,
-                       const NewStreamParameters& parameters);
-    ~OutgoingDataStream();
-
-    // webtransport::StreamVisitor implementation.
-    void OnCanRead() override {}
-    void OnCanWrite() override;
-    void OnResetStreamReceived(webtransport::StreamErrorCode) override {}
-    void OnStopSendingReceived(
-        webtransport::StreamErrorCode error_code) override;
-    void OnWriteSideInDataRecvdState() override {}
-
-    class DeliveryTimeoutDelegate
-        : public quic::QuicAlarm::DelegateWithoutContext {
-     public:
-      explicit DeliveryTimeoutDelegate(OutgoingDataStream* stream)
-          : stream_(stream) {}
-      void OnAlarm() override;
-
-     private:
-      OutgoingDataStream* stream_;
-    };
-
-    webtransport::Stream* stream() const { return stream_; }
-
-    // Sends objects on the stream, starting with `next_object_`, until the
-    // stream becomes write-blocked or closed.
-    void SendObjects(PublishedSubscription& subscription);
-
-    // Sends a pure FIN on the stream, if the last object sent matches
-    // |last_object|. Otherwise, does nothing.
-    void Fin(Location last_object);
-
-    // Recomputes the send order and updates it for the associated stream.
-    void UpdateSendOrder(PublishedSubscription& subscription);
-
-    // Creates and sets an alarm for the given deadline. Does nothing if the
-    // alarm is already created.
-    void CreateAndSetAlarm(quic::QuicTime deadline);
-
-    DataStreamIndex index() const { return index_; }
-
-   private:
-    friend class test::MoqtSessionPeer;
-    friend class DeliveryTimeoutDelegate;
-
-    // Checks whether the associated subscription is still valid; if not, resets
-    // the stream and returns nullptr.
-    PublishedSubscription* GetSubscriptionIfValid();
-
-    MoqtSession* session_;
-    webtransport::Stream* stream_;
-    uint64_t subscription_id_;
-    DataStreamIndex index_;
-    const MoqtPriority publisher_priority_;
-    MoqtDataStreamType stream_type_;
-    // Minimum object ID that should go out next. The session doesn't know the
-    // exact ID of the next object in the stream because the next object could
-    // be in a different subgroup or simply be skipped.
-    uint64_t next_object_;
-    // Number of payload bytes from next_object_ that has already been written
-    // to the stream.
-    uint64_t already_delivered_ = 0;
-    // Used in subgroup streams to compute the object ID diff. If nullopt, the
-    // stream header has not been written yet.
-    std::optional<PublishedObjectMetadata> last_object_;
-    // If this data stream is for SUBSCRIBE, reset it if an object has been
-    // excessively delayed per Section 7.1.1.2.
-    std::unique_ptr<quic::QuicAlarm> delivery_timeout_alarm_;
-    // A weak pointer to an object owned by the session.  Used to make sure the
-    // session does not get called after being destroyed.
-    std::weak_ptr<void> session_liveness_;
+    // Must be last.
+    quiche::QuicheWeakPtrFactory<SubscriptionPublisherInterface>
+        weak_ptr_factory_;
   };
 
   class QUICHE_EXPORT PublishedFetch {
@@ -668,15 +615,22 @@
       MessageParameters parameters;
       parameters.expires = publisher_->expiration();
       parameters.largest_object = publisher_->largest_location();
-      session_->GetControlStream()->SendRequestOk(request_id_, parameters);
+      MoqtBidiStreamBase* control_stream = session_->GetControlStream();
+      if (control_stream != nullptr) {
+        control_stream->CheckStatus(
+            control_stream->SendRequestOk(request_id_, parameters));
+      }
       session_->incoming_track_status_.erase(request_id_);
       // No class access below this line!
     }
 
     void OnSubscribeRejected(MoqtRequestErrorInfo info) override {
-      session_->GetControlStream()->SendRequestError(
-          request_id_, info.error_code, info.retry_interval,
-          info.reason_phrase);
+      MoqtBidiStreamBase* control_stream = session_->GetControlStream();
+      if (control_stream != nullptr) {
+        control_stream->CheckStatus(control_stream->SendRequestError(
+            request_id_, info.error_code, info.retry_interval,
+            info.reason_phrase));
+      }
       session_->incoming_track_status_.erase(request_id_);
       // No class access below this line!
     }
@@ -756,6 +710,7 @@
   // a session error if is not.
   bool ValidateRequestId(uint64_t request_id);
 
+  // TODO(martinduke): Delete once Fetch uses OutgoingSubgroupStream.
   // Actually sends an object on |stream| with track alias or fetch ID |id|
   // and metadata in |object|. Not for use with datagrams. Returns |true| if
   // the write was successful.
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index e8f27fe..08d0a61 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -50,6 +50,7 @@
 #include "quiche/common/quiche_data_reader.h"
 #include "quiche/common/quiche_mem_slice.h"
 #include "quiche/common/quiche_weak_ptr.h"
+#include "quiche/common/test_tools/quiche_test_utils.h"
 #include "quiche/web_transport/test_tools/in_memory_stream.h"
 #include "quiche/web_transport/test_tools/mock_web_transport.h"
 #include "quiche/web_transport/web_transport.h"
@@ -1302,7 +1303,7 @@
   control_stream->ReceiveMessage(subscribe_ok);
 }
 
-TEST_F(MoqtSessionTest, CreateOutgoingDataStreamAndSend) {
+TEST_F(MoqtSessionTest, CreateOutgoingSubgroupStreamAndSend) {
   FullTrackName ftn("foo", "bar");
   auto track =
       SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
@@ -1917,6 +1918,11 @@
   EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] {
     return stream_visitor.get();
   });
+  webtransport::StreamPriority expected_priority{
+      kMoqtSendGroupId,
+      SendOrderForStream(kDefaultSubscriberPriority, kDefaultPublisherPriority,
+                         5, 0, MoqtDeliveryOrder::kAscending)};
+  EXPECT_CALL(mock_stream_, SetPriority(expected_priority));
   EXPECT_CALL(mock_stream_, GetStreamId())
       .WillRepeatedly(Return(kOutgoingUniStreamId));
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
@@ -2173,7 +2179,8 @@
   std::make_shared<MockTrackPublisher>(request.full_track_name);
   TrackExtensions extensions(std::nullopt, std::nullopt, kLocalDefaultPriority,
                              std::nullopt, std::nullopt, std::nullopt);
-  EXPECT_CALL(*track, extensions).WillOnce(testing::ReturnRef(extensions));
+  EXPECT_CALL(*track, extensions)
+      .WillRepeatedly(testing::ReturnRef(extensions));
   MoqtObjectListener* listener = ReceiveSubscribeSynchronousOk(
       track, request, control_stream.get(), /*track_alias=*/0, extensions);
 
diff --git a/quiche/quic/moqt/moqt_uni_stream.cc b/quiche/quic/moqt/moqt_uni_stream.cc
new file mode 100644
index 0000000..e0b9c37
--- /dev/null
+++ b/quiche/quic/moqt/moqt_uni_stream.cc
@@ -0,0 +1,249 @@
+// Copyright 2026 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_uni_stream.h"
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <utility>
+#include <vector>
+
+#include "absl/base/nullability.h"
+#include "absl/memory/memory.h"
+#include "absl/status/status.h"
+#include "absl/types/span.h"
+#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/core/quic_utils.h"
+#include "quiche/quic/moqt/moqt_error.h"
+#include "quiche/quic/moqt/moqt_framer.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/quic/moqt/moqt_trace_recorder.h"
+#include "quiche/quic/moqt/moqt_types.h"
+#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/quiche_mem_slice.h"
+#include "quiche/common/quiche_weak_ptr.h"
+#include "quiche/web_transport/stream_helpers.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace moqt {
+
+OutgoingSubgroupStream::OutgoingSubgroupStream(
+    MoqtFramer framer, webtransport::Stream* absl_nonnull stream,
+    DataStreamIndex index, uint64_t first_object,
+    quiche::QuicheWeakPtr<SubscriptionPublisherInterface> visitor,
+    std::shared_ptr<MoqtTrackPublisher> absl_nonnull track_publisher,
+    webtransport::StreamPriority priority, uint64_t track_alias,
+    MoqtTraceRecorder* absl_nonnull trace_recorder)
+    : stream_(*stream),
+      index_(index),
+      visitor_(std::move(visitor)),
+      framer_(framer),
+      track_alias_(track_alias),
+      publisher_(track_publisher),
+      next_object_(first_object),
+      priority_(priority) {
+  stream_.SetPriority(priority_);
+  trace_recorder->RecordSubgroupStreamCreated(stream->GetStreamId(),
+                                              track_alias_, index);
+}
+
+OutgoingSubgroupStream::~OutgoingSubgroupStream() {
+  // Though it might seem intuitive that the session object has to outlive the
+  // connection object (and this is indeed how something like QuicSession and
+  // QuicStream works), this is not the true for WebTransport visitors: the
+  // session getting destroyed will inevitably lead to all related streams being
+  // destroyed, but the actual order of destruction is not guaranteed.  Thus, we
+  // need to check if the session still exists while accessing it in a stream
+  // destructor.
+  if (delivery_timeout_alarm_ != nullptr) {
+    delivery_timeout_alarm_->PermanentCancel();
+  }
+  SubscriptionPublisherInterface* visitor = visitor_.GetIfAvailable();
+  if (visitor != nullptr) {
+    visitor->OnDataStreamDestroyed(index_);
+  }
+}
+
+void OutgoingSubgroupStream::OnCanWrite() { SendObjects(); }
+
+void OutgoingSubgroupStream::OnStopSendingReceived(
+    webtransport::StreamErrorCode error_code) {
+  SubscriptionPublisherInterface* visitor = visitor_.GetIfAvailable();
+  if (visitor != nullptr) {
+    visitor->OnSubgroupAbandoned(index_.group, index_.subgroup, error_code);
+  }
+}
+
+void OutgoingSubgroupStream::DeliveryTimeoutDelegate::OnAlarm() {
+  SubscriptionPublisherInterface* visitor = stream_->visitor_.GetIfAvailable();
+  if (visitor != nullptr) {
+    visitor->OnStreamTimeout(stream_->index_);
+  }
+  stream_->stream_.ResetWithUserCode(kResetCodeDeliveryTimeout);
+}
+
+void OutgoingSubgroupStream::SendObjects() {
+  SubscriptionPublisherInterface* visitor = visitor_.GetIfAvailable();
+  if (visitor == nullptr) {
+    return;
+  }
+  while (stream_.CanWrite()) {
+    std::optional<PublishedObject> object = publisher_->GetCachedObject(
+        index_.group, index_.subgroup, next_object_, already_delivered_);
+    if (!object.has_value()) {
+      break;
+    }
+    if (object->metadata.payload_length > 0 && object->payload.empty()) {
+      QUICHE_BUG(OutgoingSubgroupStream_empty_payload)
+          << "Received non-empty object with no payload";
+      return;
+    }
+    QUICHE_DCHECK_EQ(object->metadata.location.group, index_.group);
+    QUICHE_DCHECK(object->metadata.subgroup == index_.subgroup);
+    if (!visitor->InWindow(object->metadata.location)) {
+      // It is possible that the next object became irrelevant due to a
+      // REQUEST_UPDATE.  Close the stream if so.
+      absl::Status status = webtransport::SendFinOnStream(stream_);
+      QUICHE_BUG_IF(OutgoingSubgroupStream_fin_due_to_update, !status.ok())
+          << "Writing FIN failed despite CanWrite() being true.";
+      return;
+    }
+
+    quic::QuicTimeDelta delivery_timeout = visitor->delivery_timeout();
+    if (!visitor->alternate_delivery_timeout() &&
+        visitor->clock()->ApproximateNow() - object->metadata.arrival_time >
+            delivery_timeout) {
+      visitor->OnStreamTimeout(index_);
+      stream_.ResetWithUserCode(kResetCodeDeliveryTimeout);
+      // No class access below this line.
+      return;
+    }
+    uint64_t start_offset = already_delivered_;
+    already_delivered_ +=
+        quic::MemSliceSpanTotalSize(absl::MakeSpan(object->payload));
+    object->fin_after_this &=
+        already_delivered_ == object->metadata.payload_length;
+    if (start_offset > 0) {  // Just send payload.
+      if (already_delivered_ == start_offset) {
+        // Partial delivery of an object but the payload is empty. This would
+        // result in an infinite loop.
+        QUICHE_BUG(OutgoingDataStream_empty_payload)
+            << "Empty payload for partial object " << object->metadata.location;
+        return;
+      }
+      webtransport::StreamWriteOptions options;
+      options.set_send_fin(object->fin_after_this);
+      absl::Status write_status =
+          stream_.Writev(absl::MakeSpan(object->payload), options);
+      if (!write_status.ok()) {
+        QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed)
+            << "Writing into MoQT stream failed despite CanWrite() being true "
+               "before; status: "
+            << write_status;
+        stream_.ResetWithUserCode(kResetCodeInternalError);
+        return;
+      }
+    } else {
+      if (!WriteObjectToStream(*object)) {
+        stream_.ResetWithUserCode(kResetCodeInternalError);
+        // No class access below this line.
+        return;
+      }
+      last_object_ = object->metadata;
+      next_object_ = last_object_->location.object;
+      visitor->OnObjectSent(object->metadata.location);
+    }
+    if (already_delivered_ != last_object_->payload_length) {
+      return;
+    }
+    ++next_object_;
+    already_delivered_ = 0;
+    if (object->fin_after_this && !delivery_timeout.IsInfinite() &&
+        !visitor->alternate_delivery_timeout()) {
+      CreateAndSetAlarm(object->metadata.arrival_time + delivery_timeout);
+    }
+  }
+}
+
+void OutgoingSubgroupStream::Fin(Location last_object) {
+  QUICHE_DCHECK_EQ(last_object.group, index_.group);
+  if (next_object_ <= last_object.object) {
+    // There is still data to send, do nothing.
+    return;
+  }
+  // All data has already been sent; send a pure FIN.
+  absl::Status status = webtransport::SendFinOnStream(stream_);
+  QUICHE_BUG_IF(OutgoingSubgroupStream_fin_failed, !status.ok())
+      << "Writing pure FIN failed.";
+  SubscriptionPublisherInterface* visitor = visitor_.GetIfAvailable();
+  if (visitor == nullptr) {
+    return;
+  }
+  quic::QuicTimeDelta delivery_timeout = visitor->delivery_timeout();
+  if (!delivery_timeout.IsInfinite()) {
+    CreateAndSetAlarm(visitor->clock()->ApproximateNow() + delivery_timeout);
+  }
+}
+
+void OutgoingSubgroupStream::CreateAndSetAlarm(quic::QuicTime deadline) {
+  if (delivery_timeout_alarm_ != nullptr) {
+    return;
+  }
+  SubscriptionPublisherInterface* visitor = visitor_.GetIfAvailable();
+  if (visitor == nullptr) {
+    return;
+  }
+  delivery_timeout_alarm_ = absl::WrapUnique(
+      visitor->alarm_factory()->CreateAlarm(new DeliveryTimeoutDelegate(this)));
+  delivery_timeout_alarm_->Set(deadline);
+}
+
+bool OutgoingSubgroupStream::WriteObjectToStream(PublishedObject& object) {
+  MoqtObject header;
+  header.track_alias = track_alias_;
+  header.group_id = object.metadata.location.group;
+  header.subgroup_id = object.metadata.subgroup;
+  header.object_id = object.metadata.location.object;
+  header.publisher_priority = object.metadata.publisher_priority;
+  header.extension_headers = object.metadata.extensions;
+  header.object_status = object.metadata.status;
+  header.payload_length = object.metadata.payload_length;
+
+  // Always include extension header length, because it's difficult to know
+  // a priori if they're going to appear on a stream.
+  if (!last_object_.has_value()) {
+    type_ = MoqtDataStreamType::Subgroup(
+        index_.subgroup, next_object_, false,
+        object.metadata.publisher_priority ==
+            publisher_->extensions().default_publisher_priority());
+  }
+  quiche::QuicheBuffer serialized_header =
+      framer_.SerializeObjectHeader(header, type_, last_object_);
+  std::vector<quiche::QuicheMemSlice> write_vector;
+  write_vector.reserve(object.payload.size() + 1);
+  write_vector.push_back(quiche::QuicheMemSlice(std::move(serialized_header)));
+  for (auto& slice : object.payload) {
+    write_vector.push_back(std::move(slice));
+  }
+  webtransport::StreamWriteOptions options;
+  options.set_send_fin(object.fin_after_this);
+  absl::Status write_status =
+      stream_.Writev(absl::MakeSpan(write_vector), options);
+  if (!write_status.ok()) {
+    QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed)
+        << "Writing into MoQT stream failed despite CanWrite being true "
+           "before; status: "
+        << write_status;
+    return false;
+  }
+  QUICHE_DVLOG(1) << "Stream " << stream_.GetStreamId()
+                  << " successfully wrote " << object.metadata.location
+                  << ", fin = " << object.fin_after_this;
+  return true;
+}
+
+}  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_uni_stream.h b/quiche/quic/moqt/moqt_uni_stream.h
new file mode 100644
index 0000000..c8b94a0
--- /dev/null
+++ b/quiche/quic/moqt/moqt_uni_stream.h
@@ -0,0 +1,139 @@
+// Copyright 2026 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_UNI_STREAM_H_
+#define QUICHE_QUIC_MOQT_MOQT_UNI_STREAM_H_
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+
+#include "absl/base/nullability.h"
+#include "quiche/quic/core/quic_alarm.h"
+#include "quiche/quic/core/quic_alarm_factory.h"
+#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/moqt/moqt_framer.h"
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
+#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/quic/moqt/moqt_trace_recorder.h"
+#include "quiche/quic/moqt/moqt_types.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/quiche_weak_ptr.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace moqt {
+
+namespace test {
+class MoqtSessionPeer;
+}
+
+// This interface provides information about the subscription.
+class SubscriptionPublisherInterface {
+ public:
+  virtual ~SubscriptionPublisherInterface() = default;
+  virtual bool InWindow(Location) = 0;
+  virtual bool alternate_delivery_timeout() = 0;
+  virtual const quic::QuicClock* clock() = 0;
+  virtual quic::QuicTimeDelta delivery_timeout() = 0;
+  virtual quic::QuicAlarmFactory* alarm_factory() = 0;
+  // Called when the first byte of an object is written to the stream.
+  virtual void OnObjectSent(Location) = 0;
+  virtual void OnStreamTimeout(DataStreamIndex) = 0;
+  virtual void OnSubgroupAbandoned(uint64_t group, uint64_t subgroup,
+                                   webtransport::StreamErrorCode) = 0;
+  virtual void OnDataStreamDestroyed(DataStreamIndex) = 0;
+};
+
+// This is for subscriptions only. FETCH uses its own construct.
+class QUICHE_EXPORT OutgoingSubgroupStream
+    : public webtransport::StreamVisitor {
+ public:
+  // |visitor| is owned by the subscription, so the WeakPtr also serves as a
+  // liveness token.
+  OutgoingSubgroupStream(
+      MoqtFramer framer, webtransport::Stream* absl_nonnull stream,
+      DataStreamIndex index, uint64_t first_object,
+      quiche::QuicheWeakPtr<SubscriptionPublisherInterface> visitor,
+      std::shared_ptr<MoqtTrackPublisher> absl_nonnull track_publisher,
+      webtransport::StreamPriority priority, uint64_t track_alias,
+      MoqtTraceRecorder* absl_nonnull trace_recorder);
+  ~OutgoingSubgroupStream();
+
+  // webtransport::StreamVisitor implementation.
+  void OnCanRead() override {}
+  void OnCanWrite() override;
+  void OnResetStreamReceived(webtransport::StreamErrorCode) override {}
+  void OnStopSendingReceived(webtransport::StreamErrorCode error_code) override;
+  void OnWriteSideInDataRecvdState() override {}
+
+  class DeliveryTimeoutDelegate
+      : public quic::QuicAlarm::DelegateWithoutContext {
+   public:
+    explicit DeliveryTimeoutDelegate(OutgoingSubgroupStream* stream)
+        : stream_(stream) {}
+    void OnAlarm() override;
+
+   private:
+    OutgoingSubgroupStream* stream_;
+  };
+
+  // Sends objects on the stream, starting with `next_object_`, until the
+  // stream becomes write-blocked or closed. Can reset the stream, destroying
+  // the class, on a write error.
+  void SendObjects();
+
+  // Sends a pure FIN on the stream, if the last object sent matches
+  // |last_object|. Otherwise, does nothing.
+  void Fin(Location last_object);
+  // Reset can be called directly on the stream, with no need to involve the
+  // visitor.
+
+  // Recomputes the send order and updates it for the associated stream.
+  void UpdatePriority(MoqtPriority subscriber_priority) {
+    priority_.send_order = UpdateSendOrderForSubscriberPriority(
+        priority_.send_order, subscriber_priority);
+    stream_.SetPriority(priority_);
+  }
+
+  // Creates and sets an alarm for the given deadline. Does nothing if the
+  // alarm is already created.
+  void CreateAndSetAlarm(quic::QuicTime deadline);
+
+ private:
+  friend class DeliveryTimeoutDelegate;
+  friend class test::MoqtSessionPeer;
+
+  // Writes an object to the stream. Returns false if the write failed. The
+  // caller should reset the stream if that happens.
+  bool WriteObjectToStream(PublishedObject& object);
+
+  webtransport::Stream& stream_;  // Always valid because it owns this object.
+  DataStreamIndex index_;
+  quiche::QuicheWeakPtr<SubscriptionPublisherInterface> visitor_;
+  MoqtFramer framer_;
+  MoqtDataStreamType type_;
+  uint64_t track_alias_;
+  std::shared_ptr<MoqtTrackPublisher> publisher_;
+  // Minimum object ID that should go out next. The session doesn't know the
+  // exact ID of the next object in the stream because the next object could
+  // be in a different subgroup or simply be skipped.
+  uint64_t next_object_;
+  // Number of payload bytes from next_object_ that has already been written
+  // to the stream.
+  uint64_t already_delivered_ = 0;
+  // Used in subgroup streams to compute the object ID diff and pass metadata
+  // for partial objects. If nullopt, the stream header has not been written
+  // yet.
+  std::optional<PublishedObjectMetadata> last_object_;
+  webtransport::StreamPriority priority_;
+  // If this data stream is for SUBSCRIBE, reset it if an object has been
+  // excessively delayed per Section 7.1.1.2.
+  std::unique_ptr<quic::QuicAlarm> delivery_timeout_alarm_;
+};
+
+}  // namespace moqt
+
+#endif  // QUICHE_QUIC_MOQT_MOQT_UNI_STREAM_H_
diff --git a/quiche/quic/moqt/moqt_uni_stream_test.cc b/quiche/quic/moqt/moqt_uni_stream_test.cc
new file mode 100644
index 0000000..7e83a32
--- /dev/null
+++ b/quiche/quic/moqt/moqt_uni_stream_test.cc
@@ -0,0 +1,218 @@
+// Copyright 2026 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_uni_stream.h"
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+
+#include "absl/status/status.h"
+#include "absl/strings/string_view.h"
+#include "absl/types/span.h"
+#include "quiche/quic/core/quic_alarm_factory.h"
+#include "quiche/quic/core/quic_time.h"
+#include "quiche/quic/moqt/moqt_error.h"
+#include "quiche/quic/moqt/moqt_framer.h"
+#include "quiche/quic/moqt/moqt_key_value_pair.h"
+#include "quiche/quic/moqt/moqt_names.h"
+#include "quiche/quic/moqt/moqt_object.h"
+#include "quiche/quic/moqt/moqt_trace_recorder.h"
+#include "quiche/quic/moqt/moqt_types.h"
+#include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h"
+#include "quiche/quic/platform/api/quic_test.h"
+#include "quiche/quic/test_tools/mock_clock.h"
+#include "quiche/quic/test_tools/quic_test_utils.h"
+#include "quiche/common/quiche_mem_slice.h"
+#include "quiche/common/quiche_weak_ptr.h"
+#include "quiche/web_transport/test_tools/mock_web_transport.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace moqt::test {
+
+namespace {
+
+using ::testing::Optional;
+using ::testing::Return;
+using ::testing::ReturnRef;
+using ::testing::StrictMock;
+
+PublishedObject DefaultObject() {
+  PublishedObject object;
+  object.metadata.location = Location(0, 0);
+  object.metadata.subgroup = 0;
+  object.metadata.status = MoqtObjectStatus::kNormal;
+  object.metadata.arrival_time = quic::QuicTime::Zero();
+  object.metadata.payload_length = 7;
+  object.payload.push_back(quiche::QuicheMemSlice::Copy("payload"));
+  object.fin_after_this = false;
+  return object;
+}
+
+class MockSubscriptionPublisherInterface
+    : public SubscriptionPublisherInterface {
+ public:
+  MockSubscriptionPublisherInterface() : weak_ptr_factory_(this) {}
+
+  MOCK_METHOD(bool, InWindow, (Location), (override));
+  MOCK_METHOD(bool, alternate_delivery_timeout, (), (override));
+  MOCK_METHOD(quic::QuicClock*, clock, (), (override));
+  MOCK_METHOD(quic::QuicTimeDelta, delivery_timeout, (), (override));
+  MOCK_METHOD(quic::QuicAlarmFactory*, alarm_factory, (), (override));
+  MOCK_METHOD(void, OnObjectSent, (Location), (override));
+  MOCK_METHOD(void, OnStreamTimeout, (DataStreamIndex), (override));
+  MOCK_METHOD(void, OnSubgroupAbandoned,
+              (uint64_t, uint64_t, webtransport::StreamErrorCode), (override));
+  MOCK_METHOD(void, OnDataStreamDestroyed, (DataStreamIndex), (override));
+
+  quiche::QuicheWeakPtr<SubscriptionPublisherInterface> GetWeakPtr() {
+    return weak_ptr_factory_.Create();
+  }
+
+ private:
+  quiche::QuicheWeakPtrFactory<SubscriptionPublisherInterface>
+      weak_ptr_factory_;
+};
+
+class OutgoingSubgroupStreamTest : public quic::test::QuicTest {
+ public:
+  OutgoingSubgroupStreamTest()
+      : index_(0, 0),
+        track_publisher_(std::make_shared<StrictMock<MockTrackPublisher>>(
+            FullTrackName("foo", "bar"))),
+        trace_recorder_(nullptr) {
+    EXPECT_CALL(mock_stream_, GetStreamId()).WillRepeatedly(Return(14));
+    CreateStream();
+  }
+  ~OutgoingSubgroupStreamTest() override {
+    EXPECT_CALL(visitor_, OnDataStreamDestroyed(index_));
+  }
+
+  void CreateStream(uint64_t next_object = 0) {
+    EXPECT_CALL(mock_stream_, SetPriority);
+    stream_ = std::make_unique<OutgoingSubgroupStream>(
+        framer_, &mock_stream_, index_, next_object, visitor_.GetWeakPtr(),
+        track_publisher_, webtransport::StreamPriority(), 0, &trace_recorder_);
+  }
+
+  void ExpectFin() {
+    EXPECT_CALL(mock_stream_, Writev)
+        .WillOnce([](absl::Span<quiche::QuicheMemSlice> data,
+                     const webtransport::StreamWriteOptions& options) {
+          EXPECT_TRUE(data.empty());
+          EXPECT_TRUE(options.send_fin());
+          return absl::OkStatus();
+        });
+  }
+
+  void ExpectAlarm() {
+    EXPECT_CALL(visitor_, alarm_factory()).WillOnce(Return(&alarm_factory_));
+  }
+
+  MoqtFramer framer_{true};
+  StrictMock<webtransport::test::MockStream> mock_stream_;
+  DataStreamIndex index_;
+  std::shared_ptr<StrictMock<MockTrackPublisher>> track_publisher_;
+  StrictMock<MockSubscriptionPublisherInterface> visitor_;
+  MoqtTraceRecorder trace_recorder_;
+  TrackExtensions track_extensions_;
+  quic::MockClock mock_clock_;
+  quic::test::MockAlarmFactory alarm_factory_;
+  std::unique_ptr<OutgoingSubgroupStream> stream_;
+};
+
+TEST_F(OutgoingSubgroupStreamTest, OnCanWrite) {
+  EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(false));
+  stream_->OnCanWrite();
+}
+
+TEST_F(OutgoingSubgroupStreamTest, OnStopSendingReceived) {
+  EXPECT_CALL(visitor_, OnSubgroupAbandoned(index_.group, index_.subgroup, 1));
+  stream_->OnStopSendingReceived(1);
+}
+
+TEST_F(OutgoingSubgroupStreamTest, DeliveryTimeoutAlarm) {
+  OutgoingSubgroupStream::DeliveryTimeoutDelegate delegate(stream_.get());
+  EXPECT_CALL(visitor_, OnStreamTimeout(index_));
+  EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout));
+  delegate.OnAlarm();
+}
+
+TEST_F(OutgoingSubgroupStreamTest, Fin) {
+  // Replace stream_ with one where next_object_ is 1.
+  EXPECT_CALL(visitor_, OnDataStreamDestroyed(index_));
+  CreateStream(1);
+  // last_object.object < next_object: sends pure FIN
+  ExpectFin();
+  EXPECT_CALL(visitor_, delivery_timeout())
+      .WillOnce(Return(quic::QuicTimeDelta::FromSeconds(1)));
+  EXPECT_CALL(visitor_, clock()).WillOnce(Return(&mock_clock_));
+  ExpectAlarm();
+  stream_->Fin(Location(0, 0));
+  // last_object.object >= next_object: does nothing
+  stream_->Fin(Location(0, 1));
+}
+
+TEST_F(OutgoingSubgroupStreamTest, UpdatePriority) {
+  EXPECT_CALL(mock_stream_, SetPriority(webtransport::StreamPriority{
+                                0, 0x3fc0000000000000ULL}));
+  stream_->UpdatePriority(0);
+}
+
+TEST_F(OutgoingSubgroupStreamTest, SendFragmentedObject) {
+  PublishedObject obj0 = DefaultObject();
+  obj0.metadata.payload_length = 15;
+  obj0.payload.clear();
+  obj0.payload.push_back(quiche::QuicheMemSlice::Copy("part1"));
+  obj0.payload.push_back(quiche::QuicheMemSlice::Copy("part2"));
+  obj0.fin_after_this = true;
+  EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0))
+      .WillOnce(Return(std::move(obj0)));
+  EXPECT_CALL(visitor_, InWindow).WillRepeatedly(Return(true));
+  EXPECT_CALL(mock_stream_, CanWrite).WillRepeatedly(Return(true));
+  EXPECT_CALL(visitor_, delivery_timeout())
+      .WillRepeatedly(Return(quic::QuicTimeDelta::FromSeconds(1)));
+  EXPECT_CALL(visitor_, alternate_delivery_timeout())
+      .WillRepeatedly(Return(false));
+  EXPECT_CALL(visitor_, clock()).WillRepeatedly(Return(&mock_clock_));
+  EXPECT_CALL(*track_publisher_, extensions())
+      .WillRepeatedly(ReturnRef(track_extensions_));
+  EXPECT_CALL(mock_stream_, Writev)
+      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
+                    const webtransport::StreamWriteOptions& options) {
+        EXPECT_EQ(data.size(), 3);
+        EXPECT_EQ(data[1].AsStringView(), "part1");
+        EXPECT_EQ(data[2].AsStringView(), "part2");
+        EXPECT_FALSE(options.send_fin());
+        return absl::OkStatus();
+      });
+  EXPECT_CALL(visitor_, OnObjectSent(Location(0, 0)));
+  stream_->OnCanWrite();
+  PublishedObject obj1 = DefaultObject();
+  obj1.metadata.payload_length = 15;
+  obj1.payload.clear();
+  obj1.payload.push_back(quiche::QuicheMemSlice::Copy("part3"));
+  obj1.fin_after_this = true;
+  EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 10))
+      .WillOnce(Return(std::move(obj1)));
+  EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 1, 0))
+      .WillOnce(Return(std::nullopt));
+  EXPECT_CALL(mock_stream_, Writev)
+      .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
+                    const webtransport::StreamWriteOptions& options) {
+        EXPECT_EQ(data.size(), 1);
+        EXPECT_EQ(data[0].AsStringView(), "part3");
+        EXPECT_TRUE(options.send_fin());
+        return absl::OkStatus();
+      });
+  EXPECT_CALL(visitor_, OnObjectSent).Times(0);
+  ExpectAlarm();
+  stream_->OnCanWrite();
+}
+
+}  // namespace
+
+}  // namespace moqt::test
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index 3f3a740..6ff7172 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -31,11 +31,11 @@
 #include "quiche/quic/moqt/moqt_session_interface.h"
 #include "quiche/quic/moqt/moqt_track.h"
 #include "quiche/quic/moqt/moqt_types.h"
+#include "quiche/quic/moqt/moqt_uni_stream.h"
 #include "quiche/quic/moqt/test_tools/moqt_framer_utils.h"
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/platform/api/quiche_test.h"
 #include "quiche/common/quiche_data_reader.h"
-#include "quiche/common/test_tools/quiche_test_utils.h"
 #include "quiche/web_transport/test_tools/mock_web_transport.h"
 #include "quiche/web_transport/web_transport.h"
 
@@ -258,7 +258,7 @@
   }
 
   static quic::QuicAlarm* GetAlarm(webtransport::StreamVisitor* visitor) {
-    return absl::down_cast<MoqtSession::OutgoingDataStream*>(visitor)
+    return absl::down_cast<OutgoingSubgroupStream*>(visitor)
         ->delivery_timeout_alarm_.get();
   }