Cleanup OutgoingSubgroupStream.

Make SendObjects() private, use OnCanWrite() for public calls.

Update priority of active streams when subscriber_priority changes.

PiperOrigin-RevId: 915664438
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 87e6796..37103af 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -1978,27 +1978,34 @@
     const MessageParameters& parameters) {
   // TODO(martinduke): If there are auth tokens, this probably has to go to the
   // application.
+  MoqtPriority old_priority =
+      parameters_.subscriber_priority.value_or(kDefaultSubscriberPriority);
   parameters_.Update(parameters);
   can_have_joining_fetch_ = parameters_.forward();
+  if (parameters.subscriber_priority.has_value()) {  // priority changed.
+    // Reprioritize all active streams.
+    for (const auto stream_id : stream_map().GetAllStreams()) {
+      webtransport::Stream* stream =
+          session_->session_->GetStreamById(stream_id);
+      if (stream == nullptr) {
+        continue;
+      }
+      OutgoingSubgroupStream* outgoing_stream =
+          absl::down_cast<OutgoingSubgroupStream*>(stream->visitor());
+      outgoing_stream->UpdatePriority(
+          parameters_.subscriber_priority.value_or(kDefaultSubscriberPriority));
+    }
+    if (queued_outgoing_data_streams_.empty()) {
+      return;
+    }
+    webtransport::SendOrder old_send_order =
+        UpdateSendOrderForSubscriberPriority(
+            queued_outgoing_data_streams_.rbegin()->first, old_priority);
+    session_->UpdateQueuedSendOrder(request_id_, old_send_order,
+                                    FinalizeSendOrder(old_send_order));
+  }
 }
 
-void MoqtSession::PublishedSubscription::set_subscriber_priority(
-    MoqtPriority priority) {
-  if (priority ==
-      parameters_.subscriber_priority.value_or(kDefaultSubscriberPriority)) {
-    return;
-  }
-  if (queued_outgoing_data_streams_.empty()) {
-    parameters_.subscriber_priority = priority;
-    return;
-  }
-  webtransport::SendOrder old_send_order =
-      FinalizeSendOrder(queued_outgoing_data_streams_.rbegin()->first);
-  parameters_.subscriber_priority = priority;
-  session_->UpdateQueuedSendOrder(request_id_, old_send_order,
-                                  FinalizeSendOrder(old_send_order));
-};
-
 void MoqtSession::PublishedSubscription::OnSubscribeAccepted() {
   ControlStream* stream = session_->GetControlStream();
   QUICHE_DCHECK(!established_);
@@ -2116,9 +2123,7 @@
   if (raw_stream == nullptr) {
     return;
   }
-  OutgoingSubgroupStream* stream =
-      absl::down_cast<OutgoingSubgroupStream*>(raw_stream->visitor());
-  stream->SendObjects();
+  raw_stream->visitor()->OnCanWrite();
 }
 
 void MoqtSession::PublishedSubscription::OnTrackPublisherGone() {
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index f927a8b..d66df53 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -391,7 +391,6 @@
     uint64_t track_alias() const { return track_alias_; }
     MessageParameters& parameters() { return parameters_; }
     std::optional<Location> largest_sent() const { return largest_sent_; }
-    void set_subscriber_priority(MoqtPriority priority);
 
     // MoqtObjectListener implementation.
     void OnSubscribeAccepted() override;
diff --git a/quiche/quic/moqt/moqt_stream_map.h b/quiche/quic/moqt/moqt_stream_map.h
index dfb4ef6..f2f40a4 100644
--- a/quiche/quic/moqt/moqt_stream_map.h
+++ b/quiche/quic/moqt/moqt_stream_map.h
@@ -10,7 +10,7 @@
 #include <vector>
 
 #include "absl/container/btree_map.h"
-#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/common/platform/api/quiche_export.h"
 #include "quiche/web_transport/web_transport.h"
 
diff --git a/quiche/quic/moqt/moqt_uni_stream.h b/quiche/quic/moqt/moqt_uni_stream.h
index c8b94a0..9884b26 100644
--- a/quiche/quic/moqt/moqt_uni_stream.h
+++ b/quiche/quic/moqt/moqt_uni_stream.h
@@ -80,11 +80,6 @@
     OutgoingSubgroupStream* stream_;
   };
 
-  // Sends objects on the stream, starting with `next_object_`, until the
-  // stream becomes write-blocked or closed. Can reset the stream, destroying
-  // the class, on a write error.
-  void SendObjects();
-
   // Sends a pure FIN on the stream, if the last object sent matches
   // |last_object|. Otherwise, does nothing.
   void Fin(Location last_object);
@@ -106,6 +101,11 @@
   friend class DeliveryTimeoutDelegate;
   friend class test::MoqtSessionPeer;
 
+  // Sends objects on the stream, starting with `next_object_`, until the
+  // stream becomes write-blocked or closed. Can reset the stream, destroying
+  // the class, on a write error.
+  void SendObjects();
+
   // Writes an object to the stream. Returns false if the write failed. The
   // caller should reset the stream if that happens.
   bool WriteObjectToStream(PublishedObject& object);
diff --git a/quiche/quic/moqt/moqt_uni_stream_test.cc b/quiche/quic/moqt/moqt_uni_stream_test.cc
index 7e83a32..45f82b4 100644
--- a/quiche/quic/moqt/moqt_uni_stream_test.cc
+++ b/quiche/quic/moqt/moqt_uni_stream_test.cc
@@ -26,6 +26,7 @@
 #include "quiche/quic/platform/api/quic_test.h"
 #include "quiche/quic/test_tools/mock_clock.h"
 #include "quiche/quic/test_tools/quic_test_utils.h"
+#include "quiche/common/platform/api/quiche_expect_bug.h"
 #include "quiche/common/quiche_mem_slice.h"
 #include "quiche/common/quiche_weak_ptr.h"
 #include "quiche/web_transport/test_tools/mock_web_transport.h"
@@ -141,6 +142,96 @@
   delegate.OnAlarm();
 }
 
+TEST_F(OutgoingSubgroupStreamTest, OnCanWriteCompleteFlow) {
+  PublishedObject obj0 = DefaultObject();
+  EXPECT_CALL(mock_stream_, CanWrite())
+      .WillOnce(Return(true))
+      .WillOnce(Return(false));
+  EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0))
+      .WillOnce(Return(std::move(obj0)));
+  EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(true));
+  EXPECT_CALL(visitor_, delivery_timeout())
+      .WillOnce(Return(quic::QuicTimeDelta::FromSeconds(1)));
+  EXPECT_CALL(visitor_, alternate_delivery_timeout()).WillOnce(Return(false));
+  EXPECT_CALL(visitor_, clock()).WillOnce(Return(&mock_clock_));
+  EXPECT_CALL(*track_publisher_, extensions())
+      .WillRepeatedly(ReturnRef(track_extensions_));
+  EXPECT_CALL(mock_stream_, Writev).WillOnce(Return(absl::OkStatus()));
+  EXPECT_CALL(visitor_, OnObjectSent(Location(0, 0)));
+  stream_->OnCanWrite();
+}
+
+TEST_F(OutgoingSubgroupStreamTest, OnCanWriteNotInWindow) {
+  PublishedObject obj0 = DefaultObject();
+
+  EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(true));
+  EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0))
+      .WillOnce(Return(std::move(obj0)));
+  EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(false));
+  ExpectFin();
+  stream_->OnCanWrite();
+}
+
+TEST_F(OutgoingSubgroupStreamTest, OnCanWriteTimeout) {
+  PublishedObject obj0 = DefaultObject();
+  EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(true));
+  EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0))
+      .WillOnce(Return(std::move(obj0)));
+  EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(true));
+  EXPECT_CALL(visitor_, delivery_timeout())
+      .WillOnce(Return(quic::QuicTimeDelta::FromSeconds(1)));
+  EXPECT_CALL(visitor_, alternate_delivery_timeout()).WillOnce(Return(false));
+  mock_clock_.AdvanceTime(quic::QuicTimeDelta::FromSeconds(2));
+  EXPECT_CALL(visitor_, clock()).WillOnce(Return(&mock_clock_));
+  EXPECT_CALL(visitor_, OnStreamTimeout(index_));
+  EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout));
+  stream_->OnCanWrite();
+}
+
+TEST_F(OutgoingSubgroupStreamTest, OnCanWriteWriteError) {
+  PublishedObject obj0 = DefaultObject();
+  EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(true));
+  EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0))
+      .WillOnce(Return(std::move(obj0)));
+  EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(true));
+  EXPECT_CALL(visitor_, delivery_timeout())
+      .WillOnce(Return(quic::QuicTimeDelta::FromSeconds(1)));
+  EXPECT_CALL(visitor_, alternate_delivery_timeout()).WillOnce(Return(false));
+  EXPECT_CALL(visitor_, clock).WillOnce(Return(&mock_clock_));
+  EXPECT_CALL(*track_publisher_, extensions())
+      .WillRepeatedly(ReturnRef(track_extensions_));
+  EXPECT_CALL(mock_stream_, Writev)
+      .WillOnce(Return(absl::InternalError("error")));
+  EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeInternalError));
+  EXPECT_QUICHE_BUG(
+      stream_->OnCanWrite(),
+      "Writing into MoQT stream failed despite CanWrite being true before; "
+      "status: INTERNAL: error");
+}
+
+TEST_F(OutgoingSubgroupStreamTest, OnCanWriteSetsAlarm) {
+  PublishedObject obj0 = DefaultObject();
+  obj0.fin_after_this = true;
+  EXPECT_CALL(mock_stream_, CanWrite())
+      .WillOnce(Return(true))
+      .WillOnce(Return(false));
+  EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0))
+      .WillOnce(Return(std::move(obj0)));
+  EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(true));
+  EXPECT_CALL(visitor_, delivery_timeout())
+      .WillRepeatedly(Return(quic::QuicTimeDelta::FromSeconds(1)));
+  EXPECT_CALL(visitor_, alternate_delivery_timeout())
+      .WillRepeatedly(Return(false));
+  EXPECT_CALL(visitor_, clock).WillOnce(Return(&mock_clock_));
+
+  EXPECT_CALL(*track_publisher_, extensions())
+      .WillRepeatedly(ReturnRef(track_extensions_));
+  EXPECT_CALL(mock_stream_, Writev).WillOnce(Return(absl::OkStatus()));
+  EXPECT_CALL(visitor_, OnObjectSent(Location(0, 0)));
+  ExpectAlarm();
+  stream_->OnCanWrite();
+}
+
 TEST_F(OutgoingSubgroupStreamTest, Fin) {
   // Replace stream_ with one where next_object_ is 1.
   EXPECT_CALL(visitor_, OnDataStreamDestroyed(index_));
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index 6ff7172..5264255 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -180,8 +180,9 @@
   static void UpdateSubscriberPriority(MoqtSession* session,
                                        uint64_t subscribe_id,
                                        MoqtPriority priority) {
-    session->published_subscriptions_[subscribe_id]->set_subscriber_priority(
-        priority);
+    MessageParameters parameters;
+    parameters.subscriber_priority = priority;
+    session->published_subscriptions_[subscribe_id]->Update(parameters);
   }
 
   static SubscribeRemoteTrack* remote_track(MoqtSession* session,