// blob: 5fdcf65ad5f21cf20fa8e8df700577c0f4129884 [file] [log] [blame] [edit]
// Copyright 2024 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/moqt_live_relay_queue.h"
#include <cstdint>
#include <memory>
#include <optional>
#include <tuple>
#include <vector>
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/moqt/moqt_cached_object.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/platform/api/quiche_mem_slice.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/simple_buffer_allocator.h"
namespace moqt {
bool MoqtLiveRelayQueue::AddObject(uint64_t group_id, uint64_t object_id,
MoqtObjectStatus status,
absl::string_view object) {
if (queue_.size() == kMaxQueuedGroups) {
if (queue_.begin()->first > group_id) {
QUICHE_DLOG(INFO) << "Skipping object from group " << group_id
<< " because it is too old.";
return true;
}
if (queue_.find(group_id) == queue_.end()) {
// Erase the oldest group.
queue_.erase(queue_.begin());
}
}
QUICHE_CHECK(status == MoqtObjectStatus::kNormal || object.empty());
return AddRawObject(FullSequence{group_id, object_id}, status, object);
}
// Summarizes the tail of `group`.
//
// Returns {object ID following the last stored object, whether that last
// object has a terminal status (kEndOfGroup, kGroupDoesNotExist or
// kEndOfTrack)}. A group with no stored objects yields {0, false}.
std::tuple<uint64_t, bool> MoqtLiveRelayQueue::NextObject(Group& group) const {
  using Result = std::tuple<uint64_t, bool>;
  if (group.empty()) {
    return Result(0, false);
  }
  auto last = group.rbegin();
  bool group_is_closed = false;
  switch (last->second.status) {
    case MoqtObjectStatus::kEndOfGroup:
    case MoqtObjectStatus::kGroupDoesNotExist:
    case MoqtObjectStatus::kEndOfTrack:
      group_is_closed = true;
      break;
    default:
      break;
  }
  return Result(last->second.sequence.object + 1, group_is_closed);
}
// Validates an object against what has already been queued and, if it is
// acceptable, stores it and notifies every registered listener.
//
// Returns false, without touching the end-of-track marker or the next expected
// sequence, when the object
//   * lies after a previously recorded end of track,
//   * is an EndOfTrack that precedes the next expected sequence,
//   * is a GroupDoesNotExist whose object ID is not 0,
//   * comes after the terminal object of its (already queued) group,
//   * is an EndOfGroup that is not past the last stored object of its group,
//   * would overwrite an object that is already stored.
// Returns true once the object has been stored. A non-empty `payload` is
// copied into a newly allocated QuicheMemSlice; an empty one is stored as a
// null slice.
bool MoqtLiveRelayQueue::AddRawObject(FullSequence sequence,
                                      MoqtObjectStatus status,
                                      absl::string_view payload) {
  // Validate the input given previously received markers.
  if (end_of_track_.has_value() && sequence > *end_of_track_) {
    QUICHE_DLOG(INFO) << "Skipping object because it is after the end of the "
                      << "track";
    return false;
  }
  const bool is_end_of_track = (status == MoqtObjectStatus::kEndOfTrack);
  if (is_end_of_track && sequence < next_sequence_) {
    QUICHE_DLOG(INFO) << "EndOfTrack is too early.";
    return false;
  }
  // TODO(martinduke): Check that EndOfTrack has normal IDs.
  if (status == MoqtObjectStatus::kGroupDoesNotExist && sequence.object > 0) {
    QUICHE_DLOG(INFO) << "GroupDoesNotExist is not the last object in the "
                      << "group";
    return false;
  }
  auto group_it = queue_.try_emplace(sequence.group);
  auto& group = group_it.first->second;
  if (!group_it.second) {  // Group already exists.
    auto [next_object_id, is_the_end] = NextObject(group);
    if (next_object_id <= sequence.object && is_the_end) {
      QUICHE_DLOG(INFO) << "Skipping object because it is after the end of the "
                        << "group";
      return false;
    }
    if (status == MoqtObjectStatus::kEndOfGroup &&
        sequence.object < next_object_id) {
      QUICHE_DLOG(INFO) << "Skipping EndOfGroup because it is not the last "
                        << "object in the group.";
      return false;
    }
  }
  std::shared_ptr<quiche::QuicheMemSlice> slice =
      payload.empty()
          ? nullptr
          : std::make_shared<quiche::QuicheMemSlice>(quiche::QuicheBuffer::Copy(
                quiche::SimpleBufferAllocator::Get(), payload));
  auto object_it = group.try_emplace(sequence.object, sequence, status, slice);
  if (!object_it.second) {
    QUICHE_DLOG(ERROR) << "Sender is overwriting an existing object.";
    return false;
  }
  // Commit track-level state only now that the object is actually stored.
  // Recording the end of track before the group-level checks would let a
  // rejected EndOfTrack mark the track as finished and block every later
  // object.
  if (is_end_of_track) {
    end_of_track_ = sequence;
  }
  if (next_sequence_ <= sequence) {
    next_sequence_ = FullSequence{sequence.group, sequence.object + 1};
  }
  // State is fully updated before listeners run, so they observe the new
  // object and the advanced sequence.
  for (MoqtObjectListener* listener : listeners_) {
    listener->OnNewObjectAvailable(sequence);
  }
  return true;
}
// Looks up the object stored at `sequence`.
// Returns std::nullopt when either the group or the object within it is not
// in the queue; otherwise returns the stored object converted with
// CachedObjectToPublishedObject().
std::optional<PublishedObject> MoqtLiveRelayQueue::GetCachedObject(
    FullSequence sequence) const {
  if (auto group_entry = queue_.find(sequence.group);
      group_entry != queue_.end()) {
    const auto& objects = group_entry->second;
    if (auto object_entry = objects.find(sequence.object);
        object_entry != objects.end()) {
      return CachedObjectToPublishedObject(object_entry->second);
    }
  }
  return std::nullopt;
}
// Collects the sequence of every queued object that
// SubscribeWindow(start, end) reports as in-window. Results follow the
// iteration order of the queue and of each group within it.
std::vector<FullSequence> MoqtLiveRelayQueue::GetCachedObjectsInRange(
    FullSequence start, FullSequence end) const {
  std::vector<FullSequence> in_range;
  SubscribeWindow window(start, end);
  for (const auto& [group_id, objects] : queue_) {
    for (const auto& [object_id, cached] : objects) {
      if (!window.InWindow(cached.sequence)) {
        continue;
      }
      in_range.push_back(cached.sequence);
    }
  }
  return in_range;
}
// Reports the track status derived from local state only:
//   kFinished     - an end-of-track marker has been recorded;
//   kNotYetBegun  - no end of track and the queue holds no groups;
//   kInProgress   - otherwise.
absl::StatusOr<MoqtTrackStatusCode> MoqtLiveRelayQueue::GetTrackStatus() const {
  if (end_of_track_.has_value()) {
    return MoqtTrackStatusCode::kFinished;
  }
  // TODO(martinduke): Retrieve the track status from upstream.
  return queue_.empty() ? MoqtTrackStatusCode::kNotYetBegun
                        : MoqtTrackStatusCode::kInProgress;
}
// Returns the sequence immediately preceding next_sequence_ within the same
// group, i.e. {next_sequence_.group, next_sequence_.object - 1}.
// NOTE(review): if next_sequence_.object is 0 (presumably the state before any
// object has been queued) and the field is unsigned, the subtraction wraps
// around - confirm callers check the track status first.
FullSequence MoqtLiveRelayQueue::GetLargestSequence() const {
  const auto group = next_sequence_.group;
  const auto last_object = next_sequence_.object - 1;
  return FullSequence{group, last_object};
}
} // namespace moqt