blob: 35d1f9da7c1492758c64a2e0d63d57938ff71cbb [file] [log] [blame] [edit]
// Copyright 2024 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/moqt_outgoing_queue.h"
#include <memory>
#include <optional>
#include <utility>
#include <vector>
#include "absl/status/statusor.h"
#include "quiche/quic/moqt/moqt_cached_object.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_mem_slice.h"
namespace moqt {
// Appends an object carrying `payload` to the queue.
//
// When `key` is true the object opens a new group: if a group already exists,
// it is first terminated with an end-of-group marker; if the queue already
// holds kMaxQueuedGroups groups, the oldest one is dropped; then a fresh group
// is created and the group ID is advanced.  When `key` is false the object is
// appended to the most recent group.
//
// Adding a non-key object to an empty queue is reported via QUICHE_BUG and the
// object is discarded.
void MoqtOutgoingQueue::AddObject(quiche::QuicheMemSlice payload, bool key) {
  if (!key) {
    // A non-key object needs an existing group to be appended to.
    if (queue_.empty()) {
      QUICHE_BUG(MoqtOutgoingQueue_AddObject_first_object_not_key)
          << "The first object ever added to the queue must have the \"key\" "
             "flag.";
      return;
    }
    AddRawObject(MoqtObjectStatus::kNormal, std::move(payload));
    return;
  }

  // Close out the previous group (if any) before opening the next one.
  const bool has_previous_group = !queue_.empty();
  if (has_previous_group) {
    AddRawObject(MoqtObjectStatus::kEndOfGroup, quiche::QuicheMemSlice());
  }

  // Make room for the new group by evicting the oldest one when full.
  if (queue_.size() == kMaxQueuedGroups) {
    queue_.erase(queue_.begin());
  }

  queue_.emplace_back();
  ++current_group_id_;
  AddRawObject(MoqtObjectStatus::kNormal, std::move(payload));
}
// Appends an object with the given `status` and `payload` to the last group in
// the queue and notifies every registered listener of the new sequence.
//
// The object ID is the number of objects already stored in that group.  The
// queue must already contain at least one group when this is called, since the
// last group is accessed unconditionally.
void MoqtOutgoingQueue::AddRawObject(MoqtObjectStatus status,
                                     quiche::QuicheMemSlice payload) {
  Group& current_group = queue_.back();
  FullSequence sequence{current_group_id_, current_group.size()};

  // The payload is stored behind a shared_ptr inside the cached object.
  auto shared_payload =
      std::make_shared<quiche::QuicheMemSlice>(std::move(payload));
  current_group.push_back(
      CachedObject{sequence, status, std::move(shared_payload)});

  for (MoqtObjectListener* observer : listeners_) {
    observer->OnNewObjectAvailable(sequence);
  }
}
// Looks up the object identified by `sequence` in the queue.
//
// Returns:
//   * a placeholder with status kGroupDoesNotExist if the group is older than
//     first_group_in_queue();
//   * std::nullopt if the group is newer than current_group_id_, or if the
//     object ID is past the end of the current group;
//   * a placeholder with status kObjectDoesNotExist if the object ID is past
//     the end of a group other than the current one;
//   * otherwise, the cached object converted with
//     CachedObjectToPublishedObject().
std::optional<PublishedObject> MoqtOutgoingQueue::GetCachedObject(
    FullSequence sequence) const {
  if (sequence.group < first_group_in_queue()) {
    return PublishedObject{FullSequence{sequence.group, sequence.object},
                           MoqtObjectStatus::kGroupDoesNotExist,
                           quiche::QuicheMemSlice()};
  }
  if (sequence.group > current_group_id_) {
    return std::nullopt;
  }

  // Defensive bounds check before indexing into `queue_`.  The two range
  // checks above depend on first_group_in_queue() and current_group_id_, which
  // are defined outside this file; if they let through a group that has no
  // slot in the queue (NOTE(review): this looks possible while the queue is
  // still empty -- confirm against the header), report the object as
  // unavailable instead of reading out of bounds.
  const auto group_index = sequence.group - first_group_in_queue();
  if (group_index >= queue_.size()) {
    return std::nullopt;
  }

  const std::vector<CachedObject>& group = queue_[group_index];
  if (sequence.object >= group.size()) {
    // More objects can still be appended to the current group, so nothing
    // definitive can be said about an object past its end.
    if (sequence.group == current_group_id_) {
      return std::nullopt;
    }
    return PublishedObject{FullSequence{sequence.group, sequence.object},
                           MoqtObjectStatus::kObjectDoesNotExist,
                           quiche::QuicheMemSlice()};
  }
  QUICHE_DCHECK(sequence == group[sequence.object].sequence);
  return CachedObjectToPublishedObject(group[sequence.object]);
}
// Returns the sequence numbers of all cached objects that fall inside the
// SubscribeWindow built from `start` and `end`, in queue order (groups in the
// order they are stored, objects in the order they appear in each group).
std::vector<FullSequence> MoqtOutgoingQueue::GetCachedObjectsInRange(
    FullSequence start, FullSequence end) const {
  SubscribeWindow range(start, end);
  std::vector<FullSequence> matches;
  for (const Group& cached_group : queue_) {
    for (const CachedObject& cached : cached_group) {
      if (!range.InWindow(cached.sequence)) {
        continue;
      }
      matches.push_back(cached.sequence);
    }
  }
  return matches;
}
// Reports the track status: kNotYetBegun while the queue holds no groups, and
// kInProgress once at least one group is present.
absl::StatusOr<MoqtTrackStatusCode> MoqtOutgoingQueue::GetTrackStatus() const {
  const bool has_groups = !queue_.empty();
  return has_groups ? MoqtTrackStatusCode::kInProgress
                    : MoqtTrackStatusCode::kNotYetBegun;
}
// Returns the sequence of the last object in the last group of the queue.
//
// Calling this while the queue is empty is reported via QUICHE_BUG, and
// FullSequence{0, 0} is returned as a fallback.
FullSequence MoqtOutgoingQueue::GetLargestSequence() const {
  if (!queue_.empty()) {
    const Group& newest_group = queue_.back();
    return FullSequence{current_group_id_, newest_group.size() - 1};
  }
  QUICHE_BUG(MoqtOutgoingQueue_GetLargestSequence_not_begun)
      << "Calling GetLargestSequence() on a track that hasn't begun";
  return FullSequence{0, 0};
}
} // namespace moqt