blob: ce67b697d9a57e2ada236ab466433e0d32726e00 [file] [edit]
// Copyright 2026 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/moqt_uni_stream.h"
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include <vector>
#include "absl/base/casts.h"
#include "absl/base/nullability.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "absl/types/span.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/core/quic_utils.h"
#include "quiche/quic/moqt/moqt_error.h"
#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_framer.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_trace_recorder.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/moqt/moqt_types.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_mem_slice.h"
#include "quiche/common/quiche_weak_ptr.h"
#include "quiche/web_transport/stream_helpers.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
void OutgoingUniStream::UpdatePriority(MoqtPriority subscriber_priority) {
priority_.send_order = UpdateSendOrderForSubscriberPriority(
priority_.send_order, subscriber_priority);
stream_.SetPriority(priority_);
}
bool OutgoingUniStream::WriteObjectToStream(PublishedObject& object,
MoqtDataStreamType type) {
MoqtObject header;
header.track_alias = track_identifier_;
header.group_id = object.metadata.location.group;
header.subgroup_id = object.metadata.subgroup;
header.object_id = object.metadata.location.object;
header.publisher_priority = object.metadata.publisher_priority;
header.extension_headers = object.metadata.extensions;
header.object_status = object.metadata.status;
header.payload_length = object.metadata.payload_length;
quiche::QuicheBuffer serialized_header =
framer_.SerializeObjectHeader(header, type, last_object_);
std::vector<quiche::QuicheMemSlice> write_vector;
write_vector.reserve(object.payload.size() + 1);
write_vector.push_back(quiche::QuicheMemSlice(std::move(serialized_header)));
for (auto& slice : object.payload) {
write_vector.push_back(std::move(slice));
}
webtransport::StreamWriteOptions options;
options.set_send_fin(!type.IsFetch() && object.fin_after_this);
absl::Status write_status =
stream_.Writev(absl::MakeSpan(write_vector), options);
if (!write_status.ok()) {
QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed)
<< "Writing into MoQT stream failed despite CanWrite being true "
"before; status: "
<< write_status;
return false;
}
QUIC_DVLOG(1) << "Stream " << stream_.GetStreamId() << " successfully wrote "
<< object.metadata.location
<< ", fin = " << object.fin_after_this;
return true;
}
OutgoingSubgroupStream::OutgoingSubgroupStream(
MoqtFramer framer, webtransport::Stream* absl_nonnull stream,
DataStreamIndex index, uint64_t first_object,
quiche::QuicheWeakPtr<SubscriptionPublisherInterface> visitor,
std::shared_ptr<MoqtTrackPublisher> absl_nonnull track_publisher,
webtransport::StreamPriority priority, uint64_t track_alias,
MoqtTraceRecorder* absl_nonnull trace_recorder)
: OutgoingUniStream(framer, stream, priority, track_alias),
index_(index),
visitor_(std::move(visitor)),
track_alias_(track_alias),
publisher_(track_publisher),
next_object_(first_object) {
trace_recorder->RecordSubgroupStreamCreated(stream->GetStreamId(),
track_alias_, index);
}
OutgoingSubgroupStream::~OutgoingSubgroupStream() {
// Though it might seem intuitive that the session object has to outlive the
// connection object (and this is indeed how something like QuicSession and
// QuicStream works), this is not the true for WebTransport visitors: the
// session getting destroyed will inevitably lead to all related streams being
// destroyed, but the actual order of destruction is not guaranteed. Thus, we
// need to check if the session still exists while accessing it in a stream
// destructor.
if (delivery_timeout_alarm_ != nullptr) {
delivery_timeout_alarm_->PermanentCancel();
}
SubscriptionPublisherInterface* visitor = visitor_.GetIfAvailable();
if (visitor != nullptr) {
visitor->OnDataStreamDestroyed(index_);
}
}
void OutgoingSubgroupStream::OnCanWrite() { SendObjects(); }
void OutgoingSubgroupStream::OnStopSendingReceived(
webtransport::StreamErrorCode error_code) {
SubscriptionPublisherInterface* visitor = visitor_.GetIfAvailable();
if (visitor != nullptr) {
visitor->OnSubgroupAbandoned(index_.group, index_.subgroup, error_code);
}
}
void OutgoingSubgroupStream::DeliveryTimeoutDelegate::OnAlarm() {
SubscriptionPublisherInterface* visitor = stream_->visitor_.GetIfAvailable();
if (visitor != nullptr) {
visitor->OnStreamTimeout(stream_->index_);
}
stream_->stream().ResetWithUserCode(kResetCodeDeliveryTimeout);
}
void OutgoingSubgroupStream::SendObjects() {
SubscriptionPublisherInterface* visitor = visitor_.GetIfAvailable();
if (visitor == nullptr) {
return;
}
while (stream().CanWrite()) {
std::optional<PublishedObject> object = publisher_->GetCachedObject(
index_.group, index_.subgroup, next_object_, already_delivered_);
if (!object.has_value()) {
break;
}
if (object->metadata.payload_length > 0 && object->payload.empty()) {
QUICHE_BUG(OutgoingSubgroupStream_empty_payload)
<< "Received non-empty object with no payload";
return;
}
QUICHE_DCHECK_EQ(object->metadata.location.group, index_.group);
QUICHE_DCHECK(object->metadata.subgroup == index_.subgroup);
if (!visitor->InWindow(object->metadata.location)) {
// It is possible that the next object became irrelevant due to a
// REQUEST_UPDATE. Close the stream if so.
absl::Status status = webtransport::SendFinOnStream(stream());
QUICHE_BUG_IF(OutgoingSubgroupStream_fin_due_to_update, !status.ok())
<< "Writing FIN failed despite CanWrite() being true.";
return;
}
quic::QuicTimeDelta delivery_timeout = visitor->delivery_timeout();
if (!visitor->alternate_delivery_timeout() &&
visitor->clock()->ApproximateNow() - object->metadata.arrival_time >
delivery_timeout) {
visitor->OnStreamTimeout(index_);
stream().ResetWithUserCode(kResetCodeDeliveryTimeout);
// No class access below this line.
return;
}
// Always include extension header length, because it's difficult to know
// a priori if they're going to appear on a stream.
if (!last_object().has_value()) {
type_ = MoqtDataStreamType::Subgroup(
index_.subgroup, next_object_, false,
object->metadata.publisher_priority ==
publisher_->extensions().default_publisher_priority());
}
uint64_t start_offset = already_delivered_;
already_delivered_ +=
quic::MemSliceSpanTotalSize(absl::MakeSpan(object->payload));
object->fin_after_this &=
already_delivered_ == object->metadata.payload_length;
if (start_offset > 0) { // Just send payload.
if (already_delivered_ == start_offset) {
// Partial delivery of an object but the payload is empty. This would
// result in an infinite loop.
QUICHE_BUG(OutgoingDataStream_empty_payload)
<< "Empty payload for partial object " << object->metadata.location;
return;
}
webtransport::StreamWriteOptions options;
options.set_send_fin(object->fin_after_this);
absl::Status write_status =
stream().Writev(absl::MakeSpan(object->payload), options);
if (!write_status.ok()) {
QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed)
<< "Writing into MoQT stream failed despite CanWrite() being true "
"before; status: "
<< write_status;
stream().ResetWithUserCode(kResetCodeInternalError);
return;
}
} else {
if (!WriteObjectToStream(*object, type_)) {
stream().ResetWithUserCode(kResetCodeInternalError);
// No class access below this line.
return;
}
set_last_object(object->metadata);
next_object_ = object->metadata.location.object;
visitor->OnObjectSent(object->metadata.location);
}
QUICHE_DCHECK(last_object().has_value());
if (already_delivered_ != last_object()->payload_length) {
return;
}
++next_object_;
already_delivered_ = 0;
if (object->fin_after_this && !delivery_timeout.IsInfinite() &&
!visitor->alternate_delivery_timeout()) {
CreateAndSetAlarm(object->metadata.arrival_time + delivery_timeout);
}
}
}
void OutgoingSubgroupStream::Fin(Location last_object) {
QUICHE_DCHECK_EQ(last_object.group, index_.group);
if (next_object_ <= last_object.object) {
// There is still data to send, do nothing.
return;
}
// All data has already been sent; send a pure FIN.
absl::Status status = webtransport::SendFinOnStream(stream());
QUICHE_BUG_IF(OutgoingSubgroupStream_fin_failed, !status.ok())
<< "Writing pure FIN failed.";
SubscriptionPublisherInterface* visitor = visitor_.GetIfAvailable();
if (visitor == nullptr) {
return;
}
quic::QuicTimeDelta delivery_timeout = visitor->delivery_timeout();
if (!delivery_timeout.IsInfinite()) {
CreateAndSetAlarm(visitor->clock()->ApproximateNow() + delivery_timeout);
}
}
void OutgoingSubgroupStream::CreateAndSetAlarm(quic::QuicTime deadline) {
if (delivery_timeout_alarm_ != nullptr) {
return;
}
SubscriptionPublisherInterface* visitor = visitor_.GetIfAvailable();
if (visitor == nullptr) {
return;
}
delivery_timeout_alarm_ = absl::WrapUnique(
visitor->alarm_factory()->CreateAlarm(new DeliveryTimeoutDelegate(this)));
delivery_timeout_alarm_->Set(deadline);
}
OutgoingFetchStream::OutgoingFetchStream(
MoqtFramer framer, webtransport::Stream* absl_nonnull stream,
uint64_t request_id, webtransport::StreamPriority priority,
std::unique_ptr<MoqtFetchTask> incoming_objects,
FetchStreamCloseCallback close_callback,
MoqtTraceRecorder* absl_nonnull trace_recorder)
: OutgoingUniStream(framer, stream, priority, request_id),
incoming_objects_(std::move(incoming_objects)),
close_callback_(std::move(close_callback)) {
incoming_objects_->SetObjectAvailableCallback(
[this]() { this->OnCanWrite(); });
trace_recorder->RecordFetchStreamCreated(stream->GetStreamId());
}
OutgoingFetchStream::~OutgoingFetchStream() {
if (close_callback_ != nullptr) {
std::move(close_callback_)();
}
close_callback_ = nullptr;
}
void OutgoingFetchStream::OnCanWrite() {
PublishedObject object;
while (stream().CanWrite()) {
MoqtFetchTask::GetNextObjectResult result =
incoming_objects_->GetNextObject(object);
switch (result) {
case MoqtFetchTask::GetNextObjectResult::kSuccess:
// Skip ObjectDoesNotExist in FETCH.
if (object.metadata.status != MoqtObjectStatus::kNormal) {
QUICHE_BUG(quiche_bug_got_doesnotexist_in_fetch)
<< "Got Non-normal object in FETCH";
continue;
}
if (last_object().has_value() &&
object.metadata.location == last_object()->location) {
// This is the continuation of the previous object.
webtransport::StreamWriteOptions options;
absl::Status write_status =
stream().Writev(absl::MakeSpan(object.payload), options);
if (!write_status.ok()) {
QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed)
<< "Writing into MoQT stream failed despite CanWrite() being "
"true before; status: "
<< write_status;
stream().ResetWithUserCode(kResetCodeInternalError);
return;
}
break;
}
if (WriteObjectToStream(object, MoqtDataStreamType::Fetch())) {
set_last_object(object.metadata);
}
break;
case MoqtFetchTask::GetNextObjectResult::kPending:
return;
case MoqtFetchTask::GetNextObjectResult::kEof:
// TODO(martinduke): Either prefetch the next object, or alter the API
// so that we're not sending FIN in a separate frame.
if (!webtransport::SendFinOnStream(stream()).ok()) {
QUICHE_DVLOG(1) << "Sending FIN onStream " << stream().GetStreamId()
<< " failed";
}
return;
case MoqtFetchTask::GetNextObjectResult::kError:
stream().ResetWithUserCode(static_cast<webtransport::StreamErrorCode>(
incoming_objects_->GetStatus().code()));
return;
}
}
}
void OutgoingFetchStream::OnStopSendingReceived(
webtransport::StreamErrorCode error_code) {
stream().ResetWithUserCode(error_code);
}
IncomingDataStream::~IncomingDataStream() {
QUICHE_DVLOG(1) << "Destroying incoming data stream "
<< stream_->GetStreamId();
if (!parser_.track_alias().has_value()) {
QUIC_DVLOG(1) << "Destroying incoming data stream before "
"learning track alias";
return;
}
if (!track_.IsValid()) {
return;
}
if (IsFetch()) {
auto fetch = absl::down_cast<UpstreamFetch*>(track_.GetIfAvailable());
if (fetch != nullptr) {
fetch->OnStreamClosed();
}
return;
}
// It's a subscribe.
auto subscribe =
absl::down_cast<SubscribeRemoteTrack*>(track_.GetIfAvailable());
if (subscribe == nullptr) {
return;
}
subscribe->OnStreamClosed(fin_received_, index_);
}
void IncomingDataStream::OnObjectMessage(const MoqtObject& message,
absl::string_view payload,
bool end_of_message) {
QUICHE_DVLOG(1) << "Received OBJECT message on stream "
<< stream_->GetStreamId() << " for track alias "
<< message.track_alias << " with sequence "
<< message.group_id << ":" << message.object_id
<< " priority " << message.publisher_priority << " length "
<< payload.size() << " length " << message.payload_length
<< (end_of_message ? "F" : "");
if (!session_->deliver_partial_objects()) {
if (!end_of_message) { // Buffer partial object.
if (partial_object_.empty()) {
// Avoid redundant allocations by reserving the appropriate amount of
// memory if known.
partial_object_.reserve(message.payload_length);
}
absl::StrAppend(&partial_object_, payload);
return;
}
if (!partial_object_.empty()) { // Completes the object
absl::StrAppend(&partial_object_, payload);
payload = absl::string_view(partial_object_);
}
}
if (payload.empty() && bytes_received_this_object_ > 0 && !end_of_message) {
return; // Nothing arrived.
}
if (!parser_.track_alias().has_value()) {
QUICHE_BUG(quic_bug_object_with_no_stream_type)
<< "Object delivered without preliminaries";
return;
}
// Get a pointer to the upstream state.
if (!track_.IsValid()) {
track_ = IsFetch() ? session_->GetFetch(message.track_alias)
: session_->GetSubscribe(message.track_alias);
}
if (!track_.IsValid()) {
// The request has gone away.
stream_->SendStopSending(kResetCodeCancelled);
return;
}
Location location(message.group_id, message.object_id);
RemoteTrack* track = track_.GetIfAvailable();
if (track == nullptr ||
!track->InWindow(Location(message.group_id, message.object_id))) {
// This is not an error. It can be the result of a recent REQUEST_UPDATE or
// UNSUBSCRIBE.
return;
}
if (!IsFetch()) {
if (!index_.has_value()) {
if (!message.subgroup_id.has_value()) {
QUICHE_BUG(quiche_bug_moqt_subgroup_id_missing)
<< "Missing subgroup ID on SUBSCRIBE stream";
return;
}
index_ = DataStreamIndex(message.group_id, *message.subgroup_id);
}
if (no_more_objects_) {
// Already got a stream-ending object. While the lower layer won't
// deliver data after the FIN, there could have been an EndOfGroup or
// EndOfTrack signal.
session_->OnMalformedTrack(track);
return;
}
if (end_of_message) {
next_object_id_ = message.object_id + 1;
if (message.object_status == MoqtObjectStatus::kEndOfTrack ||
message.object_status == MoqtObjectStatus::kEndOfGroup) {
no_more_objects_ = true;
}
}
SubscribeRemoteTrack* subscribe =
absl::down_cast<SubscribeRemoteTrack*>(track);
subscribe->OnObjectOrOk();
if (visitor_ != nullptr) {
PublishedObjectMetadata metadata;
metadata.location = Location(message.group_id, message.object_id);
metadata.subgroup = message.subgroup_id;
metadata.extensions = message.extension_headers;
metadata.status = message.object_status;
metadata.publisher_priority = message.publisher_priority;
metadata.payload_length = message.payload_length;
metadata.arrival_time = clock_->Now();
visitor_->OnObjectFragment(track->full_track_name(), metadata, payload,
bytes_received_this_object_);
}
} else { // FETCH
track->OnObjectOrOk();
UpstreamFetch* fetch = absl::down_cast<UpstreamFetch*>(track);
if (!fetch->LocationIsValid(Location(message.group_id, message.object_id),
message.object_status, end_of_message)) {
// TODO(martinduke): in https://github.com/moq-wg/moq-transport/pull/1409
// I make the case that this should be a protocol violation. Update if
// that proposal is accepted (at which point
// QuicSession::OnMalformedTrack can be removed, since all the
// remaining conditions are at the application layer).
session_->OnMalformedTrack(track);
return;
}
UpstreamFetch::UpstreamFetchTask* task = fetch->task();
if (task == nullptr) {
// The application killed the FETCH.
stream_->SendStopSending(kResetCodeCancelled);
return;
}
if (!task->HasObject()) {
task->NewObject(message);
}
if (task->NeedsMorePayload() && !payload.empty()) {
task->AppendPayloadToObject(payload);
}
}
if (end_of_message) {
bytes_received_this_object_ = 0;
} else {
bytes_received_this_object_ += payload.size();
}
partial_object_.clear();
}
void IncomingDataStream::MaybeReadOneObject() {
if (!parser_.track_alias().has_value() ||
!parser_.stream_type().has_value() || !parser_.stream_type()->IsFetch()) {
QUICHE_BUG(quic_bug_read_one_object_parser_unexpected_state)
<< "Requesting object, parser in unexpected state";
}
if (!track_.IsValid()) {
return;
}
UpstreamFetch* fetch =
absl::down_cast<UpstreamFetch*>(track_.GetIfAvailable());
UpstreamFetch::UpstreamFetchTask* task = fetch->task();
if (task == nullptr) {
return;
}
if (task->HasObject() && !task->NeedsMorePayload()) {
return; // The message is complete. Do not read more.
}
uint64_t start_length = task->payload_length();
parser_.ReadAtMostOneObject();
// If it read an object, it called OnObjectMessage and may have altered the
// task's object state.
if (task->payload_length() > start_length) {
task->NotifyNewObject();
}
}
void IncomingDataStream::OnCanRead() {
if (!parser_.stream_type().has_value()) {
parser_.ReadStreamType();
if (!parser_.stream_type().has_value()) {
return;
}
}
if (parser_.stream_type()->IsPadding()) {
(void)stream_->SkipBytes(stream_->ReadableBytes());
return;
}
bool knew_track_alias = parser_.track_alias().has_value();
if (!knew_track_alias) {
parser_.ReadTrackAlias();
if (!parser_.track_alias().has_value()) {
return;
}
}
QUICHE_CHECK(parser_.stream_type().has_value());
QUICHE_CHECK(parser_.track_alias().has_value());
if (parser_.stream_type()->IsSubgroup()) {
if (!knew_track_alias) {
track_ = session_->GetSubscribe(*parser_.track_alias());
// This is a new stream for a subscribe. Notify the subscription.
SubscribeRemoteTrack* subscribe =
absl::down_cast<SubscribeRemoteTrack*>(track_.GetIfAvailable());
if (subscribe == nullptr) {
stream_->SendStopSending(kResetCodeCancelled);
return;
}
subscribe->OnStreamOpened();
parser_.set_default_publisher_priority(
subscribe->default_publisher_priority());
visitor_ = subscribe->visitor();
}
parser_.ReadAllData();
return;
}
// FETCH
if (!knew_track_alias) {
track_ = session_->GetFetch(*parser_.track_alias());
}
if (!track_.IsValid()) {
stream_->SendStopSending(kResetCodeCancelled);
return;
}
UpstreamFetch* fetch =
absl::down_cast<UpstreamFetch*>(track_.GetIfAvailable());
if (!knew_track_alias) {
// If the task already exists (FETCH_OK has arrived), the callback will
// immediately execute to read the first object. Otherwise, it will only
// execute when the task is created or a cached object is read.
fetch->OnStreamOpened([this]() { MaybeReadOneObject(); });
return;
}
MaybeReadOneObject();
}
void IncomingDataStream::OnParsingError(MoqtError error_code,
absl::string_view reason) {
session_->Error(error_code, absl::StrCat("Parse error: ", reason));
}
} // namespace moqt