blob: 29f98f461f6b24bc312fff9fedf2e9ecfed46817 [file]
// Copyright (c) 2026 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef QUICHE_QUIC_MOQT_MOQT_BIDI_STREAM_H
#define QUICHE_QUIC_MOQT_MOQT_BIDI_STREAM_H
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
#include "absl/base/casts.h"
#include "absl/base/nullability.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_error.h"
#include "quiche/quic/moqt/moqt_framer.h"
#include "quiche/quic/moqt/moqt_key_value_pair.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_parser.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_callbacks.h"
#include "quiche/common/quiche_circular_deque.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
namespace test {
class MoqtBidiStreamTestWrapper;
}
using SessionErrorCallback =
quiche::SingleUseCallback<void(MoqtError, absl::string_view)>;
// The provider of this callback owns nothing in MoqtBidiStreamBase. This merely
// deletes the record.
using BidiStreamDeletedCallback = quiche::SingleUseCallback<void()>;
// MoqtBidiStreamBase is the base class for bidirectional streams in MoQT. It
// contains basic methods for handling and dispatching messages. An instance of
// MoqtBidiStreamBase can be created before the underlying stream is available,
// as it might not yet exist due to flow control limits.
class MoqtBidiStreamBase : public webtransport::StreamVisitor {
public:
// Maximum amount of messages buffered on top of the QUIC send buffer.
static constexpr size_t kMaxPendingMessages = 100;
MoqtBidiStreamBase(MoqtFramer* absl_nonnull framer,
const MoqtControlMessageParser& message_parser,
BidiStreamDeletedCallback stream_deleted_callback,
SessionErrorCallback session_error_callback)
: framer_(framer),
message_parser_(message_parser),
stream_deleted_callback_(std::move(stream_deleted_callback)),
session_error_callback_(std::move(session_error_callback)) {}
~MoqtBidiStreamBase() override { std::move(stream_deleted_callback_)(); }
// Binds a WebTransport stream associated with `parser` to this object.
void BindStream(
std::unique_ptr<MoqtControlStreamParser> absl_nonnull parser) {
QUICHE_DCHECK(stream_parser_ == nullptr);
stream_parser_ = std::move(parser);
OnStreamBound();
}
// Binds a WebTransport stream `stream` to this object.
void BindStream(webtransport::Stream* absl_nonnull stream) {
QUICHE_DCHECK(stream_parser_ == nullptr);
stream_parser_ = std::make_unique<MoqtControlStreamParser>(stream);
OnStreamBound();
}
// webtransport::StreamVisitor implementation.
void OnResetStreamReceived(webtransport::StreamErrorCode error) override {}
void OnStopSendingReceived(webtransport::StreamErrorCode error) override {}
void OnWriteSideInDataRecvdState() override {}
void OnCanRead() override;
void OnCanWrite() override;
bool QueueIsFull() const {
return pending_messages_.size() == kMaxPendingMessages;
}
absl::Status SendOrBufferMessage(quiche::QuicheBuffer message,
bool fin = false);
void SendOrBufferMessageOrFatal(quiche::QuicheBuffer message,
bool fin = false) {
CheckStatus(SendOrBufferMessage(std::move(message), fin));
}
absl::Status SendRequestOk(uint64_t request_id,
const MessageParameters& parameters,
bool fin = false);
absl::Status SendRequestError(
uint64_t request_id, RequestErrorCode error_code,
std::optional<quic::QuicTimeDelta> retry_interval,
absl::string_view reason_phrase, bool fin = false);
absl::Status SendRequestError(uint64_t request_id, MoqtRequestErrorInfo info,
bool fin = false);
void Fin() {
fin_queued_ = true;
OnCanWrite();
}
void Reset(webtransport::StreamErrorCode error) {
webtransport::Stream* stream = stream_parser_->stream();
if (stream != nullptr) {
stream->ResetWithUserCode(error);
}
}
// If `status` is not OK, terminates the connection with a fatal error.
void CheckStatus(absl::Status status) {
if (!status.ok()) {
OnFatalError(status);
}
}
protected:
// Called when a WebTransport stream has been associated with the object.
// Should be used to set the priority for the stream.
virtual void OnStreamBound() = 0;
// Called when a control message has been received. The subclass should use
// DispatchControlMessage to process it.
virtual absl::Status OnRawControlMessage(
const MoqtRawControlMessage& message) = 0;
// Terminates the MoQT session due to a fatal error encountered.
void OnFatalError(absl::Status status);
MoqtControlStreamParser* stream_parser() { return stream_parser_.get(); }
MoqtFramer* framer() const { return framer_; }
webtransport::Stream* stream() const {
return stream_parser_ != nullptr ? stream_parser_->stream() : nullptr;
}
// Parses the supplied control message. If the message is well-formed, and the
// class defines an `OnControlMessage` method that accepts it, it is passed to
// that method. Otherwise, an appropriate error message is returned;
// `stream_type` is used to format that message.
template <typename Subclass>
absl::Status DispatchControlMessage(const MoqtRawControlMessage& message,
absl::string_view stream_type) {
static_assert(!std::is_same_v<Subclass, MoqtBidiStreamBase>);
return message_parser_.ParseMessage(message, [&](const auto&
parsed_message) {
if constexpr (CanDispatch<Subclass, decltype(parsed_message)>::value) {
return absl::down_cast<Subclass*>(this)->OnControlMessage(
parsed_message);
} else {
return absl::InvalidArgumentError(
absl::StrCat("Received an unexpected message of type ",
MoqtMessageTypeToString(message.type), " on a ",
stream_type, " stream"));
}
});
}
private:
friend class test::MoqtBidiStreamTestWrapper;
absl::Status AddToQueue(quiche::QuicheBuffer message);
absl::Status SendMessage(quiche::QuicheBuffer message, bool fin);
// CanDispatch<S, M> indicates whether `S` has a method with signature
// absl::Status OnControlMessage(const M&);
template <typename Subclass, typename Message, typename = void>
struct CanDispatch : std::false_type {};
template <typename Subclass, typename Message>
struct CanDispatch<Subclass, Message,
std::enable_if_t<std::is_same_v<
decltype(std::declval<Subclass>().OnControlMessage(
std::declval<Message>())),
absl::Status>>> : std::true_type {};
MoqtFramer* absl_nonnull framer_;
std::unique_ptr<MoqtControlStreamParser> absl_nullable stream_parser_;
MoqtControlMessageParser message_parser_;
quiche::QuicheCircularDeque<quiche::QuicheBuffer> pending_messages_;
bool fin_queued_ = false;
BidiStreamDeletedCallback stream_deleted_callback_;
SessionErrorCallback session_error_callback_;
};
} // namespace moqt
#endif // QUICHE_QUIC_MOQT_MOQT_BIDI_STREAM_H