blob: d9abd82454e6683ff187be9e24df333be20db5b6 [file] [log] [blame]
// Copyright 2023 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef QUICHE_QUIC_MOQT_MOQT_SUBSCRIPTION_H_
#define QUICHE_QUIC_MOQT_MOQT_SUBSCRIPTION_H_
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/core/quic_alarm.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_callbacks.h"
#include "quiche/common/quiche_weak_ptr.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
namespace test {
class MoqtSessionPeer;
class SubscribeRemoteTrackPeer;
} // namespace test
using MoqtObjectAckFunction =
quiche::MultiUseCallback<void(uint64_t group_id, uint64_t object_id,
quic::QuicTimeDelta delta_from_deadline)>;
// State common to both SUBSCRIBE and FETCH upstream.
class RemoteTrack {
public:
RemoteTrack(const FullTrackName& full_track_name, uint64_t id,
SubscribeWindow window, MoqtPriority priority)
: full_track_name_(full_track_name),
request_id_(id),
subscriber_priority_(priority),
window_(window),
weak_ptr_factory_(this) {}
virtual ~RemoteTrack() = default;
FullTrackName full_track_name() const { return full_track_name_; }
// If FETCH_ERROR or SUBSCRIBE_ERROR arrives after OK or an object, it is a
// protocol violation.
virtual void OnObjectOrOk() { error_is_allowed_ = false; }
bool ErrorIsAllowed() const { return error_is_allowed_; }
// When called while processing the first object in the track, sets the
// data stream type to the value indicated by the incoming encoding.
// Otherwise, returns true if the incoming object does not violate the rule
// that the type is consistent.
bool CheckDataStreamType(MoqtDataStreamType type);
bool is_fetch() const {
return data_stream_type_.has_value() &&
*data_stream_type_ == MoqtDataStreamType::kStreamHeaderFetch;
}
uint64_t request_id() const { return request_id_; }
// Is the object one that was requested?
bool InWindow(Location sequence) const { return window_.InWindow(sequence); }
quiche::QuicheWeakPtr<RemoteTrack> weak_ptr() {
return weak_ptr_factory_.Create();
}
const SubscribeWindow& window() const { return window_; }
MoqtPriority subscriber_priority() const { return subscriber_priority_; }
void set_subscriber_priority(MoqtPriority priority) {
subscriber_priority_ = priority;
}
protected:
SubscribeWindow& window_mutable() { return window_; };
private:
const FullTrackName full_track_name_;
const uint64_t request_id_;
MoqtPriority subscriber_priority_;
SubscribeWindow window_;
std::optional<MoqtDataStreamType> data_stream_type_;
// If false, an object or OK message has been received, so any ERROR message
// is a protocol violation.
bool error_is_allowed_ = true;
// Must be last.
quiche::QuicheWeakPtrFactory<RemoteTrack> weak_ptr_factory_;
};
// A track on the peer to which the session has subscribed.
class SubscribeRemoteTrack : public RemoteTrack {
public:
// TODO: Separate this out (as it's used by the application) and give it a
// name like MoqtTrackSubscriber,
class Visitor {
public:
virtual ~Visitor() = default;
// Called when the session receives a response to the SUBSCRIBE, unless it's
// a SUBSCRIBE_ERROR with a new track_alias. In that case, the session will
// automatically retry.
virtual void OnReply(
const FullTrackName& full_track_name,
std::optional<Location> largest_location,
std::optional<absl::string_view> error_reason_phrase) = 0;
// Called when the subscription process is far enough that it is possible to
// send OBJECT_ACK messages; provides a callback to do so. The callback is
// valid for as long as the session is valid.
virtual void OnCanAckObjects(MoqtObjectAckFunction ack_function) = 0;
// Called when an object fragment (or an entire object) is received.
virtual void OnObjectFragment(const FullTrackName& full_track_name,
Location sequence,
MoqtPriority publisher_priority,
MoqtObjectStatus object_status,
absl::string_view object,
bool end_of_message) = 0;
virtual void OnSubscribeDone(FullTrackName full_track_name) = 0;
};
SubscribeRemoteTrack(const MoqtSubscribe& subscribe, Visitor* visitor)
: RemoteTrack(subscribe.full_track_name, subscribe.request_id,
SubscribeWindow(subscribe.start.value_or(Location()),
subscribe.end_group),
subscribe.subscriber_priority),
track_alias_(subscribe.track_alias),
forward_(subscribe.forward),
visitor_(visitor),
delivery_timeout_(subscribe.parameters.delivery_timeout),
subscribe_(std::make_unique<MoqtSubscribe>(subscribe)) {}
~SubscribeRemoteTrack() override {
if (subscribe_done_alarm_ != nullptr) {
subscribe_done_alarm_->PermanentCancel();
}
}
void OnObjectOrOk() override {
subscribe_.reset(); // No SUBSCRIBE_ERROR, no need to store this anymore.
RemoteTrack::OnObjectOrOk();
}
uint64_t track_alias() const { return track_alias_; }
Visitor* visitor() { return visitor_; }
MoqtSubscribe& GetSubscribe() {
return *subscribe_;
// This class will soon be destroyed, so there's no need to null the
// unique_ptr;
}
// Returns false if the forwarding preference is changing on the track.
bool OnObject(bool is_datagram) {
OnObjectOrOk();
if (!is_datagram_.has_value()) {
is_datagram_ = is_datagram;
return true;
} else {
return (is_datagram_ == is_datagram);
}
}
// Called on SUBSCRIBE_OK or SUBSCRIBE_UPDATE.
bool TruncateStart(Location start) {
return window_mutable().TruncateStart(start);
}
// Called on SUBSCRIBE_UPDATE.
bool TruncateEnd(uint64_t end_group) {
return window_mutable().TruncateEnd(end_group);
}
void OnStreamOpened();
void OnStreamClosed();
void OnSubscribeDone(uint64_t stream_count, const quic::QuicClock* clock,
std::unique_ptr<quic::QuicAlarm> subscribe_done_alarm);
bool all_streams_closed() const {
return total_streams_.has_value() && *total_streams_ == streams_closed_;
}
// The application can request a Joining FETCH but also for FETCH objects to
// be delivered via SubscribeRemoteTrack::Visitor::OnObjectFragment(). When
// this occurs, the session passes the FetchTask here to handle incoming
// FETCH objects to pipe directly into the visitor.
void OnJoiningFetchReady(std::unique_ptr<MoqtFetchTask> fetch_task);
bool forward() const { return forward_; }
void set_forward(bool forward) { forward_ = forward; }
private:
friend class test::MoqtSessionPeer;
friend class test::SubscribeRemoteTrackPeer;
void MaybeSetSubscribeDoneAlarm();
void FetchObjects();
std::unique_ptr<MoqtFetchTask> fetch_task_;
const uint64_t track_alias_;
bool forward_;
Visitor* visitor_;
std::optional<bool> is_datagram_;
int currently_open_streams_ = 0;
// Every stream that has received FIN or RESET_STREAM.
uint64_t streams_closed_ = 0;
// Value assigned on SUBSCRIBE_DONE. Can destroy subscription state if
// streams_closed_ == total_streams_.
std::optional<uint64_t> total_streams_;
// Timer to clean up the track if there are no open streams.
quic::QuicTimeDelta delivery_timeout_ = quic::QuicTimeDelta::Infinite();
std::unique_ptr<quic::QuicAlarm> subscribe_done_alarm_ = nullptr;
const quic::QuicClock* clock_ = nullptr;
// For convenience, store the subscribe message if it has to be re-sent with
// a new track alias.
std::unique_ptr<MoqtSubscribe> subscribe_;
};
// MoqtSession calls this when a FETCH_OK or FETCH_ERROR is received. The
// destination of the callback owns |fetch_task| and MoqtSession will react
// safely if the owner destroys it.
using FetchResponseCallback =
quiche::SingleUseCallback<void(std::unique_ptr<MoqtFetchTask> fetch_task)>;
// This is a callback to MoqtSession::IncomingDataStream. Called when the
// FetchTask has its object cache empty, on creation, and whenever the
// application reads it.
using CanReadCallback = quiche::MultiUseCallback<void()>;
// If the application destroys the FetchTask, this is a signal to MoqtSession to
// cancel the FETCH and STOP_SENDING the stream.
using TaskDestroyedCallback = quiche::SingleUseCallback<void()>;
// Class for upstream FETCH. It will notify the application using |callback|
// when a FETCH_OK or FETCH_ERROR is received.
class UpstreamFetch : public RemoteTrack {
public:
UpstreamFetch(const MoqtFetch& fetch, FetchResponseCallback callback)
: RemoteTrack(fetch.full_track_name, fetch.fetch_id,
fetch.joining_fetch.has_value()
? SubscribeWindow(Location(0, 0))
: SubscribeWindow(fetch.start_object, fetch.end_group,
fetch.end_object),
fetch.subscriber_priority),
ok_callback_(std::move(callback)) {
// Immediately set the data stream type.
CheckDataStreamType(MoqtDataStreamType::kStreamHeaderFetch);
}
UpstreamFetch(const UpstreamFetch&) = delete;
~UpstreamFetch();
class UpstreamFetchTask : public MoqtFetchTask {
public:
// If the UpstreamFetch is destroyed, it will call OnStreamAndFetchClosed
// which sets the TaskDestroyedCallback to nullptr. Thus, |callback| can
// assume that UpstreamFetch is valid.
UpstreamFetchTask(Location largest_location, absl::Status status,
TaskDestroyedCallback callback)
: largest_location_(largest_location),
status_(status),
task_destroyed_callback_(std::move(callback)),
weak_ptr_factory_(this) {}
~UpstreamFetchTask() override;
// Implementation of MoqtFetchTask.
GetNextObjectResult GetNextObject(PublishedObject& output) override;
void SetObjectAvailableCallback(
ObjectsAvailableCallback callback) override {
object_available_callback_ = std::move(callback);
};
// TODO(martinduke): Implement the new API, but for now, only deliver the
// FetchTask on FETCH_OK.
void SetFetchResponseCallback(FetchResponseCallback callback) override {}
absl::Status GetStatus() override { return status_; };
quiche::QuicheWeakPtr<UpstreamFetchTask> weak_ptr() {
return weak_ptr_factory_.Create();
}
// MoqtSession should not use this function; use
// UpstreamFetch::OnStreamOpened() instead, in case the task does not exist
// yet.
void set_can_read_callback(CanReadCallback callback) {
can_read_callback_ = std::move(callback);
can_read_callback_(); // Accept the first object.
}
// Called when the data stream receives a new object.
void NewObject(const MoqtObject& message);
void AppendPayloadToObject(absl::string_view payload);
// MoqtSession calls this for a hint if the object has been read.
bool HasObject() const { return next_object_.has_value(); }
bool NeedsMorePayload() const {
return next_object_.has_value() && next_object_->payload_length > 0;
}
// MoqtSession calls NotifyNewObject() after NewObject() because it has to
// exit the parser loop before the callback possibly causes another read.
// Furthermore, NewObject() may be a partial object, and so
// NotifyNewObject() is called only when the object is complete.
void NotifyNewObject();
// Deletes callbacks to session or stream, updates the status. If |error|
// has no value, will append an EOF to the object stream.
void OnStreamAndFetchClosed(
std::optional<webtransport::StreamErrorCode> error,
absl::string_view reason_phrase);
private:
Location largest_location_;
absl::Status status_;
TaskDestroyedCallback task_destroyed_callback_;
// Object delivery state. The payload_length member is used to track the
// payload bytes not yet received. The application receives a
// PublishedObject that is constructed from next_object_ and payload_.
std::optional<MoqtObject> next_object_;
// Store payload separately. Will be converted into QuicheMemSlice only when
// complete, since QuicheMemSlice is immutable.
quiche::QuicheBuffer payload_;
// The task should only call object_available_callback_ when the last result
// was kPending. Otherwise, there can be recursive loops of
// GetNextObjectResult().
bool need_object_available_callback_ = true;
bool eof_ = false; // The next object is EOF.
// The Fetch task signals the application when it has new objects.
ObjectsAvailableCallback object_available_callback_;
// The Fetch task signals the stream when it has dispensed of an object.
CanReadCallback can_read_callback_;
// Must be last.
quiche::QuicheWeakPtrFactory<UpstreamFetchTask> weak_ptr_factory_;
};
// Arrival of FETCH_OK/FETCH_ERROR.
void OnFetchResult(Location largest_location, absl::Status status,
TaskDestroyedCallback callback);
UpstreamFetchTask* task() { return task_.GetIfAvailable(); }
// Manage the relationship with the data stream.
void OnStreamOpened(CanReadCallback callback);
private:
quiche::QuicheWeakPtr<UpstreamFetchTask> task_;
// Before FetchTask is created, an incoming stream will register the callback
// here instead.
CanReadCallback can_read_callback_;
// Initial values from Fetch() call.
FetchResponseCallback ok_callback_; // Will be destroyed on FETCH_OK.
};
} // namespace moqt
#endif // QUICHE_QUIC_MOQT_MOQT_SUBSCRIPTION_H_