blob: f07464339de5dc37856881141515e45e978d7513 [file]
// Copyright 2026 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/moqt_uni_stream.h"
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include <vector>
#include "absl/base/nullability.h"
#include "absl/memory/memory.h"
#include "absl/status/status.h"
#include "absl/types/span.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/core/quic_utils.h"
#include "quiche/quic/moqt/moqt_error.h"
#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_framer.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_trace_recorder.h"
#include "quiche/quic/moqt/moqt_types.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_mem_slice.h"
#include "quiche/common/quiche_weak_ptr.h"
#include "quiche/web_transport/stream_helpers.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
// Recomputes the stream's send order for a new subscriber priority and pushes
// the resulting priority down to the underlying WebTransport stream.
void OutgoingUniStream::UpdatePriority(MoqtPriority subscriber_priority) {
  const auto updated_send_order = UpdateSendOrderForSubscriberPriority(
      priority_.send_order, subscriber_priority);
  priority_.send_order = updated_send_order;
  stream_.SetPriority(priority_);
}
// Serializes an object header for `object` and writes it, followed by the
// object's payload slices, to the stream in a single Writev() call.
//
// The payload slices are moved out of `object.payload`. A FIN is attached only
// when `type` is not a fetch stream type and `object.fin_after_this` is set.
//
// Returns true if the write succeeded; on failure reports a QUICHE_BUG and
// returns false (the caller decides what to do with the stream).
bool OutgoingUniStream::WriteObjectToStream(PublishedObject& object,
                                            MoqtDataStreamType type) {
  const auto& metadata = object.metadata;

  // Build the wire header from the object's metadata.
  MoqtObject header;
  header.track_alias = track_identifier_;
  header.group_id = metadata.location.group;
  header.subgroup_id = metadata.subgroup;
  header.object_id = metadata.location.object;
  header.publisher_priority = metadata.publisher_priority;
  header.extension_headers = metadata.extensions;
  header.object_status = metadata.status;
  header.payload_length = metadata.payload_length;
  quiche::QuicheBuffer header_buffer =
      framer_.SerializeObjectHeader(header, type, last_object_);

  // One slice for the header plus every payload slice, in order.
  std::vector<quiche::QuicheMemSlice> slices;
  slices.reserve(object.payload.size() + 1);
  slices.emplace_back(std::move(header_buffer));
  for (auto& payload_slice : object.payload) {
    slices.emplace_back(std::move(payload_slice));
  }

  const bool send_fin = !type.IsFetch() && object.fin_after_this;
  webtransport::StreamWriteOptions write_options;
  write_options.set_send_fin(send_fin);

  absl::Status write_status =
      stream_.Writev(absl::MakeSpan(slices), write_options);
  if (write_status.ok()) {
    QUIC_DVLOG(1) << "Stream " << stream_.GetStreamId()
                  << " successfully wrote " << object.metadata.location
                  << ", fin = " << object.fin_after_this;
    return true;
  }
  QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed)
      << "Writing into MoQT stream failed despite CanWrite being true "
         "before; status: "
      << write_status;
  return false;
}
// Constructs a stream that delivers the objects of one subgroup.
//
// `index` identifies the group/subgroup served by this stream and
// `first_object` is the first object ID that will be requested from
// `track_publisher`. `visitor` is held as a weak pointer and re-checked before
// every use. Creation of the stream is reported to `trace_recorder`.
OutgoingSubgroupStream::OutgoingSubgroupStream(
    MoqtFramer framer, webtransport::Stream* absl_nonnull stream,
    DataStreamIndex index, uint64_t first_object,
    quiche::QuicheWeakPtr<SubscriptionPublisherInterface> visitor,
    std::shared_ptr<MoqtTrackPublisher> absl_nonnull track_publisher,
    webtransport::StreamPriority priority, uint64_t track_alias,
    MoqtTraceRecorder* absl_nonnull trace_recorder)
    : OutgoingUniStream(framer, stream, priority, track_alias),
      index_(index),
      visitor_(std::move(visitor)),
      track_alias_(track_alias),
      // `track_publisher` is a by-value sink parameter that is not used again
      // below; moving it avoids an extra atomic refcount increment/decrement.
      publisher_(std::move(track_publisher)),
      next_object_(first_object) {
  trace_recorder->RecordSubgroupStreamCreated(stream->GetStreamId(),
                                              track_alias_, index);
}
OutgoingSubgroupStream::~OutgoingSubgroupStream() {
  // Make sure a pending delivery-timeout alarm can never fire again.
  if (delivery_timeout_alarm_ != nullptr) {
    delivery_timeout_alarm_->PermanentCancel();
  }
  // It may seem natural that the session object outlives its streams (that is
  // how QuicSession and QuicStream behave), but this is not true for
  // WebTransport visitors: destroying the session inevitably destroys all of
  // its streams, yet the relative order of destruction is not guaranteed.
  // Hence the visitor is only notified if it is still reachable.
  if (SubscriptionPublisherInterface* live_visitor = visitor_.GetIfAvailable();
      live_visitor != nullptr) {
    live_visitor->OnDataStreamDestroyed(index_);
  }
}
// Stream-writable notification: simply tries to send more objects.
void OutgoingSubgroupStream::OnCanWrite() {
  SendObjects();
}
// Forwards a STOP_SENDING from the peer to the visitor, if the visitor still
// exists, identifying the abandoned subgroup and the peer's error code.
void OutgoingSubgroupStream::OnStopSendingReceived(
    webtransport::StreamErrorCode error_code) {
  if (SubscriptionPublisherInterface* live_visitor = visitor_.GetIfAvailable();
      live_visitor != nullptr) {
    live_visitor->OnSubgroupAbandoned(index_.group, index_.subgroup,
                                      error_code);
  }
}
void OutgoingSubgroupStream::DeliveryTimeoutDelegate::OnAlarm() {
SubscriptionPublisherInterface* visitor = stream_->visitor_.GetIfAvailable();
if (visitor != nullptr) {
visitor->OnStreamTimeout(stream_->index_);
}
stream_->stream().ResetWithUserCode(kResetCodeDeliveryTimeout);
}
// Writes cached objects of this subgroup to the stream for as long as the
// stream reports that it can be written.
//
// The loop ends when any of the following happens:
//  - the visitor is no longer available, or the publisher has no cached object
//    for next_object_;
//  - the next object is outside the visitor's window (the stream is FINed);
//  - the next object is older than the delivery timeout (the stream is reset);
//  - a write fails (the stream is reset with kResetCodeInternalError);
//  - only part of an object's payload could be written in this pass.
//
// Statement order matters here: several branches reset the stream and then
// return immediately without touching members again.
void OutgoingSubgroupStream::SendObjects() {
SubscriptionPublisherInterface* visitor = visitor_.GetIfAvailable();
if (visitor == nullptr) {
// Nothing is sent once the visitor can no longer be resolved.
return;
}
while (stream().CanWrite()) {
// already_delivered_ is the number of payload bytes of the current object
// that previous passes have written (see the accounting below).
// NOTE(review): presumably GetCachedObject() uses it as the payload offset to
// resume from — confirm against MoqtTrackPublisher.
std::optional<PublishedObject> object = publisher_->GetCachedObject(
index_.group, index_.subgroup, next_object_, already_delivered_);
if (!object.has_value()) {
break;
}
// An object that declares a non-zero payload length must carry payload.
if (object->metadata.payload_length > 0 && object->payload.empty()) {
QUICHE_BUG(OutgoingSubgroupStream_empty_payload)
<< "Received non-empty object with no payload";
return;
}
QUICHE_DCHECK_EQ(object->metadata.location.group, index_.group);
QUICHE_DCHECK(object->metadata.subgroup == index_.subgroup);
if (!visitor->InWindow(object->metadata.location)) {
// It is possible that the next object became irrelevant due to a
// REQUEST_UPDATE. Close the stream if so.
absl::Status status = webtransport::SendFinOnStream(stream());
QUICHE_BUG_IF(OutgoingSubgroupStream_fin_due_to_update, !status.ok())
<< "Writing FIN failed despite CanWrite() being true.";
return;
}
// Objects whose age (now minus arrival time) already exceeds the delivery
// timeout are not sent: the visitor is told about the timeout and the stream
// is reset. This check is skipped when alternate_delivery_timeout() is true.
quic::QuicTimeDelta delivery_timeout = visitor->delivery_timeout();
if (!visitor->alternate_delivery_timeout() &&
visitor->clock()->ApproximateNow() - object->metadata.arrival_time >
delivery_timeout) {
visitor->OnStreamTimeout(index_);
stream().ResetWithUserCode(kResetCodeDeliveryTimeout);
// No class access below this line.
return;
}
// Always include extension header length, because it's difficult to know
// a priori if they're going to appear on a stream.
// last_object() is only set after the first object header has been written,
// so this selects the stream type exactly once, before the first write.
if (!last_object().has_value()) {
type_ = MoqtDataStreamType::Subgroup(
index_.subgroup, next_object_, false,
object->metadata.publisher_priority ==
publisher_->extensions().default_publisher_priority());
}
// Account for the payload bytes about to be written. The FIN flag is kept
// only if this write completes the object's declared payload length.
uint64_t start_offset = already_delivered_;
already_delivered_ +=
quic::MemSliceSpanTotalSize(absl::MakeSpan(object->payload));
object->fin_after_this &=
already_delivered_ == object->metadata.payload_length;
// A non-zero start offset means part of this object was written in an
// earlier pass, so no header is serialized again.
if (start_offset > 0) { // Just send payload.
if (already_delivered_ == start_offset) {
// Partial delivery of an object but the payload is empty. This would
// result in an infinite loop.
QUICHE_BUG(OutgoingDataStream_empty_payload)
<< "Empty payload for partial object " << object->metadata.location;
return;
}
webtransport::StreamWriteOptions options;
options.set_send_fin(object->fin_after_this);
absl::Status write_status =
stream().Writev(absl::MakeSpan(object->payload), options);
if (!write_status.ok()) {
QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed)
<< "Writing into MoQT stream failed despite CanWrite() being true "
"before; status: "
<< write_status;
stream().ResetWithUserCode(kResetCodeInternalError);
return;
}
} else {
// Start of a new object: write header plus payload, then record it as the
// last object and notify the visitor.
if (!WriteObjectToStream(*object, type_)) {
stream().ResetWithUserCode(kResetCodeInternalError);
// No class access below this line.
return;
}
set_last_object(object->metadata);
next_object_ = object->metadata.location.object;
visitor->OnObjectSent(object->metadata.location);
}
QUICHE_DCHECK(last_object().has_value());
// The object is not complete yet; stop here and keep next_object_ and
// already_delivered_ so a later call can continue with the same object.
if (already_delivered_ != last_object()->payload_length) {
return;
}
// Object fully written: advance to the next object ID.
++next_object_;
already_delivered_ = 0;
// When the FIN went out with this object and a finite delivery timeout
// applies, arm the alarm for arrival time plus the timeout.
if (object->fin_after_this && !delivery_timeout.IsInfinite() &&
!visitor->alternate_delivery_timeout()) {
CreateAndSetAlarm(object->metadata.arrival_time + delivery_timeout);
}
}
}
// Called with the location of the final object of the subgroup. If every
// object up to and including `last_object` has already been written, a bare
// FIN is sent and, when the visitor has a finite delivery timeout, the
// delivery-timeout alarm is armed relative to the current time. If objects are
// still outstanding this is a no-op.
void OutgoingSubgroupStream::Fin(Location last_object) {
  QUICHE_DCHECK_EQ(last_object.group, index_.group);
  const bool data_outstanding = next_object_ <= last_object.object;
  if (data_outstanding) {
    // The remaining objects still have to be written; nothing to do yet.
    return;
  }
  // Everything has been sent, so the only thing left is a pure FIN.
  absl::Status status = webtransport::SendFinOnStream(stream());
  QUICHE_BUG_IF(OutgoingSubgroupStream_fin_failed, !status.ok())
      << "Writing pure FIN failed.";
  if (SubscriptionPublisherInterface* live_visitor = visitor_.GetIfAvailable();
      live_visitor != nullptr) {
    quic::QuicTimeDelta delivery_timeout = live_visitor->delivery_timeout();
    if (!delivery_timeout.IsInfinite()) {
      CreateAndSetAlarm(live_visitor->clock()->ApproximateNow() +
                        delivery_timeout);
    }
  }
}
// Creates the delivery-timeout alarm through the visitor's alarm factory and
// sets it to fire at `deadline`. Does nothing if an alarm already exists or if
// the visitor is no longer available.
void OutgoingSubgroupStream::CreateAndSetAlarm(quic::QuicTime deadline) {
  if (delivery_timeout_alarm_ == nullptr) {
    if (SubscriptionPublisherInterface* live_visitor =
            visitor_.GetIfAvailable();
        live_visitor != nullptr) {
      // The delegate is handed to CreateAlarm() as a raw pointer; the returned
      // alarm is wrapped in a unique_ptr owned by this stream.
      delivery_timeout_alarm_ =
          absl::WrapUnique(live_visitor->alarm_factory()->CreateAlarm(
              new DeliveryTimeoutDelegate(this)));
      delivery_timeout_alarm_->Set(deadline);
    }
  }
}
// Constructs a stream that delivers the objects produced by a fetch task.
//
// Takes ownership of `incoming_objects` and `close_callback`. Registers this
// stream's OnCanWrite() as the task's object-available callback and reports
// the new stream to `trace_recorder`.
OutgoingFetchStream::OutgoingFetchStream(
    MoqtFramer framer, webtransport::Stream* absl_nonnull stream,
    uint64_t request_id, webtransport::StreamPriority priority,
    std::unique_ptr<MoqtFetchTask> incoming_objects,
    FetchStreamCloseCallback close_callback,
    MoqtTraceRecorder* absl_nonnull trace_recorder)
    : OutgoingUniStream(framer, stream, priority, request_id),
      incoming_objects_(std::move(incoming_objects)),
      close_callback_(std::move(close_callback)) {
  // Whenever the task has a new object, try to write it out.
  auto on_object_available = [this] { OnCanWrite(); };
  incoming_objects_->SetObjectAvailableCallback(
      std::move(on_object_available));
  const auto created_stream_id = stream->GetStreamId();
  trace_recorder->RecordFetchStreamCreated(created_stream_id);
}
// Runs the close callback, if one was supplied, exactly once and then clears
// it.
OutgoingFetchStream::~OutgoingFetchStream() {
  if (close_callback_ == nullptr) {
    return;
  }
  std::move(close_callback_)();
  close_callback_ = nullptr;
}
// Pulls objects from the fetch task and writes them to the stream for as long
// as the stream is writable.
//
// Per result of GetNextObject():
//  - kSuccess: the object is written. If its location equals that of the last
//    object written, only the payload is sent (continuation of the same
//    object); otherwise a full header plus payload is sent. Any write failure
//    resets the stream with kResetCodeInternalError and stops.
//  - kPending: nothing available yet; return and wait for the next callback.
//  - kEof: send a FIN and return.
//  - kError: reset the stream with the task's status code and return.
void OutgoingFetchStream::OnCanWrite() {
  PublishedObject object;
  while (stream().CanWrite()) {
    MoqtFetchTask::GetNextObjectResult result =
        incoming_objects_->GetNextObject(object);
    switch (result) {
      case MoqtFetchTask::GetNextObjectResult::kSuccess:
        // Only objects with normal status are sent in FETCH; anything else is
        // reported as a bug and skipped.
        if (object.metadata.status != MoqtObjectStatus::kNormal) {
          QUICHE_BUG(quiche_bug_got_doesnotexist_in_fetch)
              << "Got Non-normal object in FETCH";
          continue;
        }
        if (last_object().has_value() &&
            object.metadata.location == last_object()->location) {
          // This is the continuation of the previous object.
          webtransport::StreamWriteOptions options;
          absl::Status write_status =
              stream().Writev(absl::MakeSpan(object.payload), options);
          if (!write_status.ok()) {
            QUICHE_BUG(MoqtSession_WriteObjectToStream_write_failed)
                << "Writing into MoQT stream failed despite CanWrite() being "
                   "true before; status: "
                << write_status;
            stream().ResetWithUserCode(kResetCodeInternalError);
            return;
          }
          break;
        }
        if (!WriteObjectToStream(object, MoqtDataStreamType::Fetch())) {
          // A failed write must not be followed by further objects on the
          // same stream; handle it like the continuation branch above does.
          stream().ResetWithUserCode(kResetCodeInternalError);
          // No class access below this line.
          return;
        }
        set_last_object(object.metadata);
        break;
      case MoqtFetchTask::GetNextObjectResult::kPending:
        return;
      case MoqtFetchTask::GetNextObjectResult::kEof:
        // TODO(martinduke): Either prefetch the next object, or alter the API
        // so that we're not sending FIN in a separate frame.
        if (!webtransport::SendFinOnStream(stream()).ok()) {
          QUICHE_DVLOG(1) << "Sending FIN onStream " << stream().GetStreamId()
                          << " failed";
        }
        return;
      case MoqtFetchTask::GetNextObjectResult::kError:
        stream().ResetWithUserCode(static_cast<webtransport::StreamErrorCode>(
            incoming_objects_->GetStatus().code()));
        return;
    }
  }
}
// Answers a STOP_SENDING from the peer by resetting the stream with the same
// error code the peer supplied.
void OutgoingFetchStream::OnStopSendingReceived(
    webtransport::StreamErrorCode error_code) {
  auto& outgoing = stream();
  outgoing.ResetWithUserCode(error_code);
}
} // namespace moqt