| // Copyright 2023 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef QUICHE_QUIC_MOQT_MOQT_SESSION_H_ |
| #define QUICHE_QUIC_MOQT_MOQT_SESSION_H_ |
| |
| #include <cstdint> |
| #include <memory> |
| #include <optional> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "absl/container/btree_map.h" |
| #include "absl/container/btree_set.h" |
| #include "absl/container/flat_hash_map.h" |
| #include "absl/container/flat_hash_set.h" |
| #include "absl/strings/string_view.h" |
| #include "quiche/quic/core/quic_time.h" |
| #include "quiche/quic/core/quic_types.h" |
| #include "quiche/quic/moqt/moqt_framer.h" |
| #include "quiche/quic/moqt/moqt_messages.h" |
| #include "quiche/quic/moqt/moqt_parser.h" |
| #include "quiche/quic/moqt/moqt_priority.h" |
| #include "quiche/quic/moqt/moqt_publisher.h" |
| #include "quiche/quic/moqt/moqt_subscribe_windows.h" |
| #include "quiche/quic/moqt/moqt_track.h" |
| #include "quiche/common/platform/api/quiche_export.h" |
| #include "quiche/common/quiche_buffer_allocator.h" |
| #include "quiche/common/quiche_callbacks.h" |
| #include "quiche/common/quiche_weak_ptr.h" |
| #include "quiche/web_transport/web_transport.h" |
| |
| namespace moqt { |
| |
| namespace test { |
| class MoqtSessionPeer; |
| } |
| |
| using MoqtSessionEstablishedCallback = quiche::SingleUseCallback<void()>; |
| using MoqtSessionTerminatedCallback = |
| quiche::SingleUseCallback<void(absl::string_view error_message)>; |
| using MoqtSessionDeletedCallback = quiche::SingleUseCallback<void()>; |
| |
| enum class SubscribeType { kSubscribe, kUnsubscribe }; |
| |
| // If |error_message| is nullopt, this is triggered by an ANNOUNCE_OK. |
| // Otherwise, it is triggered by ANNOUNCE_ERROR or ANNOUNCE_CANCEL. For |
| // ERROR or CANCEL, MoqtSession is deleting all ANNOUNCE state immediately |
| // after calling this callback. Alternatively, the application can call |
| // Unannounce() to delete the state. |
| using MoqtOutgoingAnnounceCallback = quiche::MultiUseCallback<void( |
| FullTrackName track_namespace, |
| std::optional<MoqtAnnounceErrorReason> error)>; |
| using MoqtIncomingAnnounceCallback = |
| quiche::MultiUseCallback<std::optional<MoqtAnnounceErrorReason>( |
| FullTrackName track_namespace)>; |
| using MoqtOutgoingSubscribeAnnouncesCallback = quiche::SingleUseCallback<void( |
| FullTrackName track_namespace, std::optional<SubscribeErrorCode> error, |
| absl::string_view reason)>; |
| // If the return value is nullopt, the Session will respond with |
| // SUBSCRIBE_ANNOUNCES_OK. Otherwise, it will respond with |
| // SUBSCRIBE_ANNOUNCES_ERROR. |
| // If |subscribe_type| is kUnsubscribe, this is an UNSUBSCRIBE_ANNOUNCES message |
| // and the return value will be ignored. |
| using MoqtIncomingSubscribeAnnouncesCallback = |
| quiche::MultiUseCallback<std::optional<MoqtSubscribeErrorReason>( |
| const FullTrackName& track_namespace, SubscribeType subscribe_type)>; |
| |
| inline std::optional<MoqtAnnounceErrorReason> DefaultIncomingAnnounceCallback( |
| FullTrackName /*track_namespace*/) { |
| return std::optional(MoqtAnnounceErrorReason{ |
| MoqtAnnounceErrorCode::kAnnounceNotSupported, |
| "This endpoint does not accept incoming ANNOUNCE messages"}); |
| }; |
| |
| inline std::optional<MoqtSubscribeErrorReason> |
| DefaultIncomingSubscribeAnnouncesCallback(const FullTrackName& track_namespace, |
| SubscribeType /*subscribe_type*/) { |
| return MoqtSubscribeErrorReason{ |
| SubscribeErrorCode::kUnauthorized, |
| "This endpoint does not support incoming SUBSCRIBE_ANNOUNCES messages"}; |
| } |
| |
| // Callbacks for session-level events. |
| struct MoqtSessionCallbacks { |
| MoqtSessionEstablishedCallback session_established_callback = +[] {}; |
| MoqtSessionTerminatedCallback session_terminated_callback = |
| +[](absl::string_view) {}; |
| MoqtSessionDeletedCallback session_deleted_callback = +[] {}; |
| |
| MoqtIncomingAnnounceCallback incoming_announce_callback = |
| DefaultIncomingAnnounceCallback; |
| MoqtIncomingSubscribeAnnouncesCallback incoming_subscribe_announces_callback = |
| DefaultIncomingSubscribeAnnouncesCallback; |
| }; |
| |
| struct SubscriptionWithQueuedStream { |
| webtransport::SendOrder send_order; |
| uint64_t subscription_id; |
| |
| auto operator<=>(const SubscriptionWithQueuedStream& other) const = default; |
| }; |
| |
| // MoqtPublishingMonitorInterface allows a publisher monitor the delivery |
| // progress for a single individual subscriber. |
| class MoqtPublishingMonitorInterface { |
| public: |
| virtual ~MoqtPublishingMonitorInterface() = default; |
| |
| virtual void OnObjectAckSupportKnown(bool supported) = 0; |
| virtual void OnObjectAckReceived(uint64_t group_id, uint64_t object_id, |
| quic::QuicTimeDelta delta_from_deadline) = 0; |
| }; |
| |
| class QUICHE_EXPORT MoqtSession : public webtransport::SessionVisitor { |
| public: |
| MoqtSession(webtransport::Session* session, MoqtSessionParameters parameters, |
| MoqtSessionCallbacks callbacks = MoqtSessionCallbacks()); |
| ~MoqtSession() { std::move(callbacks_.session_deleted_callback)(); } |
| |
| // webtransport::SessionVisitor implementation. |
| void OnSessionReady() override; |
| void OnSessionClosed(webtransport::SessionErrorCode, |
| const std::string&) override; |
| void OnIncomingBidirectionalStreamAvailable() override; |
| void OnIncomingUnidirectionalStreamAvailable() override; |
| void OnDatagramReceived(absl::string_view datagram) override; |
| void OnCanCreateNewOutgoingBidirectionalStream() override {} |
| void OnCanCreateNewOutgoingUnidirectionalStream() override; |
| |
| void Error(MoqtError code, absl::string_view error); |
| |
| quic::Perspective perspective() const { return parameters_.perspective; } |
| |
| // Returns true if message was sent. |
| bool SubscribeAnnounces( |
| FullTrackName track_namespace, |
| MoqtOutgoingSubscribeAnnouncesCallback callback, |
| MoqtSubscribeParameters parameters = MoqtSubscribeParameters()); |
| bool UnsubscribeAnnounces(FullTrackName track_namespace); |
| |
| // Send an ANNOUNCE message for |track_namespace|, and call |
| // |announce_callback| when the response arrives. Will fail immediately if |
| // there is already an unresolved ANNOUNCE for that namespace. |
| void Announce(FullTrackName track_namespace, |
| MoqtOutgoingAnnounceCallback announce_callback); |
| // Returns true if message was sent, false if there is no ANNOUNCE to cancel. |
| bool Unannounce(FullTrackName track_namespace); |
| |
| // Returns true if SUBSCRIBE was sent. If there is already a subscription to |
| // the track, the message will still be sent. However, the visitor will be |
| // ignored. |
| // Subscribe from (start_group, start_object) to the end of the track. |
| bool SubscribeAbsolute( |
| const FullTrackName& name, uint64_t start_group, uint64_t start_object, |
| SubscribeRemoteTrack::Visitor* visitor, |
| MoqtSubscribeParameters parameters = MoqtSubscribeParameters()); |
| // Subscribe from (start_group, start_object) to the end of end_group. |
| bool SubscribeAbsolute( |
| const FullTrackName& name, uint64_t start_group, uint64_t start_object, |
| uint64_t end_group, SubscribeRemoteTrack::Visitor* visitor, |
| MoqtSubscribeParameters parameters = MoqtSubscribeParameters()); |
| // Subscribe from (start_group, start_object) to (end_group, end_object). |
| bool SubscribeAbsolute( |
| const FullTrackName& name, uint64_t start_group, uint64_t start_object, |
| uint64_t end_group, uint64_t end_object, |
| SubscribeRemoteTrack::Visitor* visitor, |
| MoqtSubscribeParameters parameters = MoqtSubscribeParameters()); |
| bool SubscribeCurrentObject( |
| const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor, |
| MoqtSubscribeParameters parameters = MoqtSubscribeParameters()); |
| bool SubscribeCurrentGroup( |
| const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor, |
| MoqtSubscribeParameters parameters = MoqtSubscribeParameters()); |
| // Returns false if the subscription is not found. The session immediately |
| // destroys all subscription state. |
| void Unsubscribe(const FullTrackName& name); |
| |
| webtransport::Session* session() { return session_; } |
| MoqtSessionCallbacks& callbacks() { return callbacks_; } |
| MoqtPublisher* publisher() { return publisher_; } |
| void set_publisher(MoqtPublisher* publisher) { publisher_ = publisher; } |
| bool support_object_acks() const { return parameters_.support_object_acks; } |
| void set_support_object_acks(bool value) { |
| QUICHE_DCHECK(!control_stream_.has_value()) |
| << "support_object_acks needs to be set before handshake"; |
| parameters_.support_object_acks = value; |
| } |
| |
| // Assigns a monitoring interface for a specific track subscription that is |
| // expected to happen in the future. `interface` will be only used for a |
| // single subscription, and it must outlive the session. |
| void SetMonitoringInterfaceForTrack( |
| FullTrackName track, MoqtPublishingMonitorInterface* interface) { |
| monitoring_interfaces_for_published_tracks_.emplace(std::move(track), |
| interface); |
| } |
| |
| void Close() { session_->CloseSession(0, "Application closed"); } |
| |
| // Tells the session that the highest send order for pending streams in a |
| // subscription has changed. If |old_send_order| is nullopt, this is the |
| // first pending stream. If |new_send_order| is nullopt, the subscription |
| // has no pending streams anymore. |
| void UpdateQueuedSendOrder( |
| uint64_t subscribe_id, |
| std::optional<webtransport::SendOrder> old_send_order, |
| std::optional<webtransport::SendOrder> new_send_order); |
| |
| void GrantMoreSubscribes(uint64_t num_subscribes); |
| |
| private: |
| friend class test::MoqtSessionPeer; |
| |
| struct Empty {}; |
| |
| class QUICHE_EXPORT ControlStream : public webtransport::StreamVisitor, |
| public MoqtControlParserVisitor { |
| public: |
| ControlStream(MoqtSession* session, webtransport::Stream* stream); |
| |
| // webtransport::StreamVisitor implementation. |
| void OnCanRead() override; |
| void OnCanWrite() override; |
| void OnResetStreamReceived(webtransport::StreamErrorCode error) override; |
| void OnStopSendingReceived(webtransport::StreamErrorCode error) override; |
| void OnWriteSideInDataRecvdState() override {} |
| |
| // MoqtControlParserVisitor implementation. |
| void OnClientSetupMessage(const MoqtClientSetup& message) override; |
| void OnServerSetupMessage(const MoqtServerSetup& message) override; |
| void OnSubscribeMessage(const MoqtSubscribe& message) override; |
| void OnSubscribeOkMessage(const MoqtSubscribeOk& message) override; |
| void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override; |
| void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override; |
| // There is no state to update for SUBSCRIBE_DONE. |
| void OnSubscribeDoneMessage(const MoqtSubscribeDone& /*message*/) override { |
| } |
| void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) override; |
| void OnAnnounceMessage(const MoqtAnnounce& message) override; |
| void OnAnnounceOkMessage(const MoqtAnnounceOk& message) override; |
| void OnAnnounceErrorMessage(const MoqtAnnounceError& message) override; |
| void OnAnnounceCancelMessage(const MoqtAnnounceCancel& message) override; |
| void OnTrackStatusRequestMessage( |
| const MoqtTrackStatusRequest& message) override {}; |
| void OnUnannounceMessage(const MoqtUnannounce& /*message*/) override {} |
| void OnTrackStatusMessage(const MoqtTrackStatus& message) override {} |
| void OnGoAwayMessage(const MoqtGoAway& /*message*/) override {} |
| void OnSubscribeAnnouncesMessage( |
| const MoqtSubscribeAnnounces& message) override; |
| void OnSubscribeAnnouncesOkMessage( |
| const MoqtSubscribeAnnouncesOk& message) override; |
| void OnSubscribeAnnouncesErrorMessage( |
| const MoqtSubscribeAnnouncesError& message) override; |
| void OnUnsubscribeAnnouncesMessage( |
| const MoqtUnsubscribeAnnounces& message) override; |
| void OnMaxSubscribeIdMessage(const MoqtMaxSubscribeId& message) override; |
| void OnFetchMessage(const MoqtFetch& message) override; |
| void OnFetchCancelMessage(const MoqtFetchCancel& message) override {} |
| void OnFetchOkMessage(const MoqtFetchOk& message) override {} |
| void OnFetchErrorMessage(const MoqtFetchError& message) override {} |
| void OnObjectAckMessage(const MoqtObjectAck& message) override { |
| auto subscription_it = |
| session_->published_subscriptions_.find(message.subscribe_id); |
| if (subscription_it == session_->published_subscriptions_.end()) { |
| return; |
| } |
| subscription_it->second->ProcessObjectAck(message); |
| } |
| void OnParsingError(MoqtError error_code, |
| absl::string_view reason) override; |
| |
| quic::Perspective perspective() const { |
| return session_->parameters_.perspective; |
| } |
| |
| webtransport::Stream* stream() const { return stream_; } |
| |
| // Sends a control message, or buffers it if there is insufficient flow |
| // control credit. |
| void SendOrBufferMessage(quiche::QuicheBuffer message, bool fin = false); |
| |
| private: |
| friend class test::MoqtSessionPeer; |
| void SendSubscribeError(const MoqtSubscribe& message, |
| SubscribeErrorCode error_code, |
| absl::string_view reason_phrase, |
| uint64_t track_alias); |
| void SendFetchError(uint64_t subscribe_id, SubscribeErrorCode error_code, |
| absl::string_view reason_phrase); |
| |
| MoqtSession* session_; |
| webtransport::Stream* stream_; |
| MoqtControlParser parser_; |
| }; |
| class QUICHE_EXPORT IncomingDataStream : public webtransport::StreamVisitor, |
| public MoqtDataParserVisitor { |
| public: |
| IncomingDataStream(MoqtSession* session, webtransport::Stream* stream) |
| : session_(session), stream_(stream), parser_(stream, this) {} |
| |
| // webtransport::StreamVisitor implementation. |
| void OnCanRead() override; |
| void OnCanWrite() override {} |
| void OnResetStreamReceived(webtransport::StreamErrorCode error) override {} |
| void OnStopSendingReceived(webtransport::StreamErrorCode error) override {} |
| void OnWriteSideInDataRecvdState() override {} |
| |
| // MoqtParserVisitor implementation. |
| // TODO: Handle a stream FIN. |
| void OnObjectMessage(const MoqtObject& message, absl::string_view payload, |
| bool end_of_message) override; |
| void OnParsingError(MoqtError error_code, |
| absl::string_view reason) override; |
| |
| quic::Perspective perspective() const { |
| return session_->parameters_.perspective; |
| } |
| |
| webtransport::Stream* stream() const { return stream_; } |
| |
| private: |
| friend class test::MoqtSessionPeer; |
| void OnControlMessageReceived(); |
| |
| MoqtSession* session_; |
| webtransport::Stream* stream_; |
| // Once the subscribe ID is identified, set it here. |
| quiche::QuicheWeakPtr<RemoteTrack> track_; |
| // std::optional<uint64_t> subscribe_id_ = std::nullopt; |
| MoqtDataParser parser_; |
| std::string partial_object_; |
| }; |
| // Represents a record for a single subscription to a local track that is |
| // being sent to the peer. |
| class PublishedSubscription : public MoqtObjectListener { |
| public: |
| explicit PublishedSubscription( |
| MoqtSession* session, |
| std::shared_ptr<MoqtTrackPublisher> track_publisher, |
| const MoqtSubscribe& subscribe, |
| MoqtPublishingMonitorInterface* monitoring_interface); |
| ~PublishedSubscription(); |
| |
| PublishedSubscription(const PublishedSubscription&) = delete; |
| PublishedSubscription(PublishedSubscription&&) = delete; |
| PublishedSubscription& operator=(const PublishedSubscription&) = delete; |
| PublishedSubscription& operator=(PublishedSubscription&&) = delete; |
| |
| uint64_t subscription_id() const { return subscription_id_; } |
| MoqtTrackPublisher& publisher() { return *track_publisher_; } |
| uint64_t track_alias() const { return track_alias_; } |
| std::optional<FullSequence> largest_sent() const { return largest_sent_; } |
| MoqtPriority subscriber_priority() const { return subscriber_priority_; } |
| std::optional<MoqtDeliveryOrder> subscriber_delivery_order() const { |
| return subscriber_delivery_order_; |
| } |
| void set_subscriber_priority(MoqtPriority priority); |
| |
| // This is only called for objects that have just arrived. |
| void OnNewObjectAvailable(FullSequence sequence) override; |
| void OnTrackPublisherGone() override; |
| void OnNewFinAvailable(FullSequence sequence) override; |
| void OnGroupAbandoned(uint64_t group_id) override; |
| void ProcessObjectAck(const MoqtObjectAck& message) { |
| if (monitoring_interface_ == nullptr) { |
| return; |
| } |
| monitoring_interface_->OnObjectAckReceived( |
| message.group_id, message.object_id, message.delta_from_deadline); |
| } |
| |
| // Creates streams for all objects that are currently in the track's object |
| // cache and match the subscription window. This is in some sense similar |
| // to a fetch (since all of the objects are in the past), but is |
| // conceptually simpler, as backpressure is less of a concern. |
| void Backfill(); |
| |
| // Updates the window and other properties of the subscription in question. |
| void Update(FullSequence start, std::optional<FullSequence> end, |
| MoqtPriority subscriber_priority); |
| // Checks if the specified sequence is within the window of this |
| // subscription. |
| bool InWindow(FullSequence sequence) { return window_.InWindow(sequence); } |
| |
| void OnDataStreamCreated(webtransport::StreamId id, |
| FullSequence start_sequence); |
| void OnDataStreamDestroyed(webtransport::StreamId id, |
| FullSequence end_sequence); |
| void OnObjectSent(FullSequence sequence); |
| |
| std::vector<webtransport::StreamId> GetAllStreams() const; |
| |
| webtransport::SendOrder GetSendOrder(FullSequence sequence) const; |
| |
| void AddQueuedOutgoingDataStream(FullSequence first_object); |
| // Pops the pending outgoing data stream, with the highest send order. |
| // The session keeps track of which subscribes have pending streams. This |
| // function will trigger a QUICHE_DCHECK if called when there are no pending |
| // streams. |
| FullSequence NextQueuedOutgoingDataStream(); |
| |
| private: |
| SendStreamMap& stream_map(); |
| quic::Perspective perspective() const { |
| return session_->parameters_.perspective; |
| } |
| |
| void SendDatagram(FullSequence sequence); |
| webtransport::SendOrder FinalizeSendOrder( |
| webtransport::SendOrder send_order) { |
| return UpdateSendOrderForSubscriberPriority(send_order, |
| subscriber_priority_); |
| } |
| |
| uint64_t subscription_id_; |
| MoqtSession* session_; |
| std::shared_ptr<MoqtTrackPublisher> track_publisher_; |
| uint64_t track_alias_; |
| SubscribeWindow window_; |
| MoqtPriority subscriber_priority_; |
| std::optional<MoqtDeliveryOrder> subscriber_delivery_order_; |
| MoqtPublishingMonitorInterface* monitoring_interface_; |
| // Largest sequence number ever sent via this subscription. |
| std::optional<FullSequence> largest_sent_; |
| // Should be almost always accessed via `stream_map()`. |
| std::optional<SendStreamMap> lazily_initialized_stream_map_; |
| // Store the send order of queued outgoing data streams. Use a |
| // subscriber_priority_ of zero to avoid having to update it, and call |
| // FinalizeSendOrder() whenever delivering it to the MoqtSession.d |
| absl::btree_multimap<webtransport::SendOrder, FullSequence> |
| queued_outgoing_data_streams_; |
| }; |
| class QUICHE_EXPORT OutgoingDataStream : public webtransport::StreamVisitor { |
| public: |
| OutgoingDataStream(MoqtSession* session, webtransport::Stream* stream, |
| PublishedSubscription& subscription, |
| FullSequence first_object); |
| ~OutgoingDataStream(); |
| |
| // webtransport::StreamVisitor implementation. |
| void OnCanRead() override {} |
| void OnCanWrite() override; |
| void OnResetStreamReceived(webtransport::StreamErrorCode error) override {} |
| void OnStopSendingReceived(webtransport::StreamErrorCode error) override {} |
| void OnWriteSideInDataRecvdState() override {} |
| |
| webtransport::Stream* stream() const { return stream_; } |
| |
| // Sends objects on the stream, starting with `next_object_`, until the |
| // stream becomes write-blocked or closed. |
| void SendObjects(PublishedSubscription& subscription); |
| |
| // Sends a pure FIN on the stream, if the last object sent matches |
| // |last_object|. Otherwise, does nothing. |
| void Fin(FullSequence last_object); |
| |
| // Recomputes the send order and updates it for the associated stream. |
| void UpdateSendOrder(PublishedSubscription& subscription); |
| |
| private: |
| friend class test::MoqtSessionPeer; |
| |
| // Checks whether the associated subscription is still valid; if not, resets |
| // the stream and returns nullptr. |
| PublishedSubscription* GetSubscriptionIfValid(); |
| |
| MoqtSession* session_; |
| webtransport::Stream* stream_; |
| uint64_t subscription_id_; |
| // A FullSequence with the minimum object ID that should go out next. The |
| // session doesn't know what the next object ID in the stream is because |
| // the next object could be in a different subgroup or simply be skipped. |
| FullSequence next_object_; |
| bool stream_header_written_ = false; |
| // A weak pointer to an object owned by the session. Used to make sure the |
| // session does not get called after being destroyed. |
| std::weak_ptr<void> session_liveness_; |
| }; |
| |
| class QUICHE_EXPORT PublishedFetch { |
| public: |
| PublishedFetch(uint64_t fetch_id, MoqtSession* session, |
| std::unique_ptr<MoqtFetchTask> fetch) |
| : session_(session), fetch_(std::move(fetch)), fetch_id_(fetch_id) {} |
| |
| class FetchStreamVisitor : public webtransport::StreamVisitor { |
| public: |
| FetchStreamVisitor(std::shared_ptr<PublishedFetch> fetch, |
| webtransport::Stream* stream) |
| : fetch_(fetch), stream_(stream) { |
| fetch->fetch_task()->SetObjectAvailableCallback( |
| [this]() { this->OnCanWrite(); }); |
| } |
| ~FetchStreamVisitor() { |
| std::shared_ptr<PublishedFetch> fetch = fetch_.lock(); |
| if (fetch != nullptr) { |
| fetch->session()->incoming_fetches_.erase(fetch->fetch_id_); |
| } |
| } |
| // webtransport::StreamVisitor implementation. |
| void OnCanRead() override {} // Write-only stream. |
| void OnCanWrite() override; |
| void OnResetStreamReceived(webtransport::StreamErrorCode error) override { |
| } // Write-only stream |
| void OnStopSendingReceived(webtransport::StreamErrorCode error) override { |
| } |
| void OnWriteSideInDataRecvdState() override {} |
| |
| private: |
| std::weak_ptr<PublishedFetch> fetch_; |
| webtransport::Stream* stream_; |
| bool stream_header_written_ = false; |
| }; |
| |
| MoqtFetchTask* fetch_task() { return fetch_.get(); } |
| MoqtSession* session() { return session_; } |
| uint64_t fetch_id() const { return fetch_id_; } |
| void SetStreamId(webtransport::StreamId id) { stream_id_ = id; } |
| |
| private: |
| MoqtSession* session_; |
| std::unique_ptr<MoqtFetchTask> fetch_; |
| uint64_t fetch_id_; |
| // Store the stream ID in case a FETCH_CANCEL requires a reset. |
| std::optional<webtransport::StreamId> stream_id_; |
| }; |
| |
| // Private members of MoqtSession. |
| |
| // Returns true if SUBSCRIBE_DONE was sent. |
| bool SubscribeIsDone(uint64_t subscribe_id, SubscribeDoneCode code, |
| absl::string_view reason_phrase); |
| // Returns the pointer to the control stream, or nullptr if none is present. |
| ControlStream* GetControlStream(); |
| // Sends a message on the control stream; QUICHE_DCHECKs if no control stream |
| // is present. |
| void SendControlMessage(quiche::QuicheBuffer message); |
| |
| // Returns false if the SUBSCRIBE isn't sent. |provided_track_alias| has a |
| // value only if this call is due to a SUBSCRIBE_ERROR. |
| bool Subscribe(MoqtSubscribe& message, SubscribeRemoteTrack::Visitor* visitor, |
| std::optional<uint64_t> provided_track_alias = std::nullopt); |
| |
| // Opens a new data stream, or queues it if the session is flow control |
| // blocked. |
| webtransport::Stream* OpenOrQueueDataStream(uint64_t subscription_id, |
| FullSequence first_object); |
| // Same as above, except the session is required to be not flow control |
| // blocked. |
| webtransport::Stream* OpenDataStream(PublishedSubscription& subscription, |
| FullSequence first_object); |
| // Returns false if creation failed. |
| [[nodiscard]] bool OpenDataStream(std::shared_ptr<PublishedFetch> fetch); |
| |
| SubscribeRemoteTrack* RemoteTrackByAlias(uint64_t track_alias); |
| RemoteTrack* RemoteTrackById(uint64_t subscribe_id); |
| RemoteTrack* RemoteTrackByName(const FullTrackName& name); |
| |
| // Checks that a subscribe ID from a SUBSCRIBE or FETCH is valid, and throws |
| // a session error if is not. |
| bool ValidateSubscribeId(uint64_t subscribe_id); |
| |
| // Actually sends an object on |stream| with track alias or fetch ID |id| |
| // and metadata in |object|. Not for use with datagrams. Returns |true| if |
| // the write was successful. |
| bool WriteObjectToStream(webtransport::Stream* stream, uint64_t id, |
| const PublishedObject& object, |
| MoqtDataStreamType type, bool is_first_on_stream, |
| bool fin); |
| |
| // Sends an OBJECT_ACK message for a specific subscribe ID. |
| void SendObjectAck(uint64_t subscribe_id, uint64_t group_id, |
| uint64_t object_id, |
| quic::QuicTimeDelta delta_from_deadline) { |
| if (!SupportsObjectAck()) { |
| return; |
| } |
| MoqtObjectAck ack; |
| ack.subscribe_id = subscribe_id; |
| ack.group_id = group_id; |
| ack.object_id = object_id; |
| ack.delta_from_deadline = delta_from_deadline; |
| SendControlMessage(framer_.SerializeObjectAck(ack)); |
| } |
| |
| // Indicates if OBJECT_ACK is supported by both sides. |
| bool SupportsObjectAck() const { |
| return parameters_.support_object_acks && peer_supports_object_ack_; |
| } |
| |
| webtransport::Session* session_; |
| MoqtSessionParameters parameters_; |
| MoqtSessionCallbacks callbacks_; |
| MoqtFramer framer_; |
| |
| std::optional<webtransport::StreamId> control_stream_; |
| bool peer_supports_object_ack_ = false; |
| std::string error_; |
| |
| // Upstream SUBSCRIBE state. |
| // All the tracks the session is subscribed to, indexed by track_alias. |
| absl::flat_hash_map<uint64_t, std::unique_ptr<SubscribeRemoteTrack>> |
| subscribe_by_alias_; |
| // Upstream SUBSCRIBEs indexed by subscribe_id. |
| // TODO(martinduke): Add fetches to this. |
| absl::flat_hash_map<uint64_t, RemoteTrack*> upstream_by_id_; |
| // The application only has track names, so this allows MoqtSession to |
| // quickly find what it's looking for. Also allows a quick check for duplicate |
| // subscriptions. |
| absl::flat_hash_map<FullTrackName, RemoteTrack*> upstream_by_name_; |
| uint64_t next_remote_track_alias_ = 0; |
| // The next subscribe ID that the local endpoint can send. |
| uint64_t next_subscribe_id_ = 0; |
| // The local endpoint can send subscribe IDs less than this value. |
| uint64_t peer_max_subscribe_id_ = 0; |
| |
| // All open incoming subscriptions, indexed by track name, used to check for |
| // duplicates. |
| absl::flat_hash_set<FullTrackName> subscribed_track_names_; |
| // Application object representing the publisher for all of the tracks that |
| // can be subscribed to via this connection. Must outlive this object. |
| MoqtPublisher* publisher_; |
| // Subscriptions for local tracks by the remote peer, indexed by subscribe ID. |
| absl::flat_hash_map<uint64_t, std::unique_ptr<PublishedSubscription>> |
| published_subscriptions_; |
| // Keeps track of all subscribe IDs that have queued outgoing data streams. |
| absl::btree_set<SubscriptionWithQueuedStream> |
| subscribes_with_queued_outgoing_data_streams_; |
| // This is only used to check for track_alias collisions. |
| absl::flat_hash_set<uint64_t> used_track_aliases_; |
| uint64_t next_local_track_alias_ = 0; |
| |
| // Incoming FETCHes, indexed by fetch ID. There will be other pointers to |
| // PublishedFetch, so storing a shared_ptr in the map provides pointer |
| // stability for the value. |
| absl::flat_hash_map<uint64_t, std::shared_ptr<PublishedFetch>> |
| incoming_fetches_; |
| |
| // Monitoring interfaces for expected incoming subscriptions. |
| absl::flat_hash_map<FullTrackName, MoqtPublishingMonitorInterface*> |
| monitoring_interfaces_for_published_tracks_; |
| |
| // Indexed by track namespace. If the value is not nullptr, no OK or ERROR |
| // has been received. The entry is deleted after sending UNANNOUNCE or |
| // receiving ANNOUNCE_CANCEL. |
| absl::flat_hash_map<FullTrackName, MoqtOutgoingAnnounceCallback> |
| outgoing_announces_; |
| // The value is nullptr after OK or ERROR is received. The entry is deleted |
| // when sending UNSUBSCRIBE_ANNOUNCES, to make sure the application doesn't |
| // unsubscribe from something that it isn't subscribed to. ANNOUNCEs that |
| // result from this subscription use incoming_announce_callback. |
| absl::flat_hash_map<FullTrackName, MoqtOutgoingSubscribeAnnouncesCallback> |
| outgoing_subscribe_announces_; |
| |
| // The role the peer advertised in its SETUP message. Initialize it to avoid |
| // an uninitialized value if no SETUP arrives or it arrives with no Role |
| // parameter, and other checks have changed/been disabled. |
| MoqtRole peer_role_ = MoqtRole::kPubSub; |
| |
| // The minimum subscribe ID the peer can use that is monotonically increasing. |
| uint64_t next_incoming_subscribe_id_ = 0; |
| // The maximum subscribe ID sent to the peer. Peer-generated IDs must be less |
| // than this value. |
| uint64_t local_max_subscribe_id_ = 0; |
| |
| // Must be last. Token used to make sure that the streams do not call into |
| // the session when the session has already been destroyed. |
| |
| std::shared_ptr<Empty> liveness_token_; |
| }; |
| |
| } // namespace moqt |
| |
| #endif // QUICHE_QUIC_MOQT_MOQT_SESSION_H_ |