blob: fd0d7d853c803f443c88762bf651fcb796a49503 [file]
// Copyright 2026 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef QUICHE_QUIC_MOQT_MOQT_UNI_STREAM_H_
#define QUICHE_QUIC_MOQT_MOQT_UNI_STREAM_H_
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include "absl/base/nullability.h"
#include "quiche/quic/core/quic_alarm.h"
#include "quiche/quic/core/quic_alarm_factory.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_framer.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_trace_recorder.h"
#include "quiche/quic/moqt/moqt_types.h"
#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/common/quiche_callbacks.h"
#include "quiche/common/quiche_weak_ptr.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
namespace test {
class OutgoingSubgroupStreamPeer;
}
// A base class for locally initiated unidirectional streams, which can serve
// either a Subgroup or a FETCH response. It contains most of the machinery for
// managing the WebTransport stream.
class OutgoingUniStream : public webtransport::StreamVisitor {
public:
OutgoingUniStream(MoqtFramer framer,
webtransport::Stream* absl_nonnull stream,
webtransport::StreamPriority priority,
uint64_t track_identifier)
: stream_(*stream),
priority_(priority),
track_identifier_(track_identifier),
framer_(framer) {
stream_.SetPriority(priority_);
}
virtual ~OutgoingUniStream() = default;
// webtransport::StreamVisitor implementation.
void OnCanRead() override {} // Write-only.
// OnCanWrite() deferred to children.
virtual void OnResetStreamReceived(webtransport::StreamErrorCode) override {}
// OnStopSendingReceived() deferred to children.
void OnWriteSideInDataRecvdState() override {}
// Recomputes the send order and updates it for the associated stream.
void UpdatePriority(MoqtPriority subscriber_priority);
protected:
webtransport::Stream& stream() { return stream_; }
std::optional<PublishedObjectMetadata>& last_object() { return last_object_; }
void set_last_object(PublishedObjectMetadata metadata) {
last_object_ = std::move(metadata);
}
// Writes an object to the stream. Returns false if the write failed. The
// caller should reset the stream if that happens.
bool WriteObjectToStream(PublishedObject& object, MoqtDataStreamType type);
private:
webtransport::Stream& stream_; // Always valid because it owns this object.
webtransport::StreamPriority priority_;
uint64_t track_identifier_; // track alias or fetch request ID.
MoqtFramer framer_;
// Used to compute the object ID diff and pass metadata for partial objects.
// If nullopt, the stream header has not been written yet.
std::optional<PublishedObjectMetadata> last_object_;
};
// This interface provides information about the subscription.
class SubscriptionPublisherInterface {
public:
virtual ~SubscriptionPublisherInterface() = default;
virtual bool InWindow(Location) = 0;
virtual bool alternate_delivery_timeout() = 0;
virtual const quic::QuicClock* clock() = 0;
virtual quic::QuicTimeDelta delivery_timeout() = 0;
virtual quic::QuicAlarmFactory* alarm_factory() = 0;
// Called when the first byte of an object is written to the stream.
virtual void OnObjectSent(Location) = 0;
virtual void OnStreamTimeout(DataStreamIndex) = 0;
virtual void OnSubgroupAbandoned(uint64_t group, uint64_t subgroup,
webtransport::StreamErrorCode) = 0;
virtual void OnDataStreamDestroyed(DataStreamIndex) = 0;
};
// This is for subscriptions only. FETCH uses its own construct.
class QUICHE_EXPORT OutgoingSubgroupStream : public OutgoingUniStream {
public:
// |visitor| is owned by the subscription, so the WeakPtr also serves as a
// liveness token.
OutgoingSubgroupStream(
MoqtFramer framer, webtransport::Stream* absl_nonnull stream,
DataStreamIndex index, uint64_t first_object,
quiche::QuicheWeakPtr<SubscriptionPublisherInterface> visitor,
std::shared_ptr<MoqtTrackPublisher> absl_nonnull track_publisher,
webtransport::StreamPriority priority, uint64_t track_alias,
MoqtTraceRecorder* absl_nonnull trace_recorder);
~OutgoingSubgroupStream();
// webtransport::StreamVisitor overrides.
void OnCanWrite() override;
void OnStopSendingReceived(webtransport::StreamErrorCode error_code) override;
class DeliveryTimeoutDelegate
: public quic::QuicAlarm::DelegateWithoutContext {
public:
explicit DeliveryTimeoutDelegate(OutgoingSubgroupStream* stream)
: stream_(stream) {}
void OnAlarm() override;
private:
OutgoingSubgroupStream* stream_;
};
// Sends a pure FIN on the stream, if the last object sent matches
// |last_object|. Otherwise, does nothing.
void Fin(Location last_object);
// Reset can be called directly on the stream, with no need to involve the
// visitor.
// Creates and sets an alarm for the given deadline. Does nothing if the
// alarm is already created.
void CreateAndSetAlarm(quic::QuicTime deadline);
private:
friend class DeliveryTimeoutDelegate;
friend class test::OutgoingSubgroupStreamPeer;
// Sends objects on the stream, starting with `next_object_`, until the
// stream becomes write-blocked or closed. Can reset the stream, destroying
// the class, on a write error.
void SendObjects();
DataStreamIndex index_;
quiche::QuicheWeakPtr<SubscriptionPublisherInterface> visitor_;
MoqtDataStreamType type_;
uint64_t track_alias_;
std::shared_ptr<MoqtTrackPublisher> publisher_;
// Minimum object ID that should go out next. The session doesn't know the
// exact ID of the next object in the stream because the next object could
// be in a different subgroup or simply be skipped.
uint64_t next_object_;
// Number of payload bytes from next_object_ that has already been written
// to the stream.
uint64_t already_delivered_ = 0;
// If this data stream is for SUBSCRIBE, reset it if an object has been
// excessively delayed per Section 7.1.1.2.
std::unique_ptr<quic::QuicAlarm> delivery_timeout_alarm_;
};
using FetchStreamCloseCallback = quiche::SingleUseCallback<void()>;
class QUICHE_EXPORT OutgoingFetchStream : public OutgoingUniStream {
public:
OutgoingFetchStream(MoqtFramer framer,
webtransport::Stream* absl_nonnull stream,
uint64_t request_id,
webtransport::StreamPriority priority,
std::unique_ptr<MoqtFetchTask> incoming_objects,
FetchStreamCloseCallback close_callback,
MoqtTraceRecorder* absl_nonnull trace_recorder);
~OutgoingFetchStream();
// webtransport::StreamVisitor implementation.
void OnCanWrite() override;
void OnStopSendingReceived(webtransport::StreamErrorCode error_code) override;
private:
std::unique_ptr<MoqtFetchTask> incoming_objects_;
FetchStreamCloseCallback close_callback_;
};
} // namespace moqt
#endif // QUICHE_QUIC_MOQT_MOQT_UNI_STREAM_H_