Permanently cancel subgroups if the stream has STOP_SENDING.

Today, the publisher will keep trying to open the stream.

PiperOrigin-RevId: 900931392
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 69020ba..2424e34 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -2121,6 +2121,7 @@
     // This subgroup has already been reset, ignore.
     return;
   }
+  reset_subgroups_.insert(index);
   QUICHE_DCHECK_GE(group, first_active_group_);
   std::optional<webtransport::StreamId> stream_id =
       stream_map().GetStreamFor(index);
@@ -2313,6 +2314,15 @@
   SendObjects(*subscription);
 }
 
+void MoqtSession::OutgoingDataStream::OnStopSendingReceived(
+    webtransport::StreamErrorCode error_code) {
+  PublishedSubscription* subscription = GetSubscriptionIfValid();
+  if (subscription == nullptr) {
+    return;
+  }
+  subscription->OnSubgroupAbandoned(index_.group, index_.subgroup, error_code);
+}
+
 void MoqtSession::OutgoingDataStream::DeliveryTimeoutDelegate::OnAlarm() {
   auto it = stream_->session_->published_subscriptions_.find(
       stream_->subscription_id_);
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 70a00a7..2bdd6ec 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -526,10 +526,9 @@
     // webtransport::StreamVisitor implementation.
     void OnCanRead() override {}
     void OnCanWrite() override;
-    void OnResetStreamReceived(
-        webtransport::StreamErrorCode /*error*/) override {}
+    void OnResetStreamReceived(webtransport::StreamErrorCode) override {}
     void OnStopSendingReceived(
-        webtransport::StreamErrorCode /*error*/) override {}
+        webtransport::StreamErrorCode error_code) override;
     void OnWriteSideInDataRecvdState() override {}
 
     class DeliveryTimeoutDelegate
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 48d3260..7997bd1 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -4194,6 +4194,46 @@
   listener->OnNewObjectAvailable(Location(8, 1), 0, 0x80);
 }
 
+TEST_F(MoqtSessionTest, StopSendingBlocksSubgroup) {
+  MoqtSubscribe subscribe = DefaultSubscribe();
+  MockTrackPublisher* track = CreateTrackPublisher();
+  std::unique_ptr<MoqtControlParserVisitor> control_stream =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MoqtObjectListener* listener =
+      ReceiveSubscribeSynchronousOk(track, subscribe, control_stream.get(), 0);
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream)
+      .WillOnce(Return(true));
+  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream)
+      .WillOnce(Return(&mock_stream_));
+  std::unique_ptr<webtransport::StreamVisitor> data_stream_visitor;
+  EXPECT_CALL(mock_stream_, SetVisitor)
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        data_stream_visitor = std::move(visitor);
+      });
+  EXPECT_CALL(mock_stream_, visitor).WillRepeatedly([&]() {
+    return data_stream_visitor.get();
+  });
+  EXPECT_CALL(mock_stream_, CanWrite).WillRepeatedly(Return(true));
+  EXPECT_CALL(*track, GetCachedObject(0, Optional(1), 0))
+      .WillOnce(Return(PublishedObject{
+          PublishedObjectMetadata{Location(0, 0), 1, "",
+                                  MoqtObjectStatus::kNormal, 0x80,
+                                  MoqtSessionPeer::Now(&session_)},
+          quiche::QuicheMemSlice(), false}));
+  EXPECT_CALL(*track, GetCachedObject(0, Optional(1), 1))
+      .WillOnce(Return(std::nullopt));
+  SetLargestId(track, Location(0, 0));
+  EXPECT_CALL(mock_stream_, Writev).WillOnce(Return(absl::OkStatus()));
+  listener->OnNewObjectAvailable(Location(0, 0), 1, 0x80);
+
+  EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeCancelled));
+  data_stream_visitor->OnStopSendingReceived(kResetCodeCancelled);
+  // New object in the same subgroup should not be sent.
+  EXPECT_CALL(*track, GetCachedObject).Times(0);
+  EXPECT_CALL(mock_stream_, Writev).Times(0);
+  listener->OnNewObjectAvailable(Location(0, 1), 1, 0x80);
+}
+
 }  // namespace test
 
 }  // namespace moqt