blob: 69b87322f13e7963f9688ea0091efa25cace638c [file]
// Copyright (c) 2026 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/moqt_namespace_stream.h"
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include "absl/base/nullability.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/moqt/moqt_bidi_stream.h"
#include "quiche/quic/moqt/moqt_error.h"
#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_framer.h"
#include "quiche/quic/moqt/moqt_key_value_pair.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_names.h"
#include "quiche/quic/moqt/moqt_parser.h"
#include "quiche/quic/moqt/moqt_session_callbacks.h"
#include "quiche/quic/moqt/session_namespace_tree.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/web_transport/stream_helpers.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
MoqtNamespaceSubscriberStream::~MoqtNamespaceSubscriberStream() {
NamespaceTask* task = task_.GetIfAvailable();
if (task != nullptr) {
task->DeclareEof();
}
}
absl::Status MoqtNamespaceSubscriberStream::OnRawControlMessage(
const MoqtRawControlMessage& message) {
return DispatchControlMessage<MoqtNamespaceSubscriberStream>(
message, "namespace subscriber");
}
void MoqtNamespaceSubscriberStream::OnStreamBound() {
// TODO(martinduke): Set the priority for this stream.
}
absl::Status MoqtNamespaceSubscriberStream::OnControlMessage(
const MoqtRequestOk& message) {
if (message.request_id == request_id_) {
// Response to the initial SUBSCRIBE_NAMESPACE.
if (response_callback_ == nullptr) {
return absl::InvalidArgumentError("Two responses");
}
std::move(response_callback_)(std::nullopt);
response_callback_ = nullptr;
return absl::OkStatus();
}
NamespaceTask* task = task_.GetIfAvailable();
if (task == nullptr) {
// The application has already unsubscribed, and the stream has been reset.
// This is irrelevant.
return absl::OkStatus();
}
MoqtResponseCallback callback = task->GetResponseCallback(message.request_id);
if (callback == nullptr) {
return absl::InvalidArgumentError("Unexpected request ID in response");
}
std::move(callback)(std::nullopt);
return absl::OkStatus();
}
absl::Status MoqtNamespaceSubscriberStream::OnControlMessage(
const MoqtRequestError& message) {
if (message.request_id == request_id_) {
if (response_callback_ == nullptr) {
return absl::InvalidArgumentError("Two responses");
}
std::move(response_callback_)(MoqtRequestErrorInfo{
message.error_code, message.retry_interval, message.reason_phrase});
response_callback_ = nullptr;
return absl::OkStatus();
}
NamespaceTask* task = task_.GetIfAvailable();
if (task == nullptr) {
// The application has already unsubscribed, and the stream has been reset.
// This is irrelevant.
return absl::OkStatus();
}
MoqtResponseCallback callback = task->GetResponseCallback(message.request_id);
if (callback == nullptr) {
return absl::InvalidArgumentError("Unexpected request ID in response");
}
std::move(callback)(MoqtRequestErrorInfo{
message.error_code, message.retry_interval, message.reason_phrase});
return absl::OkStatus();
}
absl::Status MoqtNamespaceSubscriberStream::OnControlMessage(
const MoqtNamespace& message) {
if (response_callback_ != nullptr) {
return absl::InvalidArgumentError(
"First message must be REQUEST_OK or REQUEST_ERROR");
}
NamespaceTask* task = task_.GetIfAvailable();
if (task == nullptr) {
// The application has already unsubscribed, and the stream has been reset.
// This is irrelevant.
return absl::OkStatus();
}
if (task->prefix().number_of_elements() +
message.track_namespace_suffix.number_of_elements() >
kMaxNamespaceElements) {
return absl::InvalidArgumentError("Too many namespace elements");
}
if (task->prefix().total_length() +
message.track_namespace_suffix.total_length() >
kMaxFullTrackNameSize) {
return absl::InvalidArgumentError("Namespace too large");
}
auto [it, inserted] =
published_suffixes_.insert(message.track_namespace_suffix);
if (!inserted) {
return absl::InvalidArgumentError(
"Two NAMESPACE messages for the same track namespace");
}
QUIC_DLOG(INFO) << "Received NAMESPACE message for "
<< message.track_namespace_suffix;
task->AddPendingSuffix(message.track_namespace_suffix, TransactionType::kAdd);
return absl::OkStatus();
}
absl::Status MoqtNamespaceSubscriberStream::OnControlMessage(
const MoqtNamespaceDone& message) {
if (response_callback_ != nullptr) {
return absl::InvalidArgumentError(
"First message must be REQUEST_OK or REQUEST_ERROR");
}
NamespaceTask* task = task_.GetIfAvailable();
if (task == nullptr) {
return absl::OkStatus();
}
if (published_suffixes_.erase(message.track_namespace_suffix) == 0) {
return absl::InvalidArgumentError(
"NAMESPACE_DONE with no active namespace");
}
QUIC_DLOG(INFO) << "Received NAMESPACE_DONE message for "
<< message.track_namespace_suffix;
task->AddPendingSuffix(message.track_namespace_suffix,
TransactionType::kDelete);
return absl::OkStatus();
}
std::unique_ptr<MoqtNamespaceTask> MoqtNamespaceSubscriberStream::CreateTask(
const TrackNamespace& prefix) {
auto task = std::make_unique<NamespaceTask>(this, prefix);
QUICHE_DCHECK(task != nullptr);
task_ = task->GetWeakPtr();
QUICHE_DCHECK(task_.IsValid());
return std::move(task);
}
MoqtNamespaceSubscriberStream::NamespaceTask::~NamespaceTask() {
if (state_ != nullptr) {
state_->Reset(kResetCodeCancelled);
}
}
void MoqtNamespaceSubscriberStream::NamespaceTask::SetObjectsAvailableCallback(
ObjectsAvailableCallback absl_nullable callback) {
callback_ = std::move(callback);
if (!pending_suffixes_.empty() && callback_ != nullptr) {
callback_();
}
}
void MoqtNamespaceSubscriberStream::NamespaceTask::Update(
const MessageParameters& parameters,
MoqtResponseCallback response_callback) {
if (state_ == nullptr) {
std::move(response_callback)(
MoqtRequestErrorInfo{RequestErrorCode::kInternalError, std::nullopt,
"Stream has been reset"});
return;
}
MoqtRequestUpdate message{next_request_id_, state_->request_id_, parameters};
pending_updates_[message.request_id] = std::move(response_callback);
state_->SendOrBufferMessageOrFatal(
state_->framer()->SerializeRequestUpdate(message));
next_request_id_ += 2;
}
GetNextResult MoqtNamespaceSubscriberStream::NamespaceTask::GetNextSuffix(
TrackNamespace& suffix, TransactionType& type) {
if (pending_suffixes_.empty()) {
if (error_.has_value()) {
return kError;
}
if (eof_) {
return kEof;
}
return kPending;
}
suffix = pending_suffixes_.front().suffix;
type = pending_suffixes_.front().type;
pending_suffixes_.pop_front();
return kSuccess;
}
void MoqtNamespaceSubscriberStream::NamespaceTask::AddPendingSuffix(
TrackNamespace suffix, TransactionType type) {
if (pending_suffixes_.size() == kMaxPendingSuffixes) {
error_ = kResetCodeTooFarBehind;
if (state_ != nullptr) {
state_->Reset(kResetCodeTooFarBehind);
}
return;
}
pending_suffixes_.push_back(PendingSuffix{std::move(suffix), type});
if (callback_ != nullptr) {
callback_();
}
}
void MoqtNamespaceSubscriberStream::NamespaceTask::DeclareEof() {
if (eof_) {
return;
}
eof_ = true;
state_ = nullptr;
if (callback_ != nullptr) {
callback_();
}
}
MoqtResponseCallback
MoqtNamespaceSubscriberStream::NamespaceTask::GetResponseCallback(
uint64_t request_id) {
auto it = pending_updates_.find(request_id);
if (it == pending_updates_.end()) {
return nullptr;
}
MoqtResponseCallback callback = std::move(it->second);
pending_updates_.erase(it);
return callback;
}
MoqtNamespacePublisherStream::MoqtNamespacePublisherStream(
MoqtFramer* framer, const MoqtControlMessageParser& message_parser,
SessionErrorCallback session_error_callback,
SessionNamespaceTree* absl_nonnull tree,
MoqtIncomingSubscribeNamespaceCallback& application)
// No stream_deleted_callback because there's no state yet.
: MoqtBidiStreamBase(
framer, message_parser, []() {}, std::move(session_error_callback)),
tree_(tree->GetWeakPtr()),
application_(application) {}
MoqtNamespacePublisherStream::~MoqtNamespacePublisherStream() {
if (task_ == nullptr) {
return;
}
SessionNamespaceTree* tree = tree_.GetIfAvailable();
if (tree != nullptr) {
// Could be null if the stream died early.
tree->UnsubscribeNamespace(task_->prefix());
}
}
absl::Status MoqtNamespacePublisherStream::OnRawControlMessage(
const MoqtRawControlMessage& message) {
return DispatchControlMessage<MoqtNamespacePublisherStream>(
message, "namespace publisher");
}
absl::Status MoqtNamespacePublisherStream::OnControlMessage(
const MoqtSubscribeNamespace& message) {
request_id_ = message.request_id;
SessionNamespaceTree* tree = tree_.GetIfAvailable();
if (tree == nullptr) {
return SendRequestError(request_id_, RequestErrorCode::kInternalError,
std::nullopt, "Session is gone", /*fin=*/true);
}
if (!tree->SubscribeNamespace(message.track_namespace_prefix)) {
return SendRequestError(request_id_, RequestErrorCode::kPrefixOverlap,
std::nullopt, "", /*fin=*/true);
}
QUICHE_DCHECK(task_ == nullptr);
task_ =
application_(message.track_namespace_prefix, message.subscribe_options,
message.parameters, ResponseCallback(request_id_));
if (task_ != nullptr) {
task_->SetObjectsAvailableCallback([this]() { ProcessNamespaces(); });
}
return absl::OkStatus();
}
absl::Status MoqtNamespacePublisherStream::OnControlMessage(
const MoqtRequestUpdate& message) {
if (task_ == nullptr) {
// This stream is dying.
return absl::OkStatus();
}
task_->Update(message.parameters, ResponseCallback(request_id_));
return absl::OkStatus();
}
void MoqtNamespacePublisherStream::ProcessNamespaces() {
if (task_ == nullptr) {
return;
}
TrackNamespace suffix;
TransactionType type;
while (!QueueIsFull()) {
GetNextResult result = task_->GetNextSuffix(suffix, type);
switch (result) {
case kPending:
return;
case kEof:
if (absl::Status status = webtransport::SendFinOnStream(*stream());
!status.ok()) {
OnFatalError(status);
};
return;
case kError:
Reset(kResetCodeCancelled);
return;
case kSuccess: {
absl::Status write_status;
switch (type) {
case TransactionType::kAdd: {
auto [it, inserted] = published_suffixes_.insert(suffix);
if (!inserted) {
// This should never happen. Do not send something that would
// cause a protocol violation.
return;
}
write_status = SendOrBufferMessage(
framer()->SerializeNamespace(MoqtNamespace{suffix}));
break;
}
case TransactionType::kDelete: {
if (published_suffixes_.erase(suffix) == 0) {
// This should never happen. Do not send something that would
// cause a protocol violation.
return;
}
write_status = SendOrBufferMessage(
framer()->SerializeNamespaceDone(MoqtNamespaceDone{suffix}));
break;
}
}
if (!write_status.ok()) {
if (absl::IsResourceExhausted(write_status)) {
// The peer is not reading data fast enough, and the sender has
// reached its buffer limit; reset the stream.
Reset(kResetCodeTooFarBehind);
return;
}
// All other write errors are fatal.
OnFatalError(write_status);
return;
}
break;
}
}
}
}
MoqtResponseCallback MoqtNamespacePublisherStream::ResponseCallback(
uint64_t request_id) {
return [this, request_id](std::optional<MoqtRequestErrorInfo> error) {
if (error.has_value()) {
CheckStatus(SendRequestError(request_id, *error, /*fin=*/true));
} else {
CheckStatus(SendRequestOk(request_id, MessageParameters()));
}
};
}
} // namespace moqt