Cleanup OutgoingSubgroupStream. Make SendObjects() private, use OnCanWrite() for public calls. Update priority of active streams when subscriber_priority changes. PiperOrigin-RevId: 915664438
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 87e6796..37103af 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -1978,27 +1978,34 @@ const MessageParameters& parameters) { // TODO(martinduke): If there are auth tokens, this probably has to go to the // application. + MoqtPriority old_priority = + parameters_.subscriber_priority.value_or(kDefaultSubscriberPriority); parameters_.Update(parameters); can_have_joining_fetch_ = parameters_.forward(); + if (parameters.subscriber_priority.has_value()) { // priority changed. + // Reprioritize all active streams. + for (const auto stream_id : stream_map().GetAllStreams()) { + webtransport::Stream* stream = + session_->session_->GetStreamById(stream_id); + if (stream == nullptr) { + continue; + } + OutgoingSubgroupStream* outgoing_stream = + absl::down_cast<OutgoingSubgroupStream*>(stream->visitor()); + outgoing_stream->UpdatePriority( + parameters_.subscriber_priority.value_or(kDefaultSubscriberPriority)); + } + if (queued_outgoing_data_streams_.empty()) { + return; + } + webtransport::SendOrder old_send_order = + UpdateSendOrderForSubscriberPriority( + queued_outgoing_data_streams_.rbegin()->first, old_priority); + session_->UpdateQueuedSendOrder(request_id_, old_send_order, + FinalizeSendOrder(old_send_order)); + } } -void MoqtSession::PublishedSubscription::set_subscriber_priority( - MoqtPriority priority) { - if (priority == - parameters_.subscriber_priority.value_or(kDefaultSubscriberPriority)) { - return; - } - if (queued_outgoing_data_streams_.empty()) { - parameters_.subscriber_priority = priority; - return; - } - webtransport::SendOrder old_send_order = - FinalizeSendOrder(queued_outgoing_data_streams_.rbegin()->first); - parameters_.subscriber_priority = priority; - session_->UpdateQueuedSendOrder(request_id_, old_send_order, - FinalizeSendOrder(old_send_order)); -}; - void MoqtSession::PublishedSubscription::OnSubscribeAccepted() { ControlStream* stream = session_->GetControlStream(); QUICHE_DCHECK(!established_); @@ -2116,9 +2123,7 @@ if (raw_stream == nullptr) { return; } - OutgoingSubgroupStream* stream = - absl::down_cast<OutgoingSubgroupStream*>(raw_stream->visitor()); - stream->SendObjects(); + raw_stream->visitor()->OnCanWrite(); } void MoqtSession::PublishedSubscription::OnTrackPublisherGone() {
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index f927a8b..d66df53 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -391,7 +391,6 @@ uint64_t track_alias() const { return track_alias_; } MessageParameters& parameters() { return parameters_; } std::optional<Location> largest_sent() const { return largest_sent_; } - void set_subscriber_priority(MoqtPriority priority); // MoqtObjectListener implementation. void OnSubscribeAccepted() override;
diff --git a/quiche/quic/moqt/moqt_stream_map.h b/quiche/quic/moqt/moqt_stream_map.h index dfb4ef6..f2f40a4 100644 --- a/quiche/quic/moqt/moqt_stream_map.h +++ b/quiche/quic/moqt/moqt_stream_map.h
@@ -10,7 +10,7 @@ #include <vector> #include "absl/container/btree_map.h" -#include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_types.h" #include "quiche/common/platform/api/quiche_export.h" #include "quiche/web_transport/web_transport.h"
diff --git a/quiche/quic/moqt/moqt_uni_stream.h b/quiche/quic/moqt/moqt_uni_stream.h index c8b94a0..9884b26 100644 --- a/quiche/quic/moqt/moqt_uni_stream.h +++ b/quiche/quic/moqt/moqt_uni_stream.h
@@ -80,11 +80,6 @@ OutgoingSubgroupStream* stream_; }; - // Sends objects on the stream, starting with `next_object_`, until the - // stream becomes write-blocked or closed. Can reset the stream, destroying - // the class, on a write error. - void SendObjects(); - // Sends a pure FIN on the stream, if the last object sent matches // |last_object|. Otherwise, does nothing. void Fin(Location last_object); @@ -106,6 +101,11 @@ friend class DeliveryTimeoutDelegate; friend class test::MoqtSessionPeer; + // Sends objects on the stream, starting with `next_object_`, until the + // stream becomes write-blocked or closed. Can reset the stream, destroying + // the class, on a write error. + void SendObjects(); + // Writes an object to the stream. Returns false if the write failed. The // caller should reset the stream if that happens. bool WriteObjectToStream(PublishedObject& object);
diff --git a/quiche/quic/moqt/moqt_uni_stream_test.cc b/quiche/quic/moqt/moqt_uni_stream_test.cc index 7e83a32..45f82b4 100644 --- a/quiche/quic/moqt/moqt_uni_stream_test.cc +++ b/quiche/quic/moqt/moqt_uni_stream_test.cc
@@ -26,6 +26,7 @@ #include "quiche/quic/platform/api/quic_test.h" #include "quiche/quic/test_tools/mock_clock.h" #include "quiche/quic/test_tools/quic_test_utils.h" +#include "quiche/common/platform/api/quiche_expect_bug.h" #include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_weak_ptr.h" #include "quiche/web_transport/test_tools/mock_web_transport.h" @@ -141,6 +142,96 @@ delegate.OnAlarm(); } +TEST_F(OutgoingSubgroupStreamTest, OnCanWriteCompleteFlow) { + PublishedObject obj0 = DefaultObject(); + EXPECT_CALL(mock_stream_, CanWrite()) + .WillOnce(Return(true)) + .WillOnce(Return(false)); + EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0)) + .WillOnce(Return(std::move(obj0))); + EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(true)); + EXPECT_CALL(visitor_, delivery_timeout()) + .WillOnce(Return(quic::QuicTimeDelta::FromSeconds(1))); + EXPECT_CALL(visitor_, alternate_delivery_timeout()).WillOnce(Return(false)); + EXPECT_CALL(visitor_, clock()).WillOnce(Return(&mock_clock_)); + EXPECT_CALL(*track_publisher_, extensions()) + .WillRepeatedly(ReturnRef(track_extensions_)); + EXPECT_CALL(mock_stream_, Writev).WillOnce(Return(absl::OkStatus())); + EXPECT_CALL(visitor_, OnObjectSent(Location(0, 0))); + stream_->OnCanWrite(); +} + +TEST_F(OutgoingSubgroupStreamTest, OnCanWriteNotInWindow) { + PublishedObject obj0 = DefaultObject(); + + EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(true)); + EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0)) + .WillOnce(Return(std::move(obj0))); + EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(false)); + ExpectFin(); + stream_->OnCanWrite(); +} + +TEST_F(OutgoingSubgroupStreamTest, OnCanWriteTimeout) { + PublishedObject obj0 = DefaultObject(); + EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(true)); + EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0)) + .WillOnce(Return(std::move(obj0))); + EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(true)); + EXPECT_CALL(visitor_, delivery_timeout()) + .WillOnce(Return(quic::QuicTimeDelta::FromSeconds(1))); + EXPECT_CALL(visitor_, alternate_delivery_timeout()).WillOnce(Return(false)); + mock_clock_.AdvanceTime(quic::QuicTimeDelta::FromSeconds(2)); + EXPECT_CALL(visitor_, clock()).WillOnce(Return(&mock_clock_)); + EXPECT_CALL(visitor_, OnStreamTimeout(index_)); + EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout)); + stream_->OnCanWrite(); +} + +TEST_F(OutgoingSubgroupStreamTest, OnCanWriteWriteError) { + PublishedObject obj0 = DefaultObject(); + EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(true)); + EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0)) + .WillOnce(Return(std::move(obj0))); + EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(true)); + EXPECT_CALL(visitor_, delivery_timeout()) + .WillOnce(Return(quic::QuicTimeDelta::FromSeconds(1))); + EXPECT_CALL(visitor_, alternate_delivery_timeout()).WillOnce(Return(false)); + EXPECT_CALL(visitor_, clock).WillOnce(Return(&mock_clock_)); + EXPECT_CALL(*track_publisher_, extensions()) + .WillRepeatedly(ReturnRef(track_extensions_)); + EXPECT_CALL(mock_stream_, Writev) + .WillOnce(Return(absl::InternalError("error"))); + EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeInternalError)); + EXPECT_QUICHE_BUG( + stream_->OnCanWrite(), + "Writing into MoQT stream failed despite CanWrite being true before; " + "status: INTERNAL: error"); +} + +TEST_F(OutgoingSubgroupStreamTest, OnCanWriteSetsAlarm) { + PublishedObject obj0 = DefaultObject(); + obj0.fin_after_this = true; + EXPECT_CALL(mock_stream_, CanWrite()) + .WillOnce(Return(true)) + .WillOnce(Return(false)); + EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0)) + .WillOnce(Return(std::move(obj0))); + EXPECT_CALL(visitor_, InWindow(Location(0, 0))).WillOnce(Return(true)); + EXPECT_CALL(visitor_, delivery_timeout()) + .WillRepeatedly(Return(quic::QuicTimeDelta::FromSeconds(1))); + EXPECT_CALL(visitor_, alternate_delivery_timeout()) + .WillRepeatedly(Return(false)); + EXPECT_CALL(visitor_, clock).WillOnce(Return(&mock_clock_)); + + EXPECT_CALL(*track_publisher_, extensions()) + .WillRepeatedly(ReturnRef(track_extensions_)); + EXPECT_CALL(mock_stream_, Writev).WillOnce(Return(absl::OkStatus())); + EXPECT_CALL(visitor_, OnObjectSent(Location(0, 0))); + ExpectAlarm(); + stream_->OnCanWrite(); +} + TEST_F(OutgoingSubgroupStreamTest, Fin) { // Replace stream_ with one where next_object_ is 1. EXPECT_CALL(visitor_, OnDataStreamDestroyed(index_));
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h index 6ff7172..5264255 100644 --- a/quiche/quic/moqt/test_tools/moqt_session_peer.h +++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -180,8 +180,9 @@ static void UpdateSubscriberPriority(MoqtSession* session, uint64_t subscribe_id, MoqtPriority priority) { - session->published_subscriptions_[subscribe_id]->set_subscriber_priority( - priority); + MessageParameters parameters; + parameters.subscriber_priority = priority; + session->published_subscriptions_[subscribe_id]->Update(parameters); } static SubscribeRemoteTrack* remote_track(MoqtSession* session,