blob: 84c2a85e58e5f0054b2d6de6e5d4312061e025e5 [file] [log] [blame]
// Copyright 2024 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/quic/moqt/moqt_outgoing_queue.h"
#include <cstddef>
#include <cstdint>
#include <optional>
#include <utility>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_mem_slice.h"
namespace moqt {
void MoqtOutgoingQueue::AddObject(quiche::QuicheMemSlice payload, bool key) {
if (queue_.empty() && !key) {
QUICHE_BUG(MoqtOutgoingQueue_AddObject_first_object_not_key)
<< "The first object ever added to the queue must have the \"key\" "
"flag.";
return;
}
if (key) {
if (!queue_.empty()) {
CloseStreamForGroup(current_group_id_);
}
if (queue_.size() == kMaxQueuedGroups) {
queue_.erase(queue_.begin());
}
queue_.emplace_back();
++current_group_id_;
}
absl::string_view payload_view = payload.AsStringView();
uint64_t object_id = queue_.back().size();
queue_.back().push_back(std::move(payload));
PublishObject(current_group_id_, object_id, payload_view,
/*close_stream=*/false);
}
absl::StatusOr<MoqtOutgoingQueue::PublishPastObjectsCallback>
MoqtOutgoingQueue::OnSubscribeForPast(const SubscribeWindow& window) {
QUICHE_BUG_IF(
MoqtOutgoingQueue_requires_kGroup,
window.forwarding_preference() != MoqtForwardingPreference::kGroup)
<< "MoqtOutgoingQueue currently only supports kGroup.";
if (window.HasEnd()) {
// TODO: support this (this would require changing the logic for closing the
// stream below).
return absl::UnimplementedError("SUBSCRIBEs with an end are not supported");
}
return [this, &window]() {
for (size_t i = 0; i < queue_.size(); ++i) {
const uint64_t group_id = first_group_in_queue() + i;
const Group& group = queue_[i];
const bool is_last_group = (i == queue_.size() - 1);
for (size_t j = 0; j < group.size(); ++j) {
const FullSequence sequence{group_id, j};
if (!window.InWindow(sequence)) {
continue;
}
const bool is_last_object = (j == group.size() - 1);
const bool should_close_stream = !is_last_group && is_last_object;
PublishObject(group_id, j, group[j].AsStringView(),
should_close_stream);
}
}
};
}
void MoqtOutgoingQueue::CloseStreamForGroup(uint64_t group_id) {
session_->CloseObjectStream(track_, group_id);
}
void MoqtOutgoingQueue::PublishObject(uint64_t group_id, uint64_t object_id,
absl::string_view payload,
bool close_stream) {
session_->PublishObject(track_, group_id, object_id,
/*object_send_order=*/group_id, payload,
close_stream);
}
} // namespace moqt