blob: c9ece3fe0f37bfb5ff908049991bcc8090029fab [file] [log] [blame]
// Copyright 2023 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef QUICHE_QUIC_MOQT_MOQT_SUBSCRIPTION_H_
#define QUICHE_QUIC_MOQT_MOQT_SUBSCRIPTION_H_
#include <cstdint>
#include <optional>
#include <vector>
#include "absl/container/flat_hash_map.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
#include "quiche/quic/platform/api/quic_bug_tracker.h"
#include "quiche/common/quiche_callbacks.h"
namespace moqt {
// A track to which the peer might subscribe.
class LocalTrack {
public:
class Visitor {
public:
virtual ~Visitor() = default;
using PublishPastObjectsCallback = quiche::SingleUseCallback<void()>;
// Requests that application re-publish objects from {start_group,
// start_object} to the latest object. If the return value is nullopt, the
// subscribe is valid and the application will deliver the object and
// the session will send SUBSCRIBE_OK. If the return has a value, the value
// is the error message (the session will send SUBSCRIBE_ERROR). Via this
// API, the application decides if a partially fulfillable
// SUBSCRIBE results in an error or not.
virtual absl::StatusOr<PublishPastObjectsCallback> OnSubscribeForPast(
const SubscribeWindow& window) = 0;
};
// |visitor| must not be nullptr.
LocalTrack(const FullTrackName& full_track_name,
MoqtForwardingPreference forwarding_preference, Visitor* visitor)
: full_track_name_(full_track_name),
forwarding_preference_(forwarding_preference),
windows_(forwarding_preference),
visitor_(visitor) {}
// Creates a LocalTrack that does not start at sequence (0,0)
LocalTrack(const FullTrackName& full_track_name,
MoqtForwardingPreference forwarding_preference, Visitor* visitor,
FullSequence next_sequence)
: full_track_name_(full_track_name),
forwarding_preference_(forwarding_preference),
windows_(forwarding_preference),
next_sequence_(next_sequence),
visitor_(visitor) {}
const FullTrackName& full_track_name() const { return full_track_name_; }
std::optional<uint64_t> track_alias() const { return track_alias_; }
void set_track_alias(uint64_t track_alias) { track_alias_ = track_alias; }
Visitor* visitor() { return visitor_; }
// Returns the subscribe windows that want the object defined by (|group|,
// |object|).
std::vector<SubscribeWindow*> ShouldSend(FullSequence sequence) {
return windows_.SequenceIsSubscribed(sequence);
}
void AddWindow(uint64_t subscribe_id, uint64_t start_group,
uint64_t start_object);
void AddWindow(uint64_t subscribe_id, uint64_t start_group,
uint64_t start_object, uint64_t end_group);
void AddWindow(uint64_t subscribe_id, uint64_t start_group,
uint64_t start_object, uint64_t end_group,
uint64_t end_object);
void DeleteWindow(uint64_t subscribe_id) {
windows_.RemoveWindow(subscribe_id);
}
// Returns the largest observed sequence, but increments the object sequence
// by one.
const FullSequence& next_sequence() const { return next_sequence_; }
// Updates next_sequence_ if |sequence| is larger. Updates max_object_ids_
// if relevant.
void SentSequence(FullSequence sequence, MoqtObjectStatus status);
bool HasSubscriber() const { return !windows_.IsEmpty(); }
SubscribeWindow* GetWindow(uint64_t subscribe_id) {
return windows_.GetWindow(subscribe_id);
}
MoqtForwardingPreference forwarding_preference() const {
return forwarding_preference_;
}
void set_announce_cancel() { announce_canceled_ = true; }
bool canceled() const { return announce_canceled_; }
private:
// This only needs to track subscriptions to current and future objects;
// requests for objects in the past are forwarded to the application.
const FullTrackName full_track_name_;
// The forwarding preference for the track.
MoqtForwardingPreference forwarding_preference_;
// Let the first SUBSCRIBE determine the track alias.
std::optional<uint64_t> track_alias_;
// The sequence numbers from this track to which the peer is subscribed.
MoqtSubscribeWindows windows_;
// By recording the highest observed sequence number, MoQT can interpret
// relative sequence numbers in SUBSCRIBEs.
FullSequence next_sequence_ = {0, 0};
// The object ID of each EndOfGroup object received, indexed by group ID.
// Entry does not exist, if no kGroupDoesNotExist, EndOfGroup, or
// EndOfTrack has been received for that group.
absl::flat_hash_map<uint64_t, uint64_t> max_object_ids_;
Visitor* visitor_;
// If true, the session has received ANNOUNCE_CANCELED for this namespace.
// Additional subscribes will be a protocol error, and the track can be
// destroyed once all active subscribes end.
bool announce_canceled_ = false;
};
// A track on the peer to which the session has subscribed.
class RemoteTrack {
 public:
  // Callbacks through which the session reports events for this track.
  class Visitor {
   public:
    virtual ~Visitor() = default;

    // Called when the session receives a response to the SUBSCRIBE, unless it's
    // a SUBSCRIBE_ERROR with a new track_alias. In that case, the session will
    // automatically retry.
    // NOTE(review): presumably |error_reason_phrase| is nullopt when the
    // subscribe succeeded and holds the reason otherwise -- confirm against
    // the session implementation.
    virtual void OnReply(
        const FullTrackName& full_track_name,
        std::optional<absl::string_view> error_reason_phrase) = 0;

    // Reports object data for the track named |full_track_name|, together with
    // the object's group/object sequence, send order, status and forwarding
    // preference.
    // NOTE(review): presumably |object| may be only part of the payload and
    // |end_of_message| marks the final fragment -- confirm against the caller.
    virtual void OnObjectFragment(
        const FullTrackName& full_track_name, uint64_t group_sequence,
        uint64_t object_sequence, uint64_t object_send_order,
        MoqtObjectStatus object_status,
        MoqtForwardingPreference forwarding_preference,
        absl::string_view object, bool end_of_message) = 0;
    // TODO(martinduke): Add final sequence numbers
  };

  // Stores the track name, alias and visitor. The forwarding preference is
  // left unset; see CheckForwardingPreference().
  RemoteTrack(const FullTrackName& full_track_name, uint64_t track_alias,
              Visitor* visitor)
      : full_track_name_(full_track_name),
        track_alias_(track_alias),
        visitor_(visitor) {}

  // Const-qualified so that it is usable through a const RemoteTrack, matching
  // track_alias() below and LocalTrack::full_track_name().
  const FullTrackName& full_track_name() const { return full_track_name_; }
  uint64_t track_alias() const { return track_alias_; }
  Visitor* visitor() { return visitor_; }

  // When called while processing the first object in the track, sets the
  // forwarding preference to the value indicated by the incoming encoding.
  // Otherwise, returns true if the incoming object does not violate the rule
  // that the preference is consistent.
  bool CheckForwardingPreference(MoqtForwardingPreference preference);

 private:
  // TODO: There is no accounting for the number of outstanding subscribes,
  // because we can't match track names to individual subscribes.
  const FullTrackName full_track_name_;
  const uint64_t track_alias_;
  Visitor* visitor_;
  // Empty at construction; see CheckForwardingPreference().
  std::optional<MoqtForwardingPreference> forwarding_preference_;
};
} // namespace moqt
#endif // QUICHE_QUIC_MOQT_MOQT_SUBSCRIPTION_H_