blob: 1fe45cfe6efd7adadb19570d13f8b3fec2a52ab8 [file] [log] [blame]
// Copyright (c) 2026 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/moqt_namespace_stream.h"
#include <memory>
#include <optional>
#include <utility>
#include "absl/base/nullability.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/moqt/moqt_bidi_stream.h"
#include "quiche/quic/moqt/moqt_error.h"
#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_framer.h"
#include "quiche/quic/moqt/moqt_key_value_pair.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_names.h"
#include "quiche/quic/moqt/moqt_session_callbacks.h"
#include "quiche/quic/moqt/session_namespace_tree.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_stream.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
// When the stream object goes away, tell the task (if it is still reachable
// through the weak pointer) that nothing further will be delivered.
MoqtNamespaceSubscriberStream::~MoqtNamespaceSubscriberStream() {
  if (NamespaceTask* live_task = task_.GetIfAvailable(); live_task != nullptr) {
    live_task->DeclareEof();
  }
}
// Attaches |stream| to this object by forwarding to the base-class
// implementation; no subscriber-specific setup is done here yet.
void MoqtNamespaceSubscriberStream::set_stream(
    webtransport::Stream* absl_nonnull stream) {
  // TODO(martinduke): Set the priority for this stream.
  this->MoqtBidiStreamBase::set_stream(stream);
}
// Handles a REQUEST_OK received on this stream.
//
// Reports a protocol violation if the request ID does not match the one this
// stream is tracking, or if a response has already been consumed (the
// response callback is null). Otherwise the response callback is invoked
// exactly once with std::nullopt (no error info).
void MoqtNamespaceSubscriberStream::OnRequestOkMessage(
    const MoqtRequestOk& message) {
  if (message.request_id != request_id_) {
    OnParsingError(MoqtError::kProtocolViolation,
                   "Unexpected request ID in response");
    return;
  }
  if (response_callback_ == nullptr) {
    OnParsingError(MoqtError::kProtocolViolation, "Two responses");
    return;
  }
  // Detach the callback and clear the member *before* running it, so that no
  // member of |this| is touched after user code has run. The callback is
  // application-supplied and may, for example, drop the task and thereby tear
  // down this stream.
  auto callback = std::move(response_callback_);
  response_callback_ = nullptr;
  std::move(callback)(std::nullopt);
}
// Handles a REQUEST_ERROR received on this stream.
//
// Reports a protocol violation if the request ID does not match the one this
// stream is tracking, or if a response has already been consumed (the
// response callback is null). Otherwise the response callback is invoked
// exactly once with the error code, retry interval and reason phrase carried
// by |message|.
void MoqtNamespaceSubscriberStream::OnRequestErrorMessage(
    const MoqtRequestError& message) {
  if (message.request_id != request_id_) {
    OnParsingError(MoqtError::kProtocolViolation,
                   "Unexpected request ID in error");
    return;
  }
  if (response_callback_ == nullptr) {
    OnParsingError(MoqtError::kProtocolViolation, "Two responses");
    return;
  }
  // Detach the callback and clear the member *before* running it, so that no
  // member of |this| is touched after user code has run. An application that
  // receives an error may well drop the task from inside the callback, which
  // can tear down this stream.
  auto callback = std::move(response_callback_);
  response_callback_ = nullptr;
  std::move(callback)(MoqtRequestErrorInfo{
      message.error_code, message.retry_interval, message.reason_phrase});
}
void MoqtNamespaceSubscriberStream::OnNamespaceMessage(
const MoqtNamespace& message) {
if (response_callback_ != nullptr) {
OnParsingError(MoqtError::kProtocolViolation,
"First message must be REQUEST_OK or REQUEST_ERROR");
return;
}
NamespaceTask* task = task_.GetIfAvailable();
if (task == nullptr) {
// The application has already unsubscribed, and the stream has been reset.
// This is irrelevant.
return;
}
if (task->prefix().number_of_elements() +
message.track_namespace_suffix.number_of_elements() >
kMaxNamespaceElements) {
OnParsingError(MoqtError::kProtocolViolation,
"Too many namespace elements");
return;
}
if (task->prefix().total_length() +
message.track_namespace_suffix.total_length() >
kMaxFullTrackNameSize) {
OnParsingError(MoqtError::kProtocolViolation, "Namespace too large");
return;
}
auto [it, inserted] =
published_suffixes_.insert(message.track_namespace_suffix);
if (!inserted) {
OnParsingError(MoqtError::kProtocolViolation,
"Two NAMESPACE messages for the same track namespace");
return;
}
task->AddPendingSuffix(message.track_namespace_suffix, TransactionType::kAdd);
}
void MoqtNamespaceSubscriberStream::OnNamespaceDoneMessage(
const MoqtNamespaceDone& message) {
if (response_callback_ != nullptr) {
OnParsingError(MoqtError::kProtocolViolation,
"First message must be REQUEST_OK or REQUEST_ERROR");
return;
}
NamespaceTask* task = task_.GetIfAvailable();
if (task == nullptr) {
return;
}
if (published_suffixes_.erase(message.track_namespace_suffix) == 0) {
OnParsingError(MoqtError::kProtocolViolation,
"NAMESPACE_DONE with no active namespace");
return;
}
task->AddPendingSuffix(message.track_namespace_suffix,
TransactionType::kDelete);
}
std::unique_ptr<MoqtNamespaceTask> MoqtNamespaceSubscriberStream::CreateTask(
const TrackNamespace& prefix,
ObjectsAvailableCallback absl_nonnull callback) {
auto task =
std::make_unique<NamespaceTask>(this, prefix, std::move(callback));
QUICHE_DCHECK(task != nullptr);
task_ = task->GetWeakPtr();
QUICHE_DCHECK(task_.IsValid());
return std::move(task);
}
// Destroying the task while it is still attached to stream state resets that
// state with the "canceled" code. |state_| is null once DeclareEof() has run.
MoqtNamespaceSubscriberStream::NamespaceTask::~NamespaceTask() {
  if (state_ == nullptr) {
    return;
  }
  state_->Reset(kResetCodeCanceled);
}
// Pops the oldest queued namespace transaction.
//
// On kSuccess, |suffix| and |type| receive the transaction and it is removed
// from the queue. When the queue is empty the outputs are left untouched and
// the result is kError if an error has been recorded, kEof if end-of-stream
// was declared, and kPending otherwise. Queued entries are always delivered
// before an error or EOF is reported.
GetNextResult MoqtNamespaceSubscriberStream::NamespaceTask::GetNextSuffix(
    TrackNamespace& suffix, TransactionType& type) {
  if (pending_suffixes_.empty()) {
    if (error_.has_value()) {
      return kError;
    }
    if (eof_) {
      return kEof;
    }
    return kPending;
  }
  // The front entry is discarded right after this, so move the namespace out
  // of it instead of copying.
  auto& next = pending_suffixes_.front();
  suffix = std::move(next.suffix);
  type = next.type;
  pending_suffixes_.pop_front();
  return kSuccess;
}
// Queues a namespace transaction for the application and notifies it through
// the objects-available callback, if one is set.
//
// If the queue already holds kMaxPendingSuffixes entries, the transaction is
// dropped, the task records kResetCodeTooFarBehind as its error, and the
// stream state (if still attached) is reset with the same code.
void MoqtNamespaceSubscriberStream::NamespaceTask::AddPendingSuffix(
    TrackNamespace suffix, TransactionType type) {
  const bool has_room = pending_suffixes_.size() != kMaxPendingSuffixes;
  if (has_room) {
    pending_suffixes_.push_back(PendingSuffix{std::move(suffix), type});
    if (callback_ != nullptr) {
      callback_();
    }
    return;
  }
  // The consumer is not draining the queue fast enough.
  error_ = kResetCodeTooFarBehind;
  if (state_ != nullptr) {
    state_->Reset(kResetCodeTooFarBehind);
  }
}
// Marks the task as finished: sets the EOF flag, drops the pointer to the
// stream state, and notifies the application through the callback (if set).
// Calls after the first are no-ops.
void MoqtNamespaceSubscriberStream::NamespaceTask::DeclareEof() {
  if (!eof_) {
    eof_ = true;
    state_ = nullptr;
    if (callback_ != nullptr) {
      callback_();
    }
  }
}
// Constructs the publisher side of a namespace stream. Stores references to
// the session namespace tree and the application's subscribe-namespace
// callback, then attaches |stream| via the base class, passing
// MoqtMessageType::kSubscribeNamespace.
MoqtNamespacePublisherStream::MoqtNamespacePublisherStream(
    MoqtFramer* framer, webtransport::Stream* stream,
    SessionErrorCallback session_error_callback, SessionNamespaceTree& tree,
    MoqtIncomingSubscribeNamespaceCallbackNew& application)
    : MoqtBidiStreamBase(
          framer,
          // No stream_deleted_callback because there's no state yet, so a
          // no-op is passed.
          [] {}, std::move(session_error_callback)),
      tree_(tree),
      application_(application) {
  // TODO(martinduke): Set the priority for this stream.
  MoqtBidiStreamBase::set_stream(stream, MoqtMessageType::kSubscribeNamespace);
}
// Removes this stream's prefix from the session namespace tree, provided a
// task was ever created for it.
MoqtNamespacePublisherStream::~MoqtNamespacePublisherStream() {
  if (task_ == nullptr) {
    // The stream died early, before any task existed.
    return;
  }
  tree_.UnsubscribeNamespace(task_->prefix());
}
void MoqtNamespacePublisherStream::OnSubscribeNamespaceMessage(
const MoqtSubscribeNamespace& message) {
request_id_ = message.request_id;
if (!tree_.SubscribeNamespace(message.track_namespace_prefix)) {
SendRequestError(request_id_, RequestErrorCode::kPrefixOverlap,
std::nullopt, "");
return;
}
QUICHE_DCHECK(task_ == nullptr);
task_ = application_(
message.track_namespace_prefix, message.parameters,
// Response callback
[this](std::optional<MoqtRequestErrorInfo> error) {
if (error.has_value()) {
SendRequestError(request_id_, *error, /*fin=*/true);
} else {
SendRequestOk(request_id_, MessageParameters());
}
},
// Objects available callback
[this]() { ProcessNamespaces(); });
}
void MoqtNamespacePublisherStream::ProcessNamespaces() {
if (task_ == nullptr) {
return;
}
TrackNamespace suffix;
TransactionType type;
while (!QueueIsFull()) {
GetNextResult result = task_->GetNextSuffix(suffix, type);
switch (result) {
case kPending:
return;
case kEof:
if (!SendFinOnStream(*stream()).ok()) {
OnParsingError(MoqtError::kInternalError, "Failed to send FIN");
};
return;
case kError:
Reset(kResetCodeCanceled);
return;
case kSuccess: {
switch (type) {
case TransactionType::kAdd: {
auto [it, inserted] = published_suffixes_.insert(suffix);
if (!inserted) {
// This should never happen. Do not send something that would
// cause a protocol violation.
return;
}
SendOrBufferMessage(
framer_->SerializeNamespace(MoqtNamespace{suffix}));
break;
}
case TransactionType::kDelete: {
if (published_suffixes_.erase(suffix) == 0) {
// This should never happen. Do not send something that would
// cause a protocol violation.
return;
}
SendOrBufferMessage(
framer_->SerializeNamespaceDone(MoqtNamespaceDone{suffix}));
break;
}
}
}
}
}
}
} // namespace moqt