blob: 3d6b9c61b58e49a998f7197ebd150c5850c07189 [file]
// Copyright 2026 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/moqt_bidi_stream.h"
#include <array>
#include <cstdint>
#include <optional>
#include <utility>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_error.h"
#include "quiche/quic/moqt/moqt_key_value_pair.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_parser.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_mem_slice.h"
#include "quiche/web_transport/stream_helpers.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
void MoqtBidiStreamBase::OnCanRead() {
if (stream_parser_ == nullptr) {
QUICHE_BUG(MoqtBidiStreamBase_OnCanRead_no_stream)
<< "OnCanRead() called when no stream is bound";
return;
}
while (!stream_parser_->fin_read()) {
absl::StatusOr<MoqtRawControlMessage> message =
stream_parser_->ReadNextMessage();
if (absl::IsUnavailable(message.status())) {
return;
}
if (!message.ok()) {
OnFatalError(message.status());
return;
}
absl::Status status = OnRawControlMessage(*message);
if (!status.ok()) {
OnFatalError(status);
return;
}
}
}
void MoqtBidiStreamBase::OnCanWrite() {
if (stream_parser_ == nullptr) {
QUICHE_BUG(MoqtBidiStreamBase_OnCanWrite_no_stream)
<< "OnCanWrite() called when no stream is bound";
return;
}
webtransport::Stream* stream = stream_parser_->stream();
if (pending_messages_.empty() && fin_queued_) {
absl::Status status = webtransport::SendFinOnStream(*stream);
if (!status.ok()) {
OnFatalError(status);
}
return;
}
while (!pending_messages_.empty() && stream->CanWrite()) {
absl::Status status =
SendMessage(std::move(pending_messages_.front()),
fin_queued_ && pending_messages_.size() == 1);
pending_messages_.pop_front();
if (!status.ok()) {
OnFatalError(status);
return;
}
}
}
absl::Status MoqtBidiStreamBase::SendOrBufferMessage(
quiche::QuicheBuffer message, bool fin) {
if (fin_queued_) {
return absl::InternalError(
"Trying to send data when a FIN has been already queued");
}
if (stream() == nullptr || !stream()->CanWrite()) {
fin_queued_ = fin;
return AddToQueue(std::move(message));
}
return SendMessage(std::move(message), fin);
}
absl::Status MoqtBidiStreamBase::SendRequestOk(
uint64_t request_id, const MessageParameters& parameters, bool fin) {
return SendOrBufferMessage(
framer_->SerializeRequestOk(MoqtRequestOk{request_id, parameters}), fin);
}
absl::Status MoqtBidiStreamBase::SendRequestError(
uint64_t request_id, RequestErrorCode error_code,
std::optional<quic::QuicTimeDelta> retry_interval,
absl::string_view reason_phrase, bool fin) {
MoqtRequestError request_error;
request_error.request_id = request_id;
request_error.error_code = error_code;
request_error.retry_interval = retry_interval;
request_error.reason_phrase = reason_phrase;
return SendOrBufferMessage(framer_->SerializeRequestError(request_error),
fin);
}
absl::Status MoqtBidiStreamBase::SendRequestError(uint64_t request_id,
MoqtRequestErrorInfo info,
bool fin) {
return SendRequestError(request_id, info.error_code, info.retry_interval,
info.reason_phrase, fin);
}
void MoqtBidiStreamBase::OnFatalError(absl::Status status) {
QUICHE_DCHECK(!status.ok());
if (session_error_callback_ == nullptr) {
return;
}
std::optional<MoqtError> error_code = GetMoqtErrorForStatus(status);
if (!error_code.has_value()) {
error_code = absl::IsInvalidArgument(status) ? MoqtError::kProtocolViolation
: MoqtError::kInternalError;
}
std::move(session_error_callback_)(*error_code, status.message());
}
absl::Status MoqtBidiStreamBase::AddToQueue(quiche::QuicheBuffer message) {
if (pending_messages_.size() == kMaxPendingMessages) {
return absl::ResourceExhaustedError(
"Not enough flow credit on the control stream");
}
pending_messages_.push_back(std::move(message));
return absl::OkStatus();
}
absl::Status MoqtBidiStreamBase::SendMessage(quiche::QuicheBuffer message,
bool fin) {
webtransport::StreamWriteOptions options;
options.set_send_fin(fin);
std::array write_vector = {quiche::QuicheMemSlice(std::move(message))};
return stream()->Writev(absl::MakeSpan(write_vector), options);
}
} // namespace moqt