| // Copyright (c) 2026 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "quiche/quic/moqt/moqt_subscription.h" |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <memory> |
| #include <optional> |
| #include <utility> |
| #include <vector> |
| |
| #include "absl/base/casts.h" |
| #include "absl/base/nullability.h" |
| #include "absl/container/btree_map.h" |
| #include "absl/container/flat_hash_set.h" |
| #include "absl/strings/string_view.h" |
| #include "quiche/quic/core/quic_clock.h" |
| #include "quiche/quic/moqt/moqt_bidi_stream.h" |
| #include "quiche/quic/moqt/moqt_error.h" |
| #include "quiche/quic/moqt/moqt_framer.h" |
| #include "quiche/quic/moqt/moqt_key_value_pair.h" |
| #include "quiche/quic/moqt/moqt_messages.h" |
| #include "quiche/quic/moqt/moqt_object.h" |
| #include "quiche/quic/moqt/moqt_priority.h" |
| #include "quiche/quic/moqt/moqt_publisher.h" |
| #include "quiche/quic/moqt/moqt_stream_map.h" |
| #include "quiche/quic/moqt/moqt_trace_recorder.h" |
| #include "quiche/quic/moqt/moqt_types.h" |
| #include "quiche/quic/moqt/moqt_uni_stream.h" |
| #include "quiche/common/quiche_buffer_allocator.h" |
| #include "quiche/web_transport/web_transport.h" |
| |
| namespace moqt { |
| |
| SubscriptionPublisher::SubscriptionPublisher( |
| MoqtFramer framer, std::shared_ptr<MoqtTrackPublisher> track_publisher, |
| MoqtBidiStreamBase* absl_nonnull bidi_stream, uint64_t request_id, |
| uint64_t track_alias, const MessageParameters& parameters, |
| SessionToPublisherInterface* absl_nonnull visitor, |
| MoqtPublishingMonitorInterface* monitoring_interface, |
| const quic::QuicClock* absl_nonnull clock, |
| MoqtTraceRecorder& trace_recorder) |
| : track_publisher_(track_publisher), |
| bidi_stream_(bidi_stream), |
| visitor_(visitor), |
| request_id_(request_id), |
| track_alias_(track_alias), |
| framer_(framer), |
| trace_recorder_(trace_recorder), |
| parameters_(parameters), |
| monitoring_interface_(monitoring_interface), |
| clock_(clock), |
| weak_ptr_factory_(this) { |
| if (monitoring_interface_ != nullptr) { |
| monitoring_interface_->OnObjectAckSupportKnown(parameters.oack_window_size); |
| } |
| QUIC_DLOG(INFO) << "Created subscription for " |
| << track_publisher_->GetTrackName(); |
| // TODO(martinduke): Handle NEW_GROUP_REQUEST |
| } |
| |
| SubscriptionPublisher::~SubscriptionPublisher() { |
| track_publisher_->RemoveObjectListener(this); |
| // Reset all streams. |
| for (const webtransport::StreamId stream_id : stream_map_.GetAllStreams()) { |
| webtransport::Stream* stream = GetStreamById(stream_id); |
| if (stream != nullptr) { |
| stream->ResetWithUserCode(kResetCodeCancelled); |
| } |
| } |
| } |
| |
| void SubscriptionPublisher::Update(const MessageParameters& parameters) { |
| // TODO(martinduke): If there are auth tokens, this probably has to go to the |
| // application. |
| // TODO(martinduke): If the subscribe window has shrunk, close any streams |
| // that are now outside the window. Also send PUBLISH_DONE if now done. |
| MoqtPriority old_priority = |
| parameters_.subscriber_priority.value_or(kDefaultSubscriberPriority); |
| parameters_.Update(parameters); |
| if (parameters.subscriber_priority.has_value()) { // priority changed. |
| MoqtPriority new_priority = *parameters.subscriber_priority; |
| // Reprioritize all active streams. |
| for (const webtransport::StreamId stream_id : stream_map_.GetAllStreams()) { |
| webtransport::Stream* stream = GetStreamById(stream_id); |
| if (stream == nullptr) { |
| continue; |
| } |
| OutgoingSubgroupStream* outgoing_stream = |
| absl::down_cast<OutgoingSubgroupStream*>(stream->visitor()); |
| outgoing_stream->UpdatePriority(new_priority); |
| } |
| if (pending_streams_.empty()) { |
| return; |
| } |
| // Tell the session that pending stream priority has changed. |
| MoqtPriority publisher_priority = |
| pending_streams_.rbegin()->second.publisher_priority.value_or( |
| track_publisher_->extensions().default_publisher_priority()); |
| MoqtTrackPriority old_track_priority = {old_priority, publisher_priority}; |
| visitor_->UpdateTrackPriority( |
| request_id_, old_track_priority, |
| MoqtTrackPriority{new_priority, publisher_priority}); |
| // Don't bother to update all the pending stream send orders. |
| } |
| } |
| |
| void SubscriptionPublisher::OnSubscribeAccepted() { |
| QUICHE_DCHECK(!established_); |
| established_ = true; |
| parameters_.largest_object = track_publisher_->largest_location(); |
| if (parameters_.subscription_filter.has_value()) { |
| parameters_.subscription_filter->OnLargestObject( |
| parameters_.largest_object); |
| } |
| MoqtSubscribeOk subscribe_ok; |
| subscribe_ok.request_id = request_id_; |
| subscribe_ok.track_alias = track_alias_; |
| subscribe_ok.parameters.expires = track_publisher_->expiration(); |
| subscribe_ok.parameters.largest_object = parameters_.largest_object; |
| subscribe_ok.extensions = track_publisher_->extensions(); |
| if (!parameters_.group_order.has_value()) { |
| parameters_.group_order = |
| subscribe_ok.extensions.default_publisher_group_order(); |
| } |
| // TODO(martinduke): Support sending DELIVERY_TIMEOUT parameter as the |
| // publisher. |
| default_publisher_priority_ = |
| subscribe_ok.extensions.default_publisher_priority(); |
| bidi_stream_->SendOrBufferMessageOrFatal( |
| framer_.SerializeSubscribeOk(subscribe_ok)); |
| // TODO(martinduke): If we buffer objects that arrived previously, the arrival |
| // of the track alias disambiguates what subscription they belong to. Send |
| // them. |
| } |
| |
| void SubscriptionPublisher::OnSubscribeRejected(MoqtRequestErrorInfo info) { |
| bidi_stream_->CheckStatus(bidi_stream_->SendRequestError(request_id_, info)); |
| visitor_->PublishIsDone(request_id_); |
| // No class access below this line! |
| } |
| |
| void SubscriptionPublisher::OnNewObjectAvailable( |
| Location location, std::optional<uint64_t> subgroup, |
| MoqtPriority publisher_priority) { |
| if (!InWindow(location)) { |
| return; |
| } |
| |
| if (monitoring_interface_ != nullptr) { |
| // Notify the monitoring interface about all newly published normal objects. |
| // Objects with other statuses are not guaranteed to be acknowledged, thus |
| // passing them into the monitoring interface can lead to confusion. |
| std::optional<PublishedObject> object = track_publisher_->GetCachedObject( |
| location.group, subgroup, location.object); |
| QUICHE_DCHECK(object.has_value()) |
| << "Object " << absl::StrCat(location) << " on track " |
| << track_publisher_->GetTrackName().ToString() |
| << " does not exist, despite OnNewObjectAvailable being called"; |
| if (object.has_value() && object->metadata.location == location && |
| object->metadata.status == MoqtObjectStatus::kNormal) { |
| monitoring_interface_->OnNewObjectEnqueued(location); |
| } |
| } |
| |
| // TODO(vasilvv): This currently sends UINT64_MAX for datagram subgroups. |
| // Maybe do something more satisfactory? |
| trace_recorder_.RecordNewObjectAvaliable( |
| track_alias_, *track_publisher_, location, subgroup.value_or(UINT64_MAX), |
| publisher_priority); |
| |
| std::optional<webtransport::StreamId> stream_id; |
| if (subgroup.has_value()) { |
| DataStreamIndex index(location.group, *subgroup); |
| if (reset_subgroups_.contains(index)) { |
| // This subgroup has already been reset, ignore. |
| return; |
| } |
| stream_id = stream_map_.GetStreamFor(index); |
| } |
| if (visitor_->alternate_delivery_timeout() && |
| !delivery_timeout().IsInfinite() && largest_sent_.has_value() && |
| location.group >= largest_sent_->group) { |
| // Start the delivery timeout timer on all previous groups. |
| for (uint64_t group = first_active_group_; group < location.group; |
| ++group) { |
| for (webtransport::StreamId stream_to_update : |
| stream_map_.GetStreamsForGroup(group)) { |
| webtransport::Stream* raw_stream = GetStreamById(stream_to_update); |
| if (raw_stream == nullptr) { |
| continue; |
| } |
| OutgoingSubgroupStream* stream = |
| absl::down_cast<OutgoingSubgroupStream*>(raw_stream->visitor()); |
| stream->CreateAndSetAlarm(clock_->ApproximateNow() + |
| delivery_timeout()); |
| } |
| } |
| } |
| QUICHE_DCHECK_GE(location.group, first_active_group_); |
| if (!subgroup.has_value()) { |
| SendDatagram(location); |
| return; |
| } |
| |
| webtransport::Stream* raw_stream = nullptr; |
| if (stream_id.has_value()) { |
| raw_stream = GetStreamById(*stream_id); |
| if (raw_stream != nullptr) { |
| raw_stream->visitor()->OnCanWrite(); |
| } |
| return; |
| } |
| NewDataStreamParameters parameters( |
| location.group, *subgroup, location.object, |
| publisher_priority == default_publisher_priority_ |
| ? std::nullopt |
| : std::make_optional(publisher_priority)); |
| raw_stream = OpenDataStream(parameters); |
| if (raw_stream == nullptr) { |
| StreamRank rank = StreamRankFor(parameters); |
| if (pending_streams_.empty() || rank > pending_streams_.rbegin()->first) { |
| visitor_->UpdateTrackPriority( |
| request_id_, |
| /*old_priority=*/pending_streams_.empty() |
| ? std::optional<MoqtTrackPriority>() |
| : std::make_optional( |
| MoqtTrackPriority{subscriber_priority(), |
| pending_streams_.rbegin() |
| ->second.publisher_priority.value_or( |
| default_publisher_priority())}), |
| MoqtTrackPriority{subscriber_priority(), publisher_priority}); |
| } |
| pending_streams_.emplace(rank, parameters); |
| } |
| } |
| |
| void SubscriptionPublisher::OnTrackPublisherGone() { |
| PublishIsDone(request_id_, PublishDoneCode::kGoingAway, "Publisher is gone"); |
| } |
| |
| // TODO(martinduke): Revise to check if the last object has been delivered. |
| void SubscriptionPublisher::OnNewFinAvailable(Location location, |
| uint64_t subgroup) { |
| if (!InWindow(location.group)) { |
| return; |
| } |
| DataStreamIndex index(location.group, subgroup); |
| std::optional<webtransport::StreamId> stream_id = |
| stream_map_.GetStreamFor(index); |
| if (!stream_id.has_value()) { |
| return; |
| } |
| webtransport::Stream* raw_stream = GetStreamById(*stream_id); |
| if (raw_stream == nullptr) { |
| return; |
| } |
| OutgoingSubgroupStream* stream = |
| absl::down_cast<OutgoingSubgroupStream*>(raw_stream->visitor()); |
| stream->Fin(location); |
| } |
| |
| void SubscriptionPublisher::OnSubgroupAbandoned( |
| uint64_t group, uint64_t subgroup, |
| webtransport::StreamErrorCode error_code) { |
| if (!InWindow(group)) { |
| return; |
| } |
| DataStreamIndex index(group, subgroup); |
| if (reset_subgroups_.contains(index)) { |
| // This subgroup has already been reset, ignore. |
| return; |
| } |
| reset_subgroups_.insert(index); |
| QUICHE_DCHECK_GE(group, first_active_group_); |
| std::optional<webtransport::StreamId> stream_id = |
| stream_map_.GetStreamFor(index); |
| if (!stream_id.has_value()) { |
| return; |
| } |
| webtransport::Stream* raw_stream = GetStreamById(*stream_id); |
| if (raw_stream == nullptr) { |
| return; |
| } |
| raw_stream->ResetWithUserCode(error_code); |
| } |
| |
| void SubscriptionPublisher::OnGroupAbandoned(uint64_t group_id) { |
| if (!InWindow(group_id)) { |
| // The group is not in the window, ignore. |
| return; |
| } |
| std::vector<webtransport::StreamId> streams = |
| stream_map_.GetStreamsForGroup(group_id); |
| if (delivery_timeout().IsInfinite() && largest_sent_.has_value() && |
| largest_sent_->group <= group_id) { |
| PublishIsDone(request_id_, PublishDoneCode::kTooFarBehind, ""); |
| // No class access below this line! |
| return; |
| } |
| for (webtransport::StreamId stream_id : streams) { |
| webtransport::Stream* raw_stream = GetStreamById(stream_id); |
| if (raw_stream == nullptr) { |
| continue; |
| } |
| raw_stream->ResetWithUserCode(kResetCodeDeliveryTimeout); |
| // Sending the Reset will call the destructor for OutgoingSubgroupStream, |
| // which will erase it from the SendStreamMap. |
| } |
| first_active_group_ = std::max(first_active_group_, group_id + 1); |
| absl::erase_if(reset_subgroups_, [&](const DataStreamIndex& index) { |
| return index.group < first_active_group_; |
| }); |
| } |
| |
| void SubscriptionPublisher::SendDatagram(Location sequence) { |
| std::optional<PublishedObject> object = track_publisher_->GetCachedObject( |
| sequence.group, std::nullopt, sequence.object); |
| if (!object.has_value()) { |
| QUICHE_BUG(PublishedSubscription_SendDatagram_object_not_in_cache) |
| << "Got notification about an object that is not in the cache"; |
| return; |
| } |
| MoqtObject header; |
| header.track_alias = track_alias_; |
| header.group_id = object->metadata.location.group; |
| header.object_id = object->metadata.location.object; |
| header.publisher_priority = object->metadata.publisher_priority; |
| header.extension_headers = object->metadata.extensions; |
| header.object_status = object->metadata.status; |
| header.subgroup_id = std::nullopt; |
| header.payload_length = object->metadata.payload_length; |
| QUICHE_BUG_IF(SubscriptionPublisher_SendDatagram_partial_payload, |
| object->payload.size() > 1) |
| << "Datagram is split into multiple slices"; |
| quiche::QuicheBuffer datagram = framer_.SerializeObjectDatagram( |
| header, object->payload[0].AsStringView(), |
| default_publisher_priority_.value_or(kDefaultPublisherPriority)); |
| if (visitor_->session() == nullptr) { |
| return; |
| } |
| visitor_->session()->SendOrQueueDatagram(datagram.AsStringView()); |
| OnObjectSent(object->metadata.location); |
| } |
| |
| void SubscriptionPublisher::ProcessObjectAck(const MoqtObjectAck& message) { |
| trace_recorder_.RecordObjectAck(track_alias_, |
| Location(message.group_id, message.object_id), |
| message.delta_from_deadline); |
| |
| if (monitoring_interface_ == nullptr) { |
| return; |
| } |
| monitoring_interface_->OnObjectAckReceived( |
| Location(message.group_id, message.object_id), |
| message.delta_from_deadline); |
| } |
| |
| webtransport::Stream* absl_nullable SubscriptionPublisher::OpenDataStream( |
| const NewDataStreamParameters& parameters) { |
| if (visitor_->session() == nullptr || |
| !visitor_->session()->CanOpenNextOutgoingUnidirectionalStream()) { |
| return nullptr; |
| } |
| webtransport::Stream* new_stream = |
| visitor_->session()->OpenOutgoingUnidirectionalStream(); |
| if (new_stream == nullptr) { |
| return nullptr; |
| } |
| stream_map_.AddStream(parameters.index, new_stream->GetStreamId()); |
| new_stream->SetVisitor(std::make_unique<OutgoingSubgroupStream>( |
| framer_, new_stream, parameters.index, parameters.first_object, |
| weak_ptr_factory_.Create(), track_publisher_, |
| StreamPriorityFor(parameters), track_alias_, &trace_recorder_)); |
| ++streams_opened_; |
| new_stream->visitor()->OnCanWrite(); |
| return new_stream; |
| } |
| |
| void SubscriptionPublisher::PublishIsDone(uint64_t request_id, |
| PublishDoneCode code, |
| absl::string_view error_reason) { |
| MoqtPublishDone publish_done; |
| publish_done.request_id = request_id; |
| publish_done.status_code = code; |
| publish_done.stream_count = streams_opened_; |
| publish_done.error_reason = error_reason; |
| // TODO(martinduke): It is technically correct, but not good, to simply |
| // reset all the streams in order to send PUBLISH_DONE. It's better to wait |
| // until streams FIN naturally, where possible. |
| QUICHE_DLOG(INFO) << "Sending PUBLISH_DONE message for " |
| << track_publisher_->GetTrackName(); |
| bidi_stream_->SendOrBufferMessageOrFatal( |
| framer_.SerializePublishDone(publish_done)); |
| visitor_->PublishIsDone(request_id_); |
| // No class access below this line! |
| } |
| |
| void SubscriptionPublisher::OnDataStreamDestroyed( |
| DataStreamIndex end_sequence) { |
| stream_map_.RemoveStream(end_sequence); |
| } |
| |
| void SubscriptionPublisher::OnCanCreateNewUniStream() { |
| while (visitor_->session() != nullptr && |
| visitor_->session()->CanOpenNextOutgoingUnidirectionalStream()) { |
| auto it = pending_streams_.rbegin(); |
| while (it != pending_streams_.rend() && |
| (it->second.index.group < first_active_group_ || |
| reset_subgroups_.contains(it->second.index))) { |
| pending_streams_.erase(--(it.base())); |
| it = pending_streams_.rbegin(); |
| } |
| if (it == pending_streams_.rend()) { |
| return; |
| } |
| if (OpenDataStream(it->second) == nullptr) { |
| return; |
| } |
| pending_streams_.erase(--(it.base())); |
| if (!pending_streams_.empty()) { |
| visitor_->UpdateTrackPriority( |
| request_id_, std::nullopt, |
| MoqtTrackPriority{ |
| subscriber_priority(), |
| pending_streams_.rbegin()->second.publisher_priority.value_or( |
| default_publisher_priority())}); |
| } |
| } |
| } |
| |
| void SubscriptionPublisher::OnObjectSent(Location sequence) { |
| if (largest_sent_.has_value()) { |
| largest_sent_ = std::max(*largest_sent_, sequence); |
| } else { |
| largest_sent_ = sequence; |
| } |
| // TODO: send PUBLISH_DONE if the subscription is done. |
| } |
| |
| } // namespace moqt |