Permanently cancel subgroups if the stream has STOP_SENDING. Today, the publisher will keep trying to open the stream. PiperOrigin-RevId: 900931392
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 69020ba..2424e34 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -2121,6 +2121,7 @@ // This subgroup has already been reset, ignore. return; } + reset_subgroups_.insert(index); QUICHE_DCHECK_GE(group, first_active_group_); std::optional<webtransport::StreamId> stream_id = stream_map().GetStreamFor(index); @@ -2313,6 +2314,15 @@ SendObjects(*subscription); } +void MoqtSession::OutgoingDataStream::OnStopSendingReceived( + webtransport::StreamErrorCode error_code) { + PublishedSubscription* subscription = GetSubscriptionIfValid(); + if (subscription == nullptr) { + return; + } + subscription->OnSubgroupAbandoned(index_.group, index_.subgroup, error_code); +} + void MoqtSession::OutgoingDataStream::DeliveryTimeoutDelegate::OnAlarm() { auto it = stream_->session_->published_subscriptions_.find( stream_->subscription_id_);
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 70a00a7..2bdd6ec 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -526,10 +526,9 @@ // webtransport::StreamVisitor implementation. void OnCanRead() override {} void OnCanWrite() override; - void OnResetStreamReceived( - webtransport::StreamErrorCode /*error*/) override {} + void OnResetStreamReceived(webtransport::StreamErrorCode) override {} void OnStopSendingReceived( - webtransport::StreamErrorCode /*error*/) override {} + webtransport::StreamErrorCode error_code) override; void OnWriteSideInDataRecvdState() override {} class DeliveryTimeoutDelegate
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 48d3260..7997bd1 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -4194,6 +4194,46 @@ listener->OnNewObjectAvailable(Location(8, 1), 0, 0x80); } +TEST_F(MoqtSessionTest, StopSendingBlocksSubgroup) { + MoqtSubscribe subscribe = DefaultSubscribe(); + MockTrackPublisher* track = CreateTrackPublisher(); + std::unique_ptr<MoqtControlParserVisitor> control_stream = + MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); + MoqtObjectListener* listener = + ReceiveSubscribeSynchronousOk(track, subscribe, control_stream.get(), 0); + EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream) + .WillOnce(Return(true)); + EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream) + .WillOnce(Return(&mock_stream_)); + std::unique_ptr<webtransport::StreamVisitor> data_stream_visitor; + EXPECT_CALL(mock_stream_, SetVisitor) + .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) { + data_stream_visitor = std::move(visitor); + }); + EXPECT_CALL(mock_stream_, visitor).WillRepeatedly([&]() { + return data_stream_visitor.get(); + }); + EXPECT_CALL(mock_stream_, CanWrite).WillRepeatedly(Return(true)); + EXPECT_CALL(*track, GetCachedObject(0, Optional(1), 0)) + .WillOnce(Return(PublishedObject{ + PublishedObjectMetadata{Location(0, 0), 1, "", + MoqtObjectStatus::kNormal, 0x80, + MoqtSessionPeer::Now(&session_)}, + quiche::QuicheMemSlice(), false})); + EXPECT_CALL(*track, GetCachedObject(0, Optional(1), 1)) + .WillOnce(Return(std::nullopt)); + SetLargestId(track, Location(0, 0)); + EXPECT_CALL(mock_stream_, Writev).WillOnce(Return(absl::OkStatus())); + listener->OnNewObjectAvailable(Location(0, 0), 1, 0x80); + + EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeCancelled)); + data_stream_visitor->OnStopSendingReceived(kResetCodeCancelled); + // New object in the same subgroup should not be sent. + EXPECT_CALL(*track, GetCachedObject).Times(0); + EXPECT_CALL(mock_stream_, Writev).Times(0); + listener->OnNewObjectAvailable(Location(0, 1), 1, 0x80); +} + } // namespace test } // namespace moqt