| // Copyright 2026 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef QUICHE_QUIC_MOQT_MOQT_UNI_STREAM_H_ |
| #define QUICHE_QUIC_MOQT_MOQT_UNI_STREAM_H_ |
| |
| #include <cstdint> |
| #include <memory> |
| #include <optional> |
| #include <string> |
| #include <utility> |
| |
| #include "absl/base/nullability.h" |
| #include "absl/strings/string_view.h" |
| #include "quiche/quic/core/quic_alarm.h" |
| #include "quiche/quic/core/quic_alarm_factory.h" |
| #include "quiche/quic/core/quic_time.h" |
| #include "quiche/quic/moqt/moqt_error.h" |
| #include "quiche/quic/moqt/moqt_fetch_task.h" |
| #include "quiche/quic/moqt/moqt_framer.h" |
| #include "quiche/quic/moqt/moqt_messages.h" |
| #include "quiche/quic/moqt/moqt_object.h" |
| #include "quiche/quic/moqt/moqt_parser.h" |
| #include "quiche/quic/moqt/moqt_priority.h" |
| #include "quiche/quic/moqt/moqt_publisher.h" |
| #include "quiche/quic/moqt/moqt_session_interface.h" |
| #include "quiche/quic/moqt/moqt_trace_recorder.h" |
| #include "quiche/quic/moqt/moqt_track.h" |
| #include "quiche/quic/moqt/moqt_types.h" |
| #include "quiche/common/platform/api/quiche_export.h" |
| #include "quiche/common/quiche_callbacks.h" |
| #include "quiche/common/quiche_weak_ptr.h" |
| #include "quiche/web_transport/web_transport.h" |
| |
| namespace moqt { |
| |
| // Forward declarations of test-only peer classes. They are named here so that |
| // the stream classes in this header can grant them friend access. |
| namespace test { |
| class OutgoingSubgroupStreamPeer; |
| class MoqtSessionPeer; |
| } // namespace test |
| |
| // A base class for locally initiated unidirectional streams, which can serve |
| // either a Subgroup or a FETCH response. It contains most of the machinery for |
| // managing the WebTransport stream. |
| class OutgoingUniStream : public webtransport::StreamVisitor { |
| public: |
| // Binds the visitor to |stream| and immediately applies |priority| to it via |
| // SetPriority(). |track_identifier| is the track alias (subscriptions) or the |
| // fetch request ID (FETCH responses). |
| OutgoingUniStream(MoqtFramer framer, |
| webtransport::Stream* absl_nonnull stream, |
| webtransport::StreamPriority priority, |
| uint64_t track_identifier) |
| : stream_(*stream), |
| priority_(priority), |
| track_identifier_(track_identifier), |
| framer_(framer) { |
| stream_.SetPriority(priority_); |
| } |
| virtual ~OutgoingUniStream() = default; |
| |
| // webtransport::StreamVisitor implementation. |
| void OnCanRead() override {} // Write-only. |
| // OnCanWrite() deferred to children. |
| // `override` already implies `virtual`, so only `override` is spelled out, |
| // matching the other overrides in this class. |
| void OnResetStreamReceived(webtransport::StreamErrorCode) override {} |
| // OnStopSendingReceived() deferred to children. |
| void OnWriteSideInDataRecvdState() override {} |
| |
| // Recomputes the send order and updates it for the associated stream. |
| void UpdatePriority(MoqtPriority subscriber_priority); |
| |
| protected: |
| // Accessors for subclasses. last_object() hands out a mutable reference to |
| // the stored metadata; set_last_object() replaces it. |
| webtransport::Stream& stream() { return stream_; } |
| std::optional<PublishedObjectMetadata>& last_object() { return last_object_; } |
| void set_last_object(PublishedObjectMetadata metadata) { |
| last_object_ = std::move(metadata); |
| } |
| |
| // Writes an object to the stream. Returns false if the write failed. The |
| // caller should reset the stream if that happens. |
| bool WriteObjectToStream(PublishedObject& object, MoqtDataStreamType type); |
| |
| private: |
| webtransport::Stream& stream_; // Always valid because it owns this object. |
| webtransport::StreamPriority priority_; |
| uint64_t track_identifier_; // track alias or fetch request ID. |
| |
| MoqtFramer framer_; |
| // Used to compute the object ID diff and pass metadata for partial objects. |
| // If nullopt, the stream header has not been written yet. |
| std::optional<PublishedObjectMetadata> last_object_; |
| }; |
| |
| // This interface provides information about the subscription. It is the |
| // interface that OutgoingSubgroupStream holds (through a QuicheWeakPtr) both |
| // to query subscription state and to report stream events. |
| class SubscriptionPublisherInterface { |
| public: |
| virtual ~SubscriptionPublisherInterface() = default; |
| // NOTE(review): presumably reports whether the given Location lies inside |
| // the range covered by the subscription -- confirm against implementers. |
| virtual bool InWindow(Location) = 0; |
| // NOTE(review): the meaning of the "alternate" delivery timeout mode is not |
| // visible in this header -- see the implementation. |
| virtual bool alternate_delivery_timeout() = 0; |
| // Accessors for the clock, the delivery timeout duration and the factory |
| // used to create alarms. |
| virtual const quic::QuicClock* clock() = 0; |
| virtual quic::QuicTimeDelta delivery_timeout() = 0; |
| virtual quic::QuicAlarmFactory* alarm_factory() = 0; |
| // Called when the first byte of an object is written to the stream. |
| virtual void OnObjectSent(Location) = 0; |
| // Notifications about the data stream identified by the DataStreamIndex |
| // (timeout, destruction) or by group/subgroup (abandonment with an error |
| // code). NOTE(review): exact trigger points live in the .cc file. |
| virtual void OnStreamTimeout(DataStreamIndex) = 0; |
| virtual void OnSubgroupAbandoned(uint64_t group, uint64_t subgroup, |
| webtransport::StreamErrorCode) = 0; |
| virtual void OnDataStreamDestroyed(DataStreamIndex) = 0; |
| }; |
| |
| // This is for subscriptions only. FETCH uses its own construct. |
| class QUICHE_EXPORT OutgoingSubgroupStream : public OutgoingUniStream { |
| public: |
| // |visitor| is owned by the subscription, so the WeakPtr also serves as a |
| // liveness token. |
| // NOTE(review): |first_object| presumably seeds next_object_ and |
| // |track_alias| is presumably forwarded to the base class as its track |
| // identifier -- the constructor body is in the .cc file; confirm there. |
| OutgoingSubgroupStream( |
| MoqtFramer framer, webtransport::Stream* absl_nonnull stream, |
| DataStreamIndex index, uint64_t first_object, |
| quiche::QuicheWeakPtr<SubscriptionPublisherInterface> visitor, |
| std::shared_ptr<MoqtTrackPublisher> absl_nonnull track_publisher, |
| webtransport::StreamPriority priority, uint64_t track_alias, |
| MoqtTraceRecorder* absl_nonnull trace_recorder); |
| ~OutgoingSubgroupStream(); |
| |
| // webtransport::StreamVisitor overrides. |
| void OnCanWrite() override; |
| void OnStopSendingReceived(webtransport::StreamErrorCode error_code) override; |
| |
| // Alarm delegate that keeps a raw pointer back to the stream it was created |
| // for; it declares no destructor, so it never deletes that stream. OnAlarm() |
| // is defined out of line. |
| class DeliveryTimeoutDelegate |
| : public quic::QuicAlarm::DelegateWithoutContext { |
| public: |
| explicit DeliveryTimeoutDelegate(OutgoingSubgroupStream* stream) |
| : stream_(stream) {} |
| void OnAlarm() override; |
| |
| private: |
| OutgoingSubgroupStream* stream_; |
| }; |
| |
| // Sends a pure FIN on the stream, if the last object sent matches |
| // |last_object|. Otherwise, does nothing. |
| void Fin(Location last_object); |
| // Reset can be called directly on the stream, with no need to involve the |
| // visitor. |
| |
| // Creates and sets an alarm for the given deadline. Does nothing if the |
| // alarm is already created. |
| void CreateAndSetAlarm(quic::QuicTime deadline); |
| |
| private: |
| friend class DeliveryTimeoutDelegate; |
| friend class test::OutgoingSubgroupStreamPeer; |
| |
| // Sends objects on the stream, starting with `next_object_`, until the |
| // stream becomes write-blocked or closed. Can reset the stream, destroying |
| // the class, on a write error. |
| void SendObjects(); |
| |
| DataStreamIndex index_; |
| quiche::QuicheWeakPtr<SubscriptionPublisherInterface> visitor_; |
| |
| MoqtDataStreamType type_; |
| uint64_t track_alias_; |
| std::shared_ptr<MoqtTrackPublisher> publisher_; |
| // Minimum object ID that should go out next. The session doesn't know the |
| // exact ID of the next object in the stream because the next object could |
| // be in a different subgroup or simply be skipped. |
| uint64_t next_object_; |
| // Number of payload bytes from next_object_ that have already been written |
| // to the stream. |
| uint64_t already_delivered_ = 0; |
| |
| // If this data stream is for SUBSCRIBE, reset it if an object has been |
| // excessively delayed per Section 7.1.1.2. Null until CreateAndSetAlarm() |
| // creates it. |
| std::unique_ptr<quic::QuicAlarm> delivery_timeout_alarm_; |
| }; |
| |
| // Callback type accepted by the OutgoingFetchStream constructor. It takes no |
| // arguments and returns nothing. |
| // NOTE(review): presumably run when the fetch stream goes away -- confirm in |
| // the .cc file. |
| using FetchStreamCloseCallback = quiche::SingleUseCallback<void()>; |
| |
| // Locally initiated unidirectional stream that carries a FETCH response. |
| class QUICHE_EXPORT OutgoingFetchStream : public OutgoingUniStream { |
| public: |
| // Takes ownership of |incoming_objects| (the task that supplies the objects |
| // to send) and of |close_callback|. |
| // NOTE(review): |request_id| is presumably passed to the base class as its |
| // track identifier -- the constructor body is in the .cc file. |
| OutgoingFetchStream(MoqtFramer framer, |
| webtransport::Stream* absl_nonnull stream, |
| uint64_t request_id, |
| webtransport::StreamPriority priority, |
| std::unique_ptr<MoqtFetchTask> incoming_objects, |
| FetchStreamCloseCallback close_callback, |
| MoqtTraceRecorder* absl_nonnull trace_recorder); |
| ~OutgoingFetchStream(); |
| |
| // webtransport::StreamVisitor implementation. |
| void OnCanWrite() override; |
| void OnStopSendingReceived(webtransport::StreamErrorCode error_code) override; |
| |
| private: |
| // Source of the objects for this FETCH; owned by the stream. |
| std::unique_ptr<MoqtFetchTask> incoming_objects_; |
| FetchStreamCloseCallback close_callback_; |
| }; |
| |
| // Interface that IncomingDataStream uses to talk to its session: it holds a |
| // raw pointer to an implementation of this class. |
| class SessionToUniStreamInterface { |
| public: |
| virtual ~SessionToUniStreamInterface() = default; |
| // NOTE(review): presumably controls whether object payloads may be handed |
| // to the application before the whole object has arrived -- confirm. |
| virtual bool deliver_partial_objects() const = 0; |
| // NOTE(review): the parameter is a RemoteTrack*, not a name; the identifier |
| // `name` looks like a leftover and could be renamed to `track`. |
| virtual void OnMalformedTrack(RemoteTrack* name) = 0; |
| // Look up the remote track for a subscription (by track alias) or a fetch |
| // (by request ID). The result is a weak pointer. |
| // NOTE(review): presumably invalid when no such track exists -- confirm. |
| virtual quiche::QuicheWeakPtr<RemoteTrack> GetSubscribe( |
| uint64_t track_alias) = 0; |
| virtual quiche::QuicheWeakPtr<RemoteTrack> GetFetch(uint64_t request_id) = 0; |
| // Reports an error with a code and a human-readable reason. |
| // NOTE(review): presumably terminates the session -- confirm. |
| virtual void Error(MoqtError error_code, absl::string_view reason) = 0; |
| }; |
| |
| // Visitor for an incoming data stream. It is both the WebTransport stream |
| // visitor and the visitor of the MoqtDataParser it owns, which is constructed |
| // on the same stream. |
| class QUICHE_EXPORT IncomingDataStream : public webtransport::StreamVisitor, |
| public MoqtDataParserVisitor { |
| public: |
| // Stores the three non-null pointers and builds the parser on |stream| with |
| // this object as its visitor. stream_ is declared before parser_, so the |
| // initializer order below matches the actual initialization order. |
| IncomingDataStream(webtransport::Stream* absl_nonnull stream, |
| SessionToUniStreamInterface* absl_nonnull session, |
| const quic::QuicClock* absl_nonnull clock) |
| : stream_(stream), |
| parser_(stream, this), |
| session_(session), |
| clock_(clock) {} |
| ~IncomingDataStream(); |
| |
| // webtransport::StreamVisitor implementation. Only OnCanRead() does any |
| // work; the remaining callbacks are intentionally empty. |
| void OnCanRead() override; |
| void OnCanWrite() override {} |
| void OnResetStreamReceived(webtransport::StreamErrorCode) override {} |
| void OnStopSendingReceived(webtransport::StreamErrorCode /*error*/) override { |
| } |
| void OnWriteSideInDataRecvdState() override {} |
| |
| // MoqtDataParserVisitor implementation. |
| // TODO: Handle a stream FIN. |
| void OnObjectMessage(const MoqtObject& message, absl::string_view payload, |
| bool end_of_message) override; |
| // Only records that a FIN was seen. |
| void OnFin() override { fin_received_ = true; } |
| void OnParsingError(MoqtError error_code, absl::string_view reason) override; |
| |
| webtransport::Stream* stream() const { return stream_; } |
| |
| // NOTE(review): presumably asks the parser to process at most one object -- |
| // defined in the .cc file; confirm there. |
| void MaybeReadOneObject(); |
| |
| private: |
| friend class test::MoqtSessionPeer; |
| // True only once the parser has determined the stream type and that type |
| // is a fetch; false while the type is still unknown. |
| bool IsFetch() const { |
| return parser_.stream_type().has_value() && |
| parser_.stream_type()->IsFetch(); |
| } |
| |
| uint64_t next_object_id_ = 0; |
| bool no_more_objects_ = false; // EndOfGroup or EndOfTrack was received. |
| std::optional<DataStreamIndex> index_; // Only set for subscribe. |
| bool fin_received_ = false; // Set by OnFin(). |
| webtransport::Stream* stream_; |
| SubscribeVisitor* visitor_ = nullptr; |
| // Once the subscribe ID is identified, set it here. |
| quiche::QuicheWeakPtr<RemoteTrack> track_; |
| MoqtDataParser parser_; |
| // NOTE(review): presumably buffers the payload of an object that arrives in |
| // several pieces -- confirm in the .cc file. |
| std::string partial_object_; |
| uint64_t bytes_received_this_object_ = 0; |
| SessionToUniStreamInterface* session_; |
| const quic::QuicClock* absl_nonnull clock_; |
| }; |
| |
| } // namespace moqt |
| |
| #endif // QUICHE_QUIC_MOQT_MOQT_UNI_STREAM_H_ |