| // Copyright (c) 2026 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #ifndef QUICHE_QUIC_MOQT_MOQT_BIDI_STREAM_H |
| #define QUICHE_QUIC_MOQT_MOQT_BIDI_STREAM_H |
| |
| #include <cstddef> |
| #include <cstdint> |
| #include <memory> |
| #include <optional> |
| #include <type_traits> |
| #include <utility> |
| |
| #include "absl/base/casts.h" |
| #include "absl/base/nullability.h" |
| #include "absl/status/status.h" |
| #include "absl/strings/str_cat.h" |
| #include "absl/strings/string_view.h" |
| #include "quiche/quic/core/quic_time.h" |
| #include "quiche/quic/moqt/moqt_error.h" |
| #include "quiche/quic/moqt/moqt_framer.h" |
| #include "quiche/quic/moqt/moqt_key_value_pair.h" |
| #include "quiche/quic/moqt/moqt_messages.h" |
| #include "quiche/quic/moqt/moqt_parser.h" |
| #include "quiche/common/platform/api/quiche_logging.h" |
| #include "quiche/common/quiche_buffer_allocator.h" |
| #include "quiche/common/quiche_callbacks.h" |
| #include "quiche/common/quiche_circular_deque.h" |
| #include "quiche/web_transport/web_transport.h" |
| |
| namespace moqt { |
| |
| namespace test { |
| class MoqtBidiStreamTestWrapper; |
| } |
| |
| using SessionErrorCallback = |
| quiche::SingleUseCallback<void(MoqtError, absl::string_view)>; |
| // The provider of this callback owns nothing in MoqtBidiStreamBase. This merely |
| // deletes the record. |
| using BidiStreamDeletedCallback = quiche::SingleUseCallback<void()>; |
| |
| // MoqtBidiStreamBase is the base class for bidirectional streams in MoQT. It |
| // contains basic methods for handling and dispatching messages. An instance of |
| // MoqtBidiStreamBase can be created before the underlying stream is available, |
| // as it might not yet exist due to flow control limits. |
| class MoqtBidiStreamBase : public webtransport::StreamVisitor { |
| public: |
| // Maximum amount of messages buffered on top of the QUIC send buffer. |
| static constexpr size_t kMaxPendingMessages = 100; |
| |
| MoqtBidiStreamBase(MoqtFramer* absl_nonnull framer, |
| const MoqtControlMessageParser& message_parser, |
| BidiStreamDeletedCallback stream_deleted_callback, |
| SessionErrorCallback session_error_callback) |
| : framer_(framer), |
| message_parser_(message_parser), |
| stream_deleted_callback_(std::move(stream_deleted_callback)), |
| session_error_callback_(std::move(session_error_callback)) {} |
| ~MoqtBidiStreamBase() override { std::move(stream_deleted_callback_)(); } |
| |
| // Binds a WebTransport stream associated with `parser` to this object. |
| void BindStream( |
| std::unique_ptr<MoqtControlStreamParser> absl_nonnull parser) { |
| QUICHE_DCHECK(stream_parser_ == nullptr); |
| stream_parser_ = std::move(parser); |
| OnStreamBound(); |
| } |
| // Binds a WebTransport stream `stream` to this object. |
| void BindStream(webtransport::Stream* absl_nonnull stream) { |
| QUICHE_DCHECK(stream_parser_ == nullptr); |
| stream_parser_ = std::make_unique<MoqtControlStreamParser>(stream); |
| OnStreamBound(); |
| } |
| |
| // webtransport::StreamVisitor implementation. |
| void OnResetStreamReceived(webtransport::StreamErrorCode error) override {} |
| void OnStopSendingReceived(webtransport::StreamErrorCode error) override {} |
| void OnWriteSideInDataRecvdState() override {} |
| void OnCanRead() override; |
| void OnCanWrite() override; |
| |
| bool QueueIsFull() const { |
| return pending_messages_.size() == kMaxPendingMessages; |
| } |
| |
| absl::Status SendOrBufferMessage(quiche::QuicheBuffer message, |
| bool fin = false); |
| void SendOrBufferMessageOrFatal(quiche::QuicheBuffer message, |
| bool fin = false) { |
| CheckStatus(SendOrBufferMessage(std::move(message), fin)); |
| } |
| |
| absl::Status SendRequestOk(uint64_t request_id, |
| const MessageParameters& parameters, |
| bool fin = false); |
| absl::Status SendRequestError( |
| uint64_t request_id, RequestErrorCode error_code, |
| std::optional<quic::QuicTimeDelta> retry_interval, |
| absl::string_view reason_phrase, bool fin = false); |
| absl::Status SendRequestError(uint64_t request_id, MoqtRequestErrorInfo info, |
| bool fin = false); |
| |
| void Fin() { |
| fin_queued_ = true; |
| OnCanWrite(); |
| } |
| void Reset(webtransport::StreamErrorCode error) { |
| webtransport::Stream* stream = stream_parser_->stream(); |
| if (stream != nullptr) { |
| stream->ResetWithUserCode(error); |
| } |
| } |
| |
| // If `status` is not OK, terminates the connection with a fatal error. |
| void CheckStatus(absl::Status status) { |
| if (!status.ok()) { |
| OnFatalError(status); |
| } |
| } |
| |
| protected: |
| // Called when a WebTransport stream has been associated with the object. |
| // Should be used to set the priority for the stream. |
| virtual void OnStreamBound() = 0; |
| |
| // Called when a control message has been received. The subclass should use |
| // DispatchControlMessage to process it. |
| virtual absl::Status OnRawControlMessage( |
| const MoqtRawControlMessage& message) = 0; |
| |
| // Terminates the MoQT session due to a fatal error encountered. |
| void OnFatalError(absl::Status status); |
| |
| MoqtControlStreamParser* stream_parser() { return stream_parser_.get(); } |
| MoqtFramer* framer() const { return framer_; } |
| webtransport::Stream* stream() const { |
| return stream_parser_ != nullptr ? stream_parser_->stream() : nullptr; |
| } |
| |
| // Parses the supplied control message. If the message is well-formed, and the |
| // class defines an `OnControlMessage` method that accepts it, it is passed to |
| // that method. Otherwise, an appropriate error message is returned; |
| // `stream_type` is used to format that message. |
| template <typename Subclass> |
| absl::Status DispatchControlMessage(const MoqtRawControlMessage& message, |
| absl::string_view stream_type) { |
| static_assert(!std::is_same_v<Subclass, MoqtBidiStreamBase>); |
| return message_parser_.ParseMessage(message, [&](const auto& |
| parsed_message) { |
| if constexpr (CanDispatch<Subclass, decltype(parsed_message)>::value) { |
| return absl::down_cast<Subclass*>(this)->OnControlMessage( |
| parsed_message); |
| } else { |
| return absl::InvalidArgumentError( |
| absl::StrCat("Received an unexpected message of type ", |
| MoqtMessageTypeToString(message.type), " on a ", |
| stream_type, " stream")); |
| } |
| }); |
| } |
| |
| private: |
| friend class test::MoqtBidiStreamTestWrapper; |
| |
| absl::Status AddToQueue(quiche::QuicheBuffer message); |
| absl::Status SendMessage(quiche::QuicheBuffer message, bool fin); |
| |
| // CanDispatch<S, M> indicates whether `S` has a method with signature |
| // absl::Status OnControlMessage(const M&); |
| template <typename Subclass, typename Message, typename = void> |
| struct CanDispatch : std::false_type {}; |
| template <typename Subclass, typename Message> |
| struct CanDispatch<Subclass, Message, |
| std::enable_if_t<std::is_same_v< |
| decltype(std::declval<Subclass>().OnControlMessage( |
| std::declval<Message>())), |
| absl::Status>>> : std::true_type {}; |
| |
| MoqtFramer* absl_nonnull framer_; |
| std::unique_ptr<MoqtControlStreamParser> absl_nullable stream_parser_; |
| MoqtControlMessageParser message_parser_; |
| quiche::QuicheCircularDeque<quiche::QuicheBuffer> pending_messages_; |
| bool fin_queued_ = false; |
| BidiStreamDeletedCallback stream_deleted_callback_; |
| SessionErrorCallback session_error_callback_; |
| }; |
| |
| } // namespace moqt |
| |
| #endif // QUICHE_QUIC_MOQT_MOQT_BIDI_STREAM_H |