blob: 0aa2c0cf129198a29e04f7916f1a11f3f65323ed [file] [log] [blame] [edit]
// Copyright 2024 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef QUICHE_QUIC_MOQT_MOQT_RELAY_TRACK_PUBLISHER_H_
#define QUICHE_QUIC_MOQT_MOQT_RELAY_TRACK_PUBLISHER_H_
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include <variant>
#include "absl/container/btree_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/status/status.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/core/quic_clock.h"
#include "quiche/quic/core/quic_default_clock.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_fetch_task.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_object.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_session_interface.h"
#include "quiche/common/quiche_callbacks.h"
#include "quiche/common/quiche_weak_ptr.h"
namespace moqt {
using DeleteTrackCallback = quiche::SingleUseCallback<void()>;
// MoqtRelayTrackPublisher lets the user send objects by providing the contents
// of the object and the object metadata. It will store these by location
// number. When called on to provide a range of objects, it will fill in any
// missing objects and groups.
//
// The queue will maintain a buffer of three most recent groups that will be
// provided to subscribers automatically.
//
// This class is primarily meant to be used by live relays to buffer the
// frames that arrive for a short time.
class MoqtRelayTrackPublisher : public MoqtTrackPublisher,
public SubscribeVisitor {
public:
MoqtRelayTrackPublisher(
FullTrackName track, quiche::QuicheWeakPtr<MoqtSessionInterface> upstream,
DeleteTrackCallback delete_track_callback,
std::optional<MoqtForwardingPreference> forwarding_preference,
std::optional<MoqtDeliveryOrder> delivery_order,
std::optional<quic::QuicTime> expiration = quic::QuicTime::Infinite(),
const quic::QuicClock* clock = quic::QuicDefaultClock::Get())
: clock_(clock),
track_(std::move(track)),
upstream_(std::move(upstream)),
delete_track_callback_(std::move(delete_track_callback)),
forwarding_preference_(forwarding_preference),
delivery_order_(delivery_order),
expiration_(expiration),
next_location_(0, 0) {}
MoqtRelayTrackPublisher(const MoqtRelayTrackPublisher&) = delete;
MoqtRelayTrackPublisher(MoqtRelayTrackPublisher&&) = default;
MoqtRelayTrackPublisher& operator=(const MoqtRelayTrackPublisher&) = delete;
MoqtRelayTrackPublisher& operator=(MoqtRelayTrackPublisher&&) = default;
// SubscribeVisitor implementation.
void OnReply(
const FullTrackName& full_track_name,
std::variant<SubscribeOkData, MoqtRequestError> response) override;
// TODO(vasilvv): Implement this if we want to support Object Acks across
// relays.
void OnCanAckObjects(MoqtObjectAckFunction /*ack_function*/) override {}
void OnObjectFragment(const FullTrackName& full_track_name,
const PublishedObjectMetadata& metadata,
absl::string_view object, bool end_of_message) override;
void OnPublishDone(FullTrackName full_track_name) override;
void OnMalformedTrack(const FullTrackName& /*full_track_name*/) override {
DeleteTrack();
}
void OnStreamFin(const FullTrackName&, DataStreamIndex stream) override;
void OnStreamReset(const FullTrackName&, DataStreamIndex stream) override;
// MoqtTrackPublisher implementation.
const FullTrackName& GetTrackName() const override { return track_; }
std::optional<PublishedObject> GetCachedObject(
uint64_t group_id, uint64_t subgroup_id,
uint64_t min_object) const override;
void AddObjectListener(MoqtObjectListener* listener) override;
void RemoveObjectListener(MoqtObjectListener* listener) override;
std::optional<Location> largest_location() const override;
std::optional<MoqtForwardingPreference> forwarding_preference()
const override {
return forwarding_preference_;
}
std::optional<MoqtDeliveryOrder> delivery_order() const override {
return delivery_order_;
}
std::optional<quic::QuicTimeDelta> expiration() const override;
std::unique_ptr<MoqtFetchTask> StandaloneFetch(
Location /*start*/, Location /*end*/,
std::optional<MoqtDeliveryOrder> /*order*/) override {
return std::make_unique<MoqtFailedFetch>(
absl::UnimplementedError("Fetch not implemented"));
}
std::unique_ptr<MoqtFetchTask> RelativeFetch(
uint64_t /*group_diff*/,
std::optional<MoqtDeliveryOrder> /*order*/) override {
return std::make_unique<MoqtFailedFetch>(
absl::UnimplementedError("Fetch not implemented"));
}
std::unique_ptr<MoqtFetchTask> AbsoluteFetch(
uint64_t /*group*/, std::optional<MoqtDeliveryOrder> /*order*/) override {
return std::make_unique<MoqtFailedFetch>(
absl::UnimplementedError("Fetch not implemented"));
}
void ForAllObjects(
quiche::UnretainedCallback<void(const CachedObject&)> callback);
void Close() { is_closing_ = true; }
private:
// The number of recent groups to keep around for newly joined subscribers.
static constexpr size_t kMaxQueuedGroups = 3;
void DeleteTrack();
// Ordered by object id.
using Subgroup = absl::btree_map<uint64_t, CachedObject>;
struct Group {
uint64_t next_object = 0;
bool complete = false; // If true, kEndOfGroup has been received.
absl::btree_map<uint64_t, Subgroup> subgroups; // Ordered by subgroup id.
};
bool is_closing_ = false;
bool got_response_ = false;
const quic::QuicClock* clock_;
FullTrackName track_;
quiche::QuicheWeakPtr<MoqtSessionInterface> upstream_;
DeleteTrackCallback delete_track_callback_;
std::optional<MoqtForwardingPreference> forwarding_preference_;
std::optional<MoqtDeliveryOrder> delivery_order_;
// TODO(martinduke): This publisher should destroy itself when the expiration
// time passes.
std::optional<quic::QuicTime> expiration_;
absl::btree_map<uint64_t, Group> queue_; // Ordered by group id.
absl::flat_hash_set<MoqtObjectListener*> listeners_;
std::optional<Location> end_of_track_;
Location next_location_;
};
} // namespace moqt
#endif // QUICHE_QUIC_MOQT_MOQT_RELAY_TRACK_PUBLISHER_H_