Refactor: Move OutgoingDataStream to a separate file and make the interfaces with PublishedSubscription explicit. This shortens moqt_session.h and is part of separating out subscription state into something common to SUBSCRIBE and PUBLISH. Is mostly a no-op for tests. PiperOrigin-RevId: 915657635
diff --git a/build/source_list.bzl b/build/source_list.bzl index b7ca694..8a79fac 100644 --- a/build/source_list.bzl +++ b/build/source_list.bzl
@@ -1608,6 +1608,7 @@ "quic/moqt/moqt_trace_recorder.h", "quic/moqt/moqt_track.h", "quic/moqt/moqt_types.h", + "quic/moqt/moqt_uni_stream.h", "quic/moqt/relay_namespace_tree.h", "quic/moqt/session_namespace_tree.h", "quic/moqt/tools/chat_client.h", @@ -1639,6 +1640,7 @@ "quic/moqt/moqt_stream_map.cc", "quic/moqt/moqt_trace_recorder.cc", "quic/moqt/moqt_track.cc", + "quic/moqt/moqt_uni_stream.cc", "quic/moqt/relay_namespace_tree.cc", "quic/moqt/tools/chat_client.cc", "quic/moqt/tools/moq_chat.cc", @@ -1669,6 +1671,7 @@ "quic/moqt/moqt_session_test.cc", "quic/moqt/moqt_stream_map_test.cc", "quic/moqt/moqt_track_test.cc", + "quic/moqt/moqt_uni_stream_test.cc", "quic/moqt/relay_namespace_tree_test.cc", "quic/moqt/session_namespace_tree_test.cc", "quic/moqt/test_tools/moqt_simulator_test.cc",
diff --git a/build/source_list.gni b/build/source_list.gni index 2de8cbf..7529969 100644 --- a/build/source_list.gni +++ b/build/source_list.gni
@@ -1612,6 +1612,7 @@ "src/quiche/quic/moqt/moqt_trace_recorder.h", "src/quiche/quic/moqt/moqt_track.h", "src/quiche/quic/moqt/moqt_types.h", + "src/quiche/quic/moqt/moqt_uni_stream.h", "src/quiche/quic/moqt/relay_namespace_tree.h", "src/quiche/quic/moqt/session_namespace_tree.h", "src/quiche/quic/moqt/tools/chat_client.h", @@ -1643,6 +1644,7 @@ "src/quiche/quic/moqt/moqt_stream_map.cc", "src/quiche/quic/moqt/moqt_trace_recorder.cc", "src/quiche/quic/moqt/moqt_track.cc", + "src/quiche/quic/moqt/moqt_uni_stream.cc", "src/quiche/quic/moqt/relay_namespace_tree.cc", "src/quiche/quic/moqt/tools/chat_client.cc", "src/quiche/quic/moqt/tools/moq_chat.cc", @@ -1674,6 +1676,7 @@ "src/quiche/quic/moqt/moqt_session_test.cc", "src/quiche/quic/moqt/moqt_stream_map_test.cc", "src/quiche/quic/moqt/moqt_track_test.cc", + "src/quiche/quic/moqt/moqt_uni_stream_test.cc", "src/quiche/quic/moqt/relay_namespace_tree_test.cc", "src/quiche/quic/moqt/session_namespace_tree_test.cc", "src/quiche/quic/moqt/test_tools/moqt_simulator_test.cc",
diff --git a/build/source_list.json b/build/source_list.json index e159716..4a17ef1 100644 --- a/build/source_list.json +++ b/build/source_list.json
@@ -1611,6 +1611,7 @@ "quiche/quic/moqt/moqt_trace_recorder.h", "quiche/quic/moqt/moqt_track.h", "quiche/quic/moqt/moqt_types.h", + "quiche/quic/moqt/moqt_uni_stream.h", "quiche/quic/moqt/relay_namespace_tree.h", "quiche/quic/moqt/session_namespace_tree.h", "quiche/quic/moqt/tools/chat_client.h", @@ -1642,6 +1643,7 @@ "quiche/quic/moqt/moqt_stream_map.cc", "quiche/quic/moqt/moqt_trace_recorder.cc", "quiche/quic/moqt/moqt_track.cc", + "quiche/quic/moqt/moqt_uni_stream.cc", "quiche/quic/moqt/relay_namespace_tree.cc", "quiche/quic/moqt/tools/chat_client.cc", "quiche/quic/moqt/tools/moq_chat.cc", @@ -1673,6 +1675,7 @@ "quiche/quic/moqt/moqt_session_test.cc", "quiche/quic/moqt/moqt_stream_map_test.cc", "quiche/quic/moqt/moqt_track_test.cc", + "quiche/quic/moqt/moqt_uni_stream_test.cc", "quiche/quic/moqt/relay_namespace_tree_test.cc", "quiche/quic/moqt/session_namespace_tree_test.cc", "quiche/quic/moqt/test_tools/moqt_simulator_test.cc",
diff --git a/quiche/quic/moqt/moqt_priority.h b/quiche/quic/moqt/moqt_priority.h index a763934..04561d5 100644 --- a/quiche/quic/moqt/moqt_priority.h +++ b/quiche/quic/moqt/moqt_priority.h
@@ -12,6 +12,11 @@ namespace moqt { +// WebTransport lets applications split a session into multiple send groups +// that have equal weight for scheduling. We don't have a use for that, so the +// send group is always the same. +constexpr webtransport::SendGroupId kMoqtSendGroupId = 0; + // Priority that can be assigned to a track or individual streams associated // with the track by either the publisher or the subscriber. using MoqtPriority = uint8_t;
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 35d6555..87e6796 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -30,7 +30,6 @@ #include "quiche/quic/core/quic_alarm_factory.h" #include "quiche/quic/core/quic_time.h" #include "quiche/quic/core/quic_types.h" -#include "quiche/quic/core/quic_utils.h" #include "quiche/quic/moqt/moqt_bidi_stream.h" #include "quiche/quic/moqt/moqt_error.h" #include "quiche/quic/moqt/moqt_fetch_task.h" @@ -48,6 +47,7 @@ #include "quiche/quic/moqt/moqt_stream_map.h" #include "quiche/quic/moqt/moqt_track.h" #include "quiche/quic/moqt/moqt_types.h" +#include "quiche/quic/moqt/moqt_uni_stream.h" #include "quiche/quic/platform/api/quic_logging.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/platform/api/quiche_logging.h" @@ -68,11 +68,6 @@ using ::quic::Perspective; -// WebTransport lets applications split a session into multiple send groups -// that have equal weight for scheduling. We don't have a use for that, so the -// send group is always the same. -constexpr webtransport::SendGroupId kMoqtSendGroupId = 0; - class DefaultPublisher : public MoqtPublisher { public: static DefaultPublisher* GetInstance() { @@ -779,7 +774,7 @@ } PublishedSubscription& subscription = *it->second; if (!session_->CanOpenNextOutgoingUnidirectionalStream()) { - subscription.AddQueuedOutgoingDataStream(parameters); + subscription.AddQueuedOutgoingSubgroupStream(parameters); // The subscription will notify the session about how to update the // session's queue. // TODO: limit the number of streams in the queue. @@ -798,8 +793,17 @@ << "OpenDataStream called when creation of new streams is blocked."; return nullptr; } - new_stream->SetVisitor(std::make_unique<OutgoingDataStream>( - this, new_stream, subscription, parameters)); + webtransport::StreamPriority priority{ + kMoqtSendGroupId, + subscription.GetSendOrder( + Location(parameters.index.group, parameters.first_object), + parameters.index.subgroup, + parameters.publisher_priority.value_or( + subscription.default_publisher_priority()))}; + new_stream->SetVisitor(std::make_unique<OutgoingSubgroupStream>( + framer_, new_stream, parameters.index, parameters.first_object, + subscription.GetWeakPtr(), subscription.publisher_shared_ptr(), priority, + subscription.track_alias(), &trace_recorder_)); subscription.OnDataStreamCreated(new_stream->GetStreamId(), parameters.index); return new_stream; } @@ -871,7 +875,7 @@ // Pop the item from the subscription's queue, which might update // subscribes_with_queued_outgoing_data_streams_. NewStreamParameters next_queued_stream = - subscription->second->NextQueuedOutgoingDataStream(); + subscription->second->NextQueuedOutgoingSubgroupStream(); // Check if Group is too old. if (next_queued_stream.index.group < subscription->second->first_active_group()) { @@ -1933,7 +1937,8 @@ can_have_joining_fetch_(subscribe.parameters.forward()), track_alias_(track_alias), parameters_(subscribe.parameters), - monitoring_interface_(monitoring_interface) { + monitoring_interface_(monitoring_interface), + weak_ptr_factory_(this) { if (monitoring_interface_ != nullptr) { monitoring_interface_->OnObjectAckSupportKnown( subscribe.parameters.oack_window_size); @@ -2084,8 +2089,8 @@ if (raw_stream == nullptr) { continue; } - OutgoingDataStream* stream = - absl::down_cast<OutgoingDataStream*>(raw_stream->visitor()); + OutgoingSubgroupStream* stream = + absl::down_cast<OutgoingSubgroupStream*>(raw_stream->visitor()); stream->CreateAndSetAlarm(session_->callbacks_.clock->ApproximateNow() + delivery_timeout()); } @@ -2111,10 +2116,9 @@ if (raw_stream == nullptr) { return; } - - OutgoingDataStream* stream = - absl::down_cast<OutgoingDataStream*>(raw_stream->visitor()); - stream->SendObjects(*this); + OutgoingSubgroupStream* stream = + absl::down_cast<OutgoingSubgroupStream*>(raw_stream->visitor()); + stream->SendObjects(); } void MoqtSession::PublishedSubscription::OnTrackPublisherGone() { @@ -2144,8 +2148,8 @@ if (raw_stream == nullptr) { return; } - OutgoingDataStream* stream = - absl::down_cast<OutgoingDataStream*>(raw_stream->visitor()); + OutgoingSubgroupStream* stream = + absl::down_cast<OutgoingSubgroupStream*>(raw_stream->visitor()); stream->Fin(location); } @@ -2201,8 +2205,8 @@ continue; } raw_stream->ResetWithUserCode(kResetCodeDeliveryTimeout); - // Sending the Reset will call the destructor for OutgoingDataStream, which - // will erase it from the SendStreamMap. + // Sending the Reset will call the destructor for OutgoingSubgroupStream, + // which will erase it from the SendStreamMap. } first_active_group_ = std::max(first_active_group_, group_id + 1); absl::erase_if(reset_subgroups_, [&](const DataStreamIndex& index) { @@ -2238,7 +2242,7 @@ } // Returns the highest send order in the subscription. -void MoqtSession::PublishedSubscription::AddQueuedOutgoingDataStream( +void MoqtSession::PublishedSubscription::AddQueuedOutgoingSubgroupStream( const NewStreamParameters& parameters) { std::optional<webtransport::SendOrder> start_send_order = queued_outgoing_data_streams_.empty() @@ -2261,11 +2265,11 @@ } MoqtSession::NewStreamParameters -MoqtSession::PublishedSubscription::NextQueuedOutgoingDataStream() { +MoqtSession::PublishedSubscription::NextQueuedOutgoingSubgroupStream() { QUICHE_DCHECK(!queued_outgoing_data_streams_.empty()); if (queued_outgoing_data_streams_.empty()) { - QUICHE_BUG(NextQueuedOutgoingDataStream_no_stream) - << "NextQueuedOutgoingDataStream called when there are no streams " + QUICHE_BUG(NextQueuedOutgoingSubgroupStream_no_stream) + << "NextQueuedOutgoingSubgroupStream called when there are no streams " "pending."; return NewStreamParameters(0, 0, 0, 0); } @@ -2294,7 +2298,7 @@ stream_map().AddStream(start_sequence, id); } void MoqtSession::PublishedSubscription::OnDataStreamDestroyed( - webtransport::StreamId id, DataStreamIndex end_sequence) { + DataStreamIndex end_sequence) { stream_map().RemoveStream(end_sequence); } @@ -2307,200 +2311,6 @@ // TODO: send PUBLISH_DONE if the subscription is done. } -MoqtSession::OutgoingDataStream::OutgoingDataStream( - MoqtSession* session, webtransport::Stream* stream, - PublishedSubscription& subscription, const NewStreamParameters& parameters) - : session_(session), - stream_(stream), - subscription_id_(subscription.request_id()), - index_(parameters.index), - publisher_priority_(parameters.publisher_priority.value_or( - subscription.default_publisher_priority())), - // Always include extension header length, because it's difficult to know - // a priori if they're going to appear on a stream. - stream_type_(MoqtDataStreamType::Subgroup( - index_.subgroup, parameters.first_object, false, - !parameters.publisher_priority.has_value())), - next_object_(parameters.first_object), - session_liveness_(session->liveness_token_) { - UpdateSendOrder(subscription); - session->trace_recorder_.RecordSubgroupStreamCreated( - stream->GetStreamId(), subscription.track_alias(), parameters.index); -} - -MoqtSession::OutgoingDataStream::~OutgoingDataStream() { - // Though it might seem intuitive that the session object has to outlive the - // connection object (and this is indeed how something like QuicSession and - // QuicStream works), this is not the true for WebTransport visitors: the - // session getting destroyed will inevitably lead to all related streams being - // destroyed, but the actual order of destruction is not guaranteed. Thus, we - // need to check if the session still exists while accessing it in a stream - // destructor. - if (session_liveness_.expired()) { - return; - } - if (delivery_timeout_alarm_ != nullptr) { - delivery_timeout_alarm_->PermanentCancel(); - } - auto it = session_->published_subscriptions_.find(subscription_id_); - if (it != session_->published_subscriptions_.end()) { - it->second->OnDataStreamDestroyed(stream_->GetStreamId(), index_); - } -} - -void MoqtSession::OutgoingDataStream::OnCanWrite() { - PublishedSubscription* subscription = GetSubscriptionIfValid(); - if (subscription == nullptr) { - return; - } - SendObjects(*subscription); -} - -void MoqtSession::OutgoingDataStream::OnStopSendingReceived( - webtransport::StreamErrorCode error_code) { - PublishedSubscription* subscription = GetSubscriptionIfValid(); - if (subscription == nullptr) { - return; - } - subscription->OnSubgroupAbandoned(index_.group, index_.subgroup, error_code); -} - -void MoqtSession::OutgoingDataStream::DeliveryTimeoutDelegate::OnAlarm() { - auto it = stream_->session_->published_subscriptions_.find( - stream_->subscription_id_); - if (it != stream_->session_->published_subscriptions_.end()) { - it->second->OnStreamTimeout(stream_->index()); - } - stream_->stream_->ResetWithUserCode(kResetCodeDeliveryTimeout); -} - -MoqtSession::PublishedSubscription* -MoqtSession::OutgoingDataStream::GetSubscriptionIfValid() { - auto it = session_->published_subscriptions_.find(subscription_id_); - if (it == session_->published_subscriptions_.end()) { - stream_->ResetWithUserCode(kResetCodeCancelled); - return nullptr; - } - - PublishedSubscription* subscription = it->second.get(); - if (!subscription->publisher().largest_location().has_value()) { - QUICHE_BUG(GetSubscriptionIfValid_InvalidTrackStatusOk) - << "The track publisher returned a status indicating that no objects " - "are available, but a stream for those objects exists."; - session_->Error(MoqtError::kInternalError, - "Invalid track state provided by application"); - return nullptr; - } - return subscription; -} - -void MoqtSession::OutgoingDataStream::SendObjects( - PublishedSubscription& subscription) { - while (stream_->CanWrite()) { - std::optional<PublishedObject> object = - subscription.publisher().GetCachedObject( - index_.group, index_.subgroup, next_object_, already_delivered_); - if (!object.has_value()) { - break; - } - if (object->metadata.payload_length > 0 && object->payload.empty()) { - QUICHE_BUG(OutgoingDataStream_empty_payload) - << "Received non-empty object with no payload"; - return; - } - - QUICHE_DCHECK_EQ(object->metadata.location.group, index_.group); - QUICHE_DCHECK(object->metadata.subgroup == index_.subgroup); - if (!subscription.InWindow(object->metadata.location)) { - // It is possible that the next object became irrelevant due to a - // REQUEST_UPDATE. Close the stream if so. - bool success = stream_->SendFin(); - QUICHE_BUG_IF(OutgoingDataStream_fin_due_to_update, !success) - << "Writing FIN failed despite CanWrite() being true."; - return; - } - - quic::QuicTimeDelta delivery_timeout = subscription.delivery_timeout(); - if (!session_->alternate_delivery_timeout_ && - session_->callbacks_.clock->ApproximateNow() - - object->metadata.arrival_time > - delivery_timeout) { - subscription.OnStreamTimeout(index_); - stream_->ResetWithUserCode(kResetCodeDeliveryTimeout); - return; - } - uint64_t start_offset = already_delivered_; - already_delivered_ += - quic::MemSliceSpanTotalSize(absl::MakeSpan(object->payload)); - bool fin_after_this = object->fin_after_this && - already_delivered_ == object->metadata.payload_length; - if (start_offset > 0) { // Just send payload. - if (already_delivered_ == start_offset) { - // Partial delivery of an object but the payload is empty. This would - // result in an infinite loop. - QUICHE_BUG(OutgoingDataStream_empty_payload) - << "Empty payload for partial object " << object->metadata.location; - return; - } - webtransport::StreamWriteOptions options; - options.set_send_fin(fin_after_this); - absl::Status write_status = - stream_->Writev(absl::MakeSpan(object->payload), options); - if (!write_status.ok()) { - QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed) - << "Writing into MoQT stream failed despite CanWrite() being true " - "before; status: " - << write_status; - session_->Error(MoqtError::kInternalError, "Data stream write error"); - return; - } - } else { - if (!session_->WriteObjectToStream( - stream_, subscription.track_alias(), object->metadata, - std::move(object->payload), stream_type_, last_object_, - fin_after_this)) { - // WriteObjectToStream() closes the connection on error, meaning that - // there is no need to process the stream any further. - return; - } - last_object_ = object->metadata; - subscription.OnObjectSent(object->metadata.location); - next_object_ = last_object_->location.object; - } - if (already_delivered_ == last_object_->payload_length) { - ++next_object_; - already_delivered_ = 0; - } else { - return; - } - if (object->fin_after_this && !delivery_timeout.IsInfinite() && - !session_->alternate_delivery_timeout_) { - CreateAndSetAlarm(object->metadata.arrival_time + delivery_timeout); - } - } -} - -void MoqtSession::OutgoingDataStream::Fin(Location last_object) { - QUICHE_DCHECK_EQ(last_object.group, index_.group); - if (next_object_ <= last_object.object) { - // There is still data to send, do nothing. - return; - } - // All data has already been sent; send a pure FIN. - bool success = stream_->SendFin(); - QUICHE_BUG_IF(OutgoingDataStream_fin_failed, !success) - << "Writing pure FIN failed."; - auto it = session_->published_subscriptions_.find(subscription_id_); - if (it == session_->published_subscriptions_.end()) { - return; - } - quic::QuicTimeDelta delivery_timeout = it->second->delivery_timeout(); - if (!delivery_timeout.IsInfinite()) { - CreateAndSetAlarm(session_->callbacks_.clock->ApproximateNow() + - delivery_timeout); - } -} - bool MoqtSession::WriteObjectToStream( webtransport::Stream* stream, uint64_t id, const PublishedObjectMetadata& metadata, @@ -2635,24 +2445,6 @@ OnObjectSent(object->metadata.location); } -void MoqtSession::OutgoingDataStream::UpdateSendOrder( - PublishedSubscription& subscription) { - stream_->SetPriority(webtransport::StreamPriority{ - /*send_group_id=*/kMoqtSendGroupId, - subscription.GetSendOrder(Location(index_.group, next_object_), - index_.subgroup, publisher_priority_)}); -} - -void MoqtSession::OutgoingDataStream::CreateAndSetAlarm( - quic::QuicTime deadline) { - if (delivery_timeout_alarm_ != nullptr) { - return; - } - delivery_timeout_alarm_ = absl::WrapUnique( - session_->alarm_factory_->CreateAlarm(new DeliveryTimeoutDelegate(this))); - delivery_timeout_alarm_->Set(deadline); -} - MoqtSession::PublishedFetch::FetchStreamVisitor::FetchStreamVisitor( std::shared_ptr<PublishedFetch> fetch, webtransport::Stream* stream) : fetch_(fetch), stream_(stream) {
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index cb00379..f927a8b 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -41,6 +41,7 @@ #include "quiche/quic/moqt/moqt_trace_recorder.h" #include "quiche/quic/moqt/moqt_track.h" #include "quiche/quic/moqt/moqt_types.h" +#include "quiche/quic/moqt/moqt_uni_stream.h" #include "quiche/quic/moqt/session_namespace_tree.h" #include "quiche/common/platform/api/quiche_export.h" #include "quiche/common/platform/api/quiche_logging.h" @@ -367,7 +368,8 @@ }; // Represents a record for a single subscription to a local track that is // being sent to the peer. - class PublishedSubscription : public MoqtObjectListener { + class PublishedSubscription : public MoqtObjectListener, + public SubscriptionPublisherInterface { public: PublishedSubscription(MoqtSession* session, std::shared_ptr<MoqtTrackPublisher> track_publisher, @@ -383,6 +385,9 @@ uint64_t request_id() const { return request_id_; } MoqtTrackPublisher& publisher() { return *track_publisher_; } + std::shared_ptr<MoqtTrackPublisher> publisher_shared_ptr() { + return track_publisher_; + } uint64_t track_alias() const { return track_alias_; } MessageParameters& parameters() { return parameters_; } std::optional<Location> largest_sent() const { return largest_sent_; } @@ -397,21 +402,47 @@ MoqtPriority publisher_priority) override; void OnTrackPublisherGone() override; void OnNewFinAvailable(Location location, uint64_t subgroup) override; + // also a part of SubscriptionPublisherInterface. void OnSubgroupAbandoned(uint64_t group, uint64_t subgroup, webtransport::StreamErrorCode error_code) override; void OnGroupAbandoned(uint64_t group_id) override; void ProcessObjectAck(const MoqtObjectAck& message); - // Updates the window and other properties of the subscription in question. - void Update(const MessageParameters& parameters); - // Checks if a given Location or Group should be forwarded to the - // subscriber. - bool InWindow(Location location) { + // SubscriptionPublisherInterface implementation. + bool InWindow(Location location) override { return parameters_.forward() && (!parameters_.subscription_filter.has_value() || (parameters_.subscription_filter->WindowKnown() && parameters_.subscription_filter->InWindow(location))); + }; + bool alternate_delivery_timeout() override { + return session_->alternate_delivery_timeout_; } + const quic::QuicClock* clock() override { + return session_->callbacks_.clock; + } + quic::QuicTimeDelta delivery_timeout() override { + return std::min( + parameters_.delivery_timeout.value_or(kDefaultDeliveryTimeout), + publisher_delivery_timeout_.value_or(kDefaultDeliveryTimeout)); + } + quic::QuicAlarmFactory* alarm_factory() override { + return session_->alarm_factory_.get(); + } + void OnObjectSent(Location sequence) override; + void OnStreamTimeout(DataStreamIndex index) override { + reset_subgroups_.insert(index); + if (session_->alternate_delivery_timeout_) { + first_active_group_ = std::max(first_active_group_, index.group + 1); + } + } + // OnSubgroupAbandoned() is declared above with MoqtObjectListener. + void OnDataStreamDestroyed(DataStreamIndex) override; + + // Updates the window and other properties of the subscription in question. + void Update(const MessageParameters& parameters); + // Checks if a given Location or Group should be forwarded to the + // subscriber. bool InWindow(uint64_t group) { return parameters_.forward() && (!parameters_.subscription_filter.has_value() || @@ -421,9 +452,6 @@ void OnDataStreamCreated(webtransport::StreamId id, DataStreamIndex start_sequence); - void OnDataStreamDestroyed(webtransport::StreamId id, - DataStreamIndex end_sequence); - void OnObjectSent(Location sequence); std::vector<webtransport::StreamId> GetAllStreams() const; @@ -432,18 +460,13 @@ std::optional<uint64_t> subgroup, MoqtPriority publisher_priority) const; - void AddQueuedOutgoingDataStream(const NewStreamParameters& parameters); + void AddQueuedOutgoingSubgroupStream(const NewStreamParameters& parameters); // Pops the pending outgoing data stream, with the highest send order. // The session keeps track of which subscribes have pending streams. This // function will trigger a QUICHE_DCHECK if called when there are no pending // streams. - NewStreamParameters NextQueuedOutgoingDataStream(); + NewStreamParameters NextQueuedOutgoingSubgroupStream(); - quic::QuicTimeDelta delivery_timeout() const { - return std::min( - parameters_.delivery_timeout.value_or(kDefaultDeliveryTimeout), - publisher_delivery_timeout_.value_or(kDefaultDeliveryTimeout)); - } void set_subscriber_delivery_timeout(quic::QuicTimeDelta timeout) { parameters_.delivery_timeout = timeout; } @@ -451,13 +474,6 @@ publisher_delivery_timeout_ = timeout; } - void OnStreamTimeout(DataStreamIndex index) { - reset_subgroups_.insert(index); - if (session_->alternate_delivery_timeout_) { - first_active_group_ = std::max(first_active_group_, index.group + 1); - } - } - uint64_t first_active_group() const { return first_active_group_; } absl::flat_hash_set<DataStreamIndex>& reset_subgroups() { @@ -474,6 +490,10 @@ bool established() const { return established_; } + quiche::QuicheWeakPtr<SubscriptionPublisherInterface> GetWeakPtr() { + return weak_ptr_factory_.Create(); + } + private: friend class test::MoqtSessionPeer; SendStreamMap& stream_map(); @@ -521,82 +541,9 @@ // FinalizeSendOrder() whenever delivering it to the MoqtSession. absl::btree_multimap<webtransport::SendOrder, NewStreamParameters> queued_outgoing_data_streams_; - }; - class QUICHE_EXPORT OutgoingDataStream : public webtransport::StreamVisitor { - public: - OutgoingDataStream(MoqtSession* session, webtransport::Stream* stream, - PublishedSubscription& subscription, - const NewStreamParameters& parameters); - ~OutgoingDataStream(); - - // webtransport::StreamVisitor implementation. - void OnCanRead() override {} - void OnCanWrite() override; - void OnResetStreamReceived(webtransport::StreamErrorCode) override {} - void OnStopSendingReceived( - webtransport::StreamErrorCode error_code) override; - void OnWriteSideInDataRecvdState() override {} - - class DeliveryTimeoutDelegate - : public quic::QuicAlarm::DelegateWithoutContext { - public: - explicit DeliveryTimeoutDelegate(OutgoingDataStream* stream) - : stream_(stream) {} - void OnAlarm() override; - - private: - OutgoingDataStream* stream_; - }; - - webtransport::Stream* stream() const { return stream_; } - - // Sends objects on the stream, starting with `next_object_`, until the - // stream becomes write-blocked or closed. - void SendObjects(PublishedSubscription& subscription); - - // Sends a pure FIN on the stream, if the last object sent matches - // |last_object|. Otherwise, does nothing. - void Fin(Location last_object); - - // Recomputes the send order and updates it for the associated stream. - void UpdateSendOrder(PublishedSubscription& subscription); - - // Creates and sets an alarm for the given deadline. Does nothing if the - // alarm is already created. - void CreateAndSetAlarm(quic::QuicTime deadline); - - DataStreamIndex index() const { return index_; } - - private: - friend class test::MoqtSessionPeer; - friend class DeliveryTimeoutDelegate; - - // Checks whether the associated subscription is still valid; if not, resets - // the stream and returns nullptr. - PublishedSubscription* GetSubscriptionIfValid(); - - MoqtSession* session_; - webtransport::Stream* stream_; - uint64_t subscription_id_; - DataStreamIndex index_; - const MoqtPriority publisher_priority_; - MoqtDataStreamType stream_type_; - // Minimum object ID that should go out next. The session doesn't know the - // exact ID of the next object in the stream because the next object could - // be in a different subgroup or simply be skipped. - uint64_t next_object_; - // Number of payload bytes from next_object_ that has already been written - // to the stream. - uint64_t already_delivered_ = 0; - // Used in subgroup streams to compute the object ID diff. If nullopt, the - // stream header has not been written yet. - std::optional<PublishedObjectMetadata> last_object_; - // If this data stream is for SUBSCRIBE, reset it if an object has been - // excessively delayed per Section 7.1.1.2. - std::unique_ptr<quic::QuicAlarm> delivery_timeout_alarm_; - // A weak pointer to an object owned by the session. Used to make sure the - // session does not get called after being destroyed. - std::weak_ptr<void> session_liveness_; + // Must be last. + quiche::QuicheWeakPtrFactory<SubscriptionPublisherInterface> + weak_ptr_factory_; }; class QUICHE_EXPORT PublishedFetch { @@ -668,15 +615,22 @@ MessageParameters parameters; parameters.expires = publisher_->expiration(); parameters.largest_object = publisher_->largest_location(); - session_->GetControlStream()->SendRequestOk(request_id_, parameters); + MoqtBidiStreamBase* control_stream = session_->GetControlStream(); + if (control_stream != nullptr) { + control_stream->CheckStatus( + control_stream->SendRequestOk(request_id_, parameters)); + } session_->incoming_track_status_.erase(request_id_); // No class access below this line! } void OnSubscribeRejected(MoqtRequestErrorInfo info) override { - session_->GetControlStream()->SendRequestError( - request_id_, info.error_code, info.retry_interval, - info.reason_phrase); + MoqtBidiStreamBase* control_stream = session_->GetControlStream(); + if (control_stream != nullptr) { + control_stream->CheckStatus(control_stream->SendRequestError( + request_id_, info.error_code, info.retry_interval, + info.reason_phrase)); + } session_->incoming_track_status_.erase(request_id_); // No class access below this line! } @@ -756,6 +710,7 @@ // a session error if is not. bool ValidateRequestId(uint64_t request_id); + // TODO(martinduke): Delete once Fetch uses OutgoingSubgroupStream. // Actually sends an object on |stream| with track alias or fetch ID |id| // and metadata in |object|. Not for use with datagrams. Returns |true| if // the write was successful.
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index e8f27fe..08d0a61 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -50,6 +50,7 @@ #include "quiche/common/quiche_data_reader.h" #include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_weak_ptr.h" +#include "quiche/common/test_tools/quiche_test_utils.h" #include "quiche/web_transport/test_tools/in_memory_stream.h" #include "quiche/web_transport/test_tools/mock_web_transport.h" #include "quiche/web_transport/web_transport.h" @@ -1302,7 +1303,7 @@ control_stream->ReceiveMessage(subscribe_ok); } -TEST_F(MoqtSessionTest, CreateOutgoingDataStreamAndSend) { +TEST_F(MoqtSessionTest, CreateOutgoingSubgroupStreamAndSend) { FullTrackName ftn("foo", "bar"); auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2)); @@ -1917,6 +1918,11 @@ EXPECT_CALL(mock_stream_, visitor()).WillOnce([&] { return stream_visitor.get(); }); + webtransport::StreamPriority expected_priority{ + kMoqtSendGroupId, + SendOrderForStream(kDefaultSubscriberPriority, kDefaultPublisherPriority, + 5, 0, MoqtDeliveryOrder::kAscending)}; + EXPECT_CALL(mock_stream_, SetPriority(expected_priority)); EXPECT_CALL(mock_stream_, GetStreamId()) .WillRepeatedly(Return(kOutgoingUniStreamId)); EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId)) @@ -2173,7 +2179,8 @@ std::make_shared<MockTrackPublisher>(request.full_track_name); TrackExtensions extensions(std::nullopt, std::nullopt, kLocalDefaultPriority, std::nullopt, std::nullopt, std::nullopt); - EXPECT_CALL(*track, extensions).WillOnce(testing::ReturnRef(extensions)); + EXPECT_CALL(*track, extensions) + .WillRepeatedly(testing::ReturnRef(extensions)); MoqtObjectListener* listener = ReceiveSubscribeSynchronousOk( track, request, control_stream.get(), /*track_alias=*/0, extensions);
diff --git a/quiche/quic/moqt/moqt_uni_stream.cc b/quiche/quic/moqt/moqt_uni_stream.cc new file mode 100644 index 0000000..e0b9c37 --- /dev/null +++ b/quiche/quic/moqt/moqt_uni_stream.cc
@@ -0,0 +1,249 @@ +// Copyright 2026 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "quiche/quic/moqt/moqt_uni_stream.h" + +#include <cstdint> +#include <memory> +#include <optional> +#include <utility> +#include <vector> + +#include "absl/base/nullability.h" +#include "absl/memory/memory.h" +#include "absl/status/status.h" +#include "absl/types/span.h" +#include "quiche/quic/core/quic_time.h" +#include "quiche/quic/core/quic_utils.h" +#include "quiche/quic/moqt/moqt_error.h" +#include "quiche/quic/moqt/moqt_framer.h" +#include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" +#include "quiche/quic/moqt/moqt_publisher.h" +#include "quiche/quic/moqt/moqt_trace_recorder.h" +#include "quiche/quic/moqt/moqt_types.h" +#include "quiche/common/quiche_buffer_allocator.h" +#include "quiche/common/quiche_mem_slice.h" +#include "quiche/common/quiche_weak_ptr.h" +#include "quiche/web_transport/stream_helpers.h" +#include "quiche/web_transport/web_transport.h" + +namespace moqt { + +OutgoingSubgroupStream::OutgoingSubgroupStream( + MoqtFramer framer, webtransport::Stream* absl_nonnull stream, + DataStreamIndex index, uint64_t first_object, + quiche::QuicheWeakPtr<SubscriptionPublisherInterface> visitor, + std::shared_ptr<MoqtTrackPublisher> absl_nonnull track_publisher, + webtransport::StreamPriority priority, uint64_t track_alias, + MoqtTraceRecorder* absl_nonnull trace_recorder) + : stream_(*stream), + index_(index), + visitor_(std::move(visitor)), + framer_(framer), + track_alias_(track_alias), + publisher_(track_publisher), + next_object_(first_object), + priority_(priority) { + stream_.SetPriority(priority_); + trace_recorder->RecordSubgroupStreamCreated(stream->GetStreamId(), + track_alias_, index); +} + +OutgoingSubgroupStream::~OutgoingSubgroupStream() { + // Though it might seem intuitive that the session object has to outlive the + // connection object (and this is indeed how something like QuicSession and + // QuicStream works), this is not the true for WebTransport visitors: the + // session getting destroyed will inevitably lead to all related streams being + // destroyed, but the actual order of destruction is not guaranteed. Thus, we + // need to check if the session still exists while accessing it in a stream + // destructor. + if (delivery_timeout_alarm_ != nullptr) { + delivery_timeout_alarm_->PermanentCancel(); + } + SubscriptionPublisherInterface* visitor = visitor_.GetIfAvailable(); + if (visitor != nullptr) { + visitor->OnDataStreamDestroyed(index_); + } +} + +void OutgoingSubgroupStream::OnCanWrite() { SendObjects(); } + +void OutgoingSubgroupStream::OnStopSendingReceived( + webtransport::StreamErrorCode error_code) { + SubscriptionPublisherInterface* visitor = visitor_.GetIfAvailable(); + if (visitor != nullptr) { + visitor->OnSubgroupAbandoned(index_.group, index_.subgroup, error_code); + } +} + +void OutgoingSubgroupStream::DeliveryTimeoutDelegate::OnAlarm() { + SubscriptionPublisherInterface* visitor = stream_->visitor_.GetIfAvailable(); + if (visitor != nullptr) { + visitor->OnStreamTimeout(stream_->index_); + } + stream_->stream_.ResetWithUserCode(kResetCodeDeliveryTimeout); +} + +void OutgoingSubgroupStream::SendObjects() { + SubscriptionPublisherInterface* visitor = visitor_.GetIfAvailable(); + if (visitor == nullptr) { + return; + } + while (stream_.CanWrite()) { + std::optional<PublishedObject> object = publisher_->GetCachedObject( + index_.group, index_.subgroup, next_object_, already_delivered_); + if (!object.has_value()) { + break; + } + if (object->metadata.payload_length > 0 && object->payload.empty()) { + QUICHE_BUG(OutgoingSubgroupStream_empty_payload) + << "Received non-empty object with no payload"; + return; + } + QUICHE_DCHECK_EQ(object->metadata.location.group, index_.group); + QUICHE_DCHECK(object->metadata.subgroup == index_.subgroup); + if (!visitor->InWindow(object->metadata.location)) { + // It is possible that the next object became irrelevant due to a + // REQUEST_UPDATE. Close the stream if so. + absl::Status status = webtransport::SendFinOnStream(stream_); + QUICHE_BUG_IF(OutgoingSubgroupStream_fin_due_to_update, !status.ok()) + << "Writing FIN failed despite CanWrite() being true."; + return; + } + + quic::QuicTimeDelta delivery_timeout = visitor->delivery_timeout(); + if (!visitor->alternate_delivery_timeout() && + visitor->clock()->ApproximateNow() - object->metadata.arrival_time > + delivery_timeout) { + visitor->OnStreamTimeout(index_); + stream_.ResetWithUserCode(kResetCodeDeliveryTimeout); + // No class access below this line. + return; + } + uint64_t start_offset = already_delivered_; + already_delivered_ += + quic::MemSliceSpanTotalSize(absl::MakeSpan(object->payload)); + object->fin_after_this &= + already_delivered_ == object->metadata.payload_length; + if (start_offset > 0) { // Just send payload. + if (already_delivered_ == start_offset) { + // Partial delivery of an object but the payload is empty. This would + // result in an infinite loop. + QUICHE_BUG(OutgoingDataStream_empty_payload) + << "Empty payload for partial object " << object->metadata.location; + return; + } + webtransport::StreamWriteOptions options; + options.set_send_fin(object->fin_after_this); + absl::Status write_status = + stream_.Writev(absl::MakeSpan(object->payload), options); + if (!write_status.ok()) { + QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed) + << "Writing into MoQT stream failed despite CanWrite() being true " + "before; status: " + << write_status; + stream_.ResetWithUserCode(kResetCodeInternalError); + return; + } + } else { + if (!WriteObjectToStream(*object)) { + stream_.ResetWithUserCode(kResetCodeInternalError); + // No class access below this line. + return; + } + last_object_ = object->metadata; + next_object_ = last_object_->location.object; + visitor->OnObjectSent(object->metadata.location); + } + if (already_delivered_ != last_object_->payload_length) { + return; + } + ++next_object_; + already_delivered_ = 0; + if (object->fin_after_this && !delivery_timeout.IsInfinite() && + !visitor->alternate_delivery_timeout()) { + CreateAndSetAlarm(object->metadata.arrival_time + delivery_timeout); + } + } +} + +void OutgoingSubgroupStream::Fin(Location last_object) { + QUICHE_DCHECK_EQ(last_object.group, index_.group); + if (next_object_ <= last_object.object) { + // There is still data to send, do nothing. + return; + } + // All data has already been sent; send a pure FIN. + absl::Status status = webtransport::SendFinOnStream(stream_); + QUICHE_BUG_IF(OutgoingSubgroupStream_fin_failed, !status.ok()) + << "Writing pure FIN failed."; + SubscriptionPublisherInterface* visitor = visitor_.GetIfAvailable(); + if (visitor == nullptr) { + return; + } + quic::QuicTimeDelta delivery_timeout = visitor->delivery_timeout(); + if (!delivery_timeout.IsInfinite()) { + CreateAndSetAlarm(visitor->clock()->ApproximateNow() + delivery_timeout); + } +} + +void OutgoingSubgroupStream::CreateAndSetAlarm(quic::QuicTime deadline) { + if (delivery_timeout_alarm_ != nullptr) { + return; + } + SubscriptionPublisherInterface* visitor = visitor_.GetIfAvailable(); + if (visitor == nullptr) { + return; + } + delivery_timeout_alarm_ = absl::WrapUnique( + visitor->alarm_factory()->CreateAlarm(new DeliveryTimeoutDelegate(this))); + delivery_timeout_alarm_->Set(deadline); +} + +bool OutgoingSubgroupStream::WriteObjectToStream(PublishedObject& object) { + MoqtObject header; + header.track_alias = track_alias_; + header.group_id = object.metadata.location.group; + header.subgroup_id = object.metadata.subgroup; + header.object_id = object.metadata.location.object; + header.publisher_priority = object.metadata.publisher_priority; + header.extension_headers = object.metadata.extensions; + header.object_status = object.metadata.status; + header.payload_length = object.metadata.payload_length; + + // Always include extension header length, because it's difficult to know + // a priori if they're going to appear on a stream. + if (!last_object_.has_value()) { + type_ = MoqtDataStreamType::Subgroup( + index_.subgroup, next_object_, false, + object.metadata.publisher_priority == + publisher_->extensions().default_publisher_priority()); + } + quiche::QuicheBuffer serialized_header = + framer_.SerializeObjectHeader(header, type_, last_object_); + std::vector<quiche::QuicheMemSlice> write_vector; + write_vector.reserve(object.payload.size() + 1); + write_vector.push_back(quiche::QuicheMemSlice(std::move(serialized_header))); + for (auto& slice : object.payload) { + write_vector.push_back(std::move(slice)); + } + webtransport::StreamWriteOptions options; + options.set_send_fin(object.fin_after_this); + absl::Status write_status = + stream_.Writev(absl::MakeSpan(write_vector), options); + if (!write_status.ok()) { + QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed) + << "Writing into MoQT stream failed despite CanWrite being true " + "before; status: " + << write_status; + return false; + } + QUICHE_DVLOG(1) << "Stream " << stream_.GetStreamId() + << " successfully wrote " << object.metadata.location + << ", fin = " << object.fin_after_this; + return true; +} + +} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_uni_stream.h b/quiche/quic/moqt/moqt_uni_stream.h new file mode 100644 index 0000000..c8b94a0 --- /dev/null +++ b/quiche/quic/moqt/moqt_uni_stream.h
@@ -0,0 +1,139 @@ +// Copyright 2026 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_QUIC_MOQT_MOQT_UNI_STREAM_H_ +#define QUICHE_QUIC_MOQT_MOQT_UNI_STREAM_H_ + +#include <cstdint> +#include <memory> +#include <optional> + +#include "absl/base/nullability.h" +#include "quiche/quic/core/quic_alarm.h" +#include "quiche/quic/core/quic_alarm_factory.h" +#include "quiche/quic/core/quic_time.h" +#include "quiche/quic/moqt/moqt_framer.h" +#include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_object.h" +#include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/quic/moqt/moqt_publisher.h" +#include "quiche/quic/moqt/moqt_trace_recorder.h" +#include "quiche/quic/moqt/moqt_types.h" +#include "quiche/common/platform/api/quiche_export.h" +#include "quiche/common/quiche_weak_ptr.h" +#include "quiche/web_transport/web_transport.h" + +namespace moqt { + +namespace test { +class MoqtSessionPeer; +} + +// This interface provides information about the subscription. +class SubscriptionPublisherInterface { + public: + virtual ~SubscriptionPublisherInterface() = default; + virtual bool InWindow(Location) = 0; + virtual bool alternate_delivery_timeout() = 0; + virtual const quic::QuicClock* clock() = 0; + virtual quic::QuicTimeDelta delivery_timeout() = 0; + virtual quic::QuicAlarmFactory* alarm_factory() = 0; + // Called when the first byte of an object is written to the stream. + virtual void OnObjectSent(Location) = 0; + virtual void OnStreamTimeout(DataStreamIndex) = 0; + virtual void OnSubgroupAbandoned(uint64_t group, uint64_t subgroup, + webtransport::StreamErrorCode) = 0; + virtual void OnDataStreamDestroyed(DataStreamIndex) = 0; +}; + +// This is for subscriptions only. FETCH uses its own construct. +class QUICHE_EXPORT OutgoingSubgroupStream + : public webtransport::StreamVisitor { + public: + // |visitor| is owned by the subscription, so the WeakPtr also serves as a + // liveness token. + OutgoingSubgroupStream( + MoqtFramer framer, webtransport::Stream* absl_nonnull stream, + DataStreamIndex index, uint64_t first_object, + quiche::QuicheWeakPtr<SubscriptionPublisherInterface> visitor, + std::shared_ptr<MoqtTrackPublisher> absl_nonnull track_publisher, + webtransport::StreamPriority priority, uint64_t track_alias, + MoqtTraceRecorder* absl_nonnull trace_recorder); + ~OutgoingSubgroupStream(); + + // webtransport::StreamVisitor implementation. + void OnCanRead() override {} + void OnCanWrite() override; + void OnResetStreamReceived(webtransport::StreamErrorCode) override {} + void OnStopSendingReceived(webtransport::StreamErrorCode error_code) override; + void OnWriteSideInDataRecvdState() override {} + + class DeliveryTimeoutDelegate + : public quic::QuicAlarm::DelegateWithoutContext { + public: + explicit DeliveryTimeoutDelegate(OutgoingSubgroupStream* stream) + : stream_(stream) {} + void OnAlarm() override; + + private: + OutgoingSubgroupStream* stream_; + }; + + // Sends objects on the stream, starting with `next_object_`, until the + // stream becomes write-blocked or closed. Can reset the stream, destroying + // the class, on a write error. + void SendObjects(); + + // Sends a pure FIN on the stream, if the last object sent matches + // |last_object|. Otherwise, does nothing. + void Fin(Location last_object); + // Reset can be called directly on the stream, with no need to involve the + // visitor. + + // Recomputes the send order and updates it for the associated stream. + void UpdatePriority(MoqtPriority subscriber_priority) { + priority_.send_order = UpdateSendOrderForSubscriberPriority( + priority_.send_order, subscriber_priority); + stream_.SetPriority(priority_); + } + + // Creates and sets an alarm for the given deadline. Does nothing if the + // alarm is already created. + void CreateAndSetAlarm(quic::QuicTime deadline); + + private: + friend class DeliveryTimeoutDelegate; + friend class test::MoqtSessionPeer; + + // Writes an object to the stream. Returns false if the write failed. The + // caller should reset the stream if that happens. + bool WriteObjectToStream(PublishedObject& object); + + webtransport::Stream& stream_; // Always valid because it owns this object. + DataStreamIndex index_; + quiche::QuicheWeakPtr<SubscriptionPublisherInterface> visitor_; + MoqtFramer framer_; + MoqtDataStreamType type_; + uint64_t track_alias_; + std::shared_ptr<MoqtTrackPublisher> publisher_; + // Minimum object ID that should go out next. The session doesn't know the + // exact ID of the next object in the stream because the next object could + // be in a different subgroup or simply be skipped. + uint64_t next_object_; + // Number of payload bytes from next_object_ that has already been written + // to the stream. + uint64_t already_delivered_ = 0; + // Used in subgroup streams to compute the object ID diff and pass metadata + // for partial objects. If nullopt, the stream header has not been written + // yet. + std::optional<PublishedObjectMetadata> last_object_; + webtransport::StreamPriority priority_; + // If this data stream is for SUBSCRIBE, reset it if an object has been + // excessively delayed per Section 7.1.1.2. + std::unique_ptr<quic::QuicAlarm> delivery_timeout_alarm_; +}; + +} // namespace moqt + +#endif // QUICHE_QUIC_MOQT_MOQT_UNI_STREAM_H_
diff --git a/quiche/quic/moqt/moqt_uni_stream_test.cc b/quiche/quic/moqt/moqt_uni_stream_test.cc new file mode 100644 index 0000000..7e83a32 --- /dev/null +++ b/quiche/quic/moqt/moqt_uni_stream_test.cc
@@ -0,0 +1,218 @@ +// Copyright 2026 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "quiche/quic/moqt/moqt_uni_stream.h" + +#include <cstdint> +#include <memory> +#include <optional> +#include <string> +#include <utility> + +#include "absl/status/status.h" +#include "absl/strings/string_view.h" +#include "absl/types/span.h" +#include "quiche/quic/core/quic_alarm_factory.h" +#include "quiche/quic/core/quic_time.h" +#include "quiche/quic/moqt/moqt_error.h" +#include "quiche/quic/moqt/moqt_framer.h" +#include "quiche/quic/moqt/moqt_key_value_pair.h" +#include "quiche/quic/moqt/moqt_names.h" +#include "quiche/quic/moqt/moqt_object.h" +#include "quiche/quic/moqt/moqt_trace_recorder.h" +#include "quiche/quic/moqt/moqt_types.h" +#include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h" +#include "quiche/quic/platform/api/quic_test.h" +#include "quiche/quic/test_tools/mock_clock.h" +#include "quiche/quic/test_tools/quic_test_utils.h" +#include "quiche/common/quiche_mem_slice.h" +#include "quiche/common/quiche_weak_ptr.h" +#include "quiche/web_transport/test_tools/mock_web_transport.h" +#include "quiche/web_transport/web_transport.h" + +namespace moqt::test { + +namespace { + +using ::testing::Optional; +using ::testing::Return; +using ::testing::ReturnRef; +using ::testing::StrictMock; + +PublishedObject DefaultObject() { + PublishedObject object; + object.metadata.location = Location(0, 0); + object.metadata.subgroup = 0; + object.metadata.status = MoqtObjectStatus::kNormal; + object.metadata.arrival_time = quic::QuicTime::Zero(); + object.metadata.payload_length = 7; + object.payload.push_back(quiche::QuicheMemSlice::Copy("payload")); + object.fin_after_this = false; + return object; +} + +class MockSubscriptionPublisherInterface + : public SubscriptionPublisherInterface { + public: + MockSubscriptionPublisherInterface() : weak_ptr_factory_(this) {} + + MOCK_METHOD(bool, InWindow, (Location), (override)); + MOCK_METHOD(bool, alternate_delivery_timeout, (), (override)); + MOCK_METHOD(quic::QuicClock*, clock, (), (override)); + MOCK_METHOD(quic::QuicTimeDelta, delivery_timeout, (), (override)); + MOCK_METHOD(quic::QuicAlarmFactory*, alarm_factory, (), (override)); + MOCK_METHOD(void, OnObjectSent, (Location), (override)); + MOCK_METHOD(void, OnStreamTimeout, (DataStreamIndex), (override)); + MOCK_METHOD(void, OnSubgroupAbandoned, + (uint64_t, uint64_t, webtransport::StreamErrorCode), (override)); + MOCK_METHOD(void, OnDataStreamDestroyed, (DataStreamIndex), (override)); + + quiche::QuicheWeakPtr<SubscriptionPublisherInterface> GetWeakPtr() { + return weak_ptr_factory_.Create(); + } + + private: + quiche::QuicheWeakPtrFactory<SubscriptionPublisherInterface> + weak_ptr_factory_; +}; + +class OutgoingSubgroupStreamTest : public quic::test::QuicTest { + public: + OutgoingSubgroupStreamTest() + : index_(0, 0), + track_publisher_(std::make_shared<StrictMock<MockTrackPublisher>>( + FullTrackName("foo", "bar"))), + trace_recorder_(nullptr) { + EXPECT_CALL(mock_stream_, GetStreamId()).WillRepeatedly(Return(14)); + CreateStream(); + } + ~OutgoingSubgroupStreamTest() override { + EXPECT_CALL(visitor_, OnDataStreamDestroyed(index_)); + } + + void CreateStream(uint64_t next_object = 0) { + EXPECT_CALL(mock_stream_, SetPriority); + stream_ = std::make_unique<OutgoingSubgroupStream>( + framer_, &mock_stream_, index_, next_object, visitor_.GetWeakPtr(), + track_publisher_, webtransport::StreamPriority(), 0, &trace_recorder_); + } + + void ExpectFin() { + EXPECT_CALL(mock_stream_, Writev) + .WillOnce([](absl::Span<quiche::QuicheMemSlice> data, + const webtransport::StreamWriteOptions& options) { + EXPECT_TRUE(data.empty()); + EXPECT_TRUE(options.send_fin()); + return absl::OkStatus(); + }); + } + + void ExpectAlarm() { + EXPECT_CALL(visitor_, alarm_factory()).WillOnce(Return(&alarm_factory_)); + } + + MoqtFramer framer_{true}; + StrictMock<webtransport::test::MockStream> mock_stream_; + DataStreamIndex index_; + std::shared_ptr<StrictMock<MockTrackPublisher>> track_publisher_; + StrictMock<MockSubscriptionPublisherInterface> visitor_; + MoqtTraceRecorder trace_recorder_; + TrackExtensions track_extensions_; + quic::MockClock mock_clock_; + quic::test::MockAlarmFactory alarm_factory_; + std::unique_ptr<OutgoingSubgroupStream> stream_; +}; + +TEST_F(OutgoingSubgroupStreamTest, OnCanWrite) { + EXPECT_CALL(mock_stream_, CanWrite()).WillOnce(Return(false)); + stream_->OnCanWrite(); +} + +TEST_F(OutgoingSubgroupStreamTest, OnStopSendingReceived) { + EXPECT_CALL(visitor_, OnSubgroupAbandoned(index_.group, index_.subgroup, 1)); + stream_->OnStopSendingReceived(1); +} + +TEST_F(OutgoingSubgroupStreamTest, DeliveryTimeoutAlarm) { + OutgoingSubgroupStream::DeliveryTimeoutDelegate delegate(stream_.get()); + EXPECT_CALL(visitor_, OnStreamTimeout(index_)); + EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout)); + delegate.OnAlarm(); +} + +TEST_F(OutgoingSubgroupStreamTest, Fin) { + // Replace stream_ with one where next_object_ is 1. + EXPECT_CALL(visitor_, OnDataStreamDestroyed(index_)); + CreateStream(1); + // last_object.object < next_object: sends pure FIN + ExpectFin(); + EXPECT_CALL(visitor_, delivery_timeout()) + .WillOnce(Return(quic::QuicTimeDelta::FromSeconds(1))); + EXPECT_CALL(visitor_, clock()).WillOnce(Return(&mock_clock_)); + ExpectAlarm(); + stream_->Fin(Location(0, 0)); + // last_object.object >= next_object: does nothing + stream_->Fin(Location(0, 1)); +} + +TEST_F(OutgoingSubgroupStreamTest, UpdatePriority) { + EXPECT_CALL(mock_stream_, SetPriority(webtransport::StreamPriority{ + 0, 0x3fc0000000000000ULL})); + stream_->UpdatePriority(0); +} + +TEST_F(OutgoingSubgroupStreamTest, SendFragmentedObject) { + PublishedObject obj0 = DefaultObject(); + obj0.metadata.payload_length = 15; + obj0.payload.clear(); + obj0.payload.push_back(quiche::QuicheMemSlice::Copy("part1")); + obj0.payload.push_back(quiche::QuicheMemSlice::Copy("part2")); + obj0.fin_after_this = true; + EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 0)) + .WillOnce(Return(std::move(obj0))); + EXPECT_CALL(visitor_, InWindow).WillRepeatedly(Return(true)); + EXPECT_CALL(mock_stream_, CanWrite).WillRepeatedly(Return(true)); + EXPECT_CALL(visitor_, delivery_timeout()) + .WillRepeatedly(Return(quic::QuicTimeDelta::FromSeconds(1))); + EXPECT_CALL(visitor_, alternate_delivery_timeout()) + .WillRepeatedly(Return(false)); + EXPECT_CALL(visitor_, clock()).WillRepeatedly(Return(&mock_clock_)); + EXPECT_CALL(*track_publisher_, extensions()) + .WillRepeatedly(ReturnRef(track_extensions_)); + EXPECT_CALL(mock_stream_, Writev) + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, + const webtransport::StreamWriteOptions& options) { + EXPECT_EQ(data.size(), 3); + EXPECT_EQ(data[1].AsStringView(), "part1"); + EXPECT_EQ(data[2].AsStringView(), "part2"); + EXPECT_FALSE(options.send_fin()); + return absl::OkStatus(); + }); + EXPECT_CALL(visitor_, OnObjectSent(Location(0, 0))); + stream_->OnCanWrite(); + PublishedObject obj1 = DefaultObject(); + obj1.metadata.payload_length = 15; + obj1.payload.clear(); + obj1.payload.push_back(quiche::QuicheMemSlice::Copy("part3")); + obj1.fin_after_this = true; + EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 0, 10)) + .WillOnce(Return(std::move(obj1))); + EXPECT_CALL(*track_publisher_, GetCachedObject(0, Optional(0), 1, 0)) + .WillOnce(Return(std::nullopt)); + EXPECT_CALL(mock_stream_, Writev) + .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, + const webtransport::StreamWriteOptions& options) { + EXPECT_EQ(data.size(), 1); + EXPECT_EQ(data[0].AsStringView(), "part3"); + EXPECT_TRUE(options.send_fin()); + return absl::OkStatus(); + }); + EXPECT_CALL(visitor_, OnObjectSent).Times(0); + ExpectAlarm(); + stream_->OnCanWrite(); +} + +} // namespace + +} // namespace moqt::test
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h index 3f3a740..6ff7172 100644 --- a/quiche/quic/moqt/test_tools/moqt_session_peer.h +++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -31,11 +31,11 @@ #include "quiche/quic/moqt/moqt_session_interface.h" #include "quiche/quic/moqt/moqt_track.h" #include "quiche/quic/moqt/moqt_types.h" +#include "quiche/quic/moqt/moqt_uni_stream.h" #include "quiche/quic/moqt/test_tools/moqt_framer_utils.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/platform/api/quiche_test.h" #include "quiche/common/quiche_data_reader.h" -#include "quiche/common/test_tools/quiche_test_utils.h" #include "quiche/web_transport/test_tools/mock_web_transport.h" #include "quiche/web_transport/web_transport.h" @@ -258,7 +258,7 @@ } static quic::QuicAlarm* GetAlarm(webtransport::StreamVisitor* visitor) { - return absl::down_cast<MoqtSession::OutgoingDataStream*>(visitor) + return absl::down_cast<OutgoingSubgroupStream*>(visitor) ->delivery_timeout_alarm_.get(); }