blob: 9387b64b83557d9fe27f5ecf5a59107774557c16 [file] [log] [blame]
// Copyright 2023 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quiche/web_transport/web_transport_priority_scheduler.h"
#include <optional>
#include <utility>
#include "absl/cleanup/cleanup.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "quiche/common/quiche_status_utils.h"
#include "quiche/web_transport/web_transport.h"
namespace webtransport {
absl::Status PriorityScheduler::Register(StreamId stream_id,
const StreamPriority& priority) {
auto [it, success] = stream_to_group_map_.insert({stream_id, nullptr});
if (!success) {
return absl::AlreadyExistsError("Provided stream ID already registered");
}
// Avoid having any nullptr entries in the stream map if we error out further
// down below. This should not happen (all errors below are logical errors),
// but if that does happen, we will avoid crashing due to nullptr dereference.
auto cleanup_nullptr_map_entry =
absl::MakeCleanup([&] { stream_to_group_map_.erase(stream_id); });
auto [scheduler_it, scheduler_created] =
per_group_schedulers_.try_emplace(priority.send_group_id);
if (scheduler_created) {
// First element in the associated group; register the group in question.
QUICHE_RETURN_IF_ERROR(active_groups_.Register(priority.send_group_id, {}));
}
PerGroupScheduler& scheduler = scheduler_it->second;
QUICHE_RETURN_IF_ERROR(scheduler.Register(stream_id, priority.send_order));
it->second = &*scheduler_it;
std::move(cleanup_nullptr_map_entry).Cancel();
return absl::OkStatus();
}
absl::Status PriorityScheduler::Unregister(StreamId stream_id) {
auto it = stream_to_group_map_.find(stream_id);
if (it == stream_to_group_map_.end()) {
return absl::NotFoundError("Stream ID not registered");
}
SendGroupId group_id = it->second->first;
PerGroupScheduler* group_scheduler = &it->second->second;
stream_to_group_map_.erase(it);
QUICHE_RETURN_IF_ERROR(group_scheduler->Unregister(stream_id));
// Clean up the group if there are no more streams associated with it.
if (!group_scheduler->HasRegistered()) {
per_group_schedulers_.erase(group_id);
QUICHE_RETURN_IF_ERROR(active_groups_.Unregister(group_id));
}
return absl::OkStatus();
}
absl::Status PriorityScheduler::UpdateSendOrder(StreamId stream_id,
SendOrder new_send_order) {
PerGroupScheduler* scheduler = SchedulerForStream(stream_id);
if (scheduler == nullptr) {
return absl::NotFoundError("Stream ID not registered");
}
return scheduler->UpdatePriority(stream_id, new_send_order);
}
absl::Status PriorityScheduler::UpdateSendGroup(StreamId stream_id,
SendGroupId new_send_group) {
PerGroupScheduler* scheduler = SchedulerForStream(stream_id);
if (scheduler == nullptr) {
return absl::NotFoundError("Stream ID not registered");
}
bool is_scheduled = scheduler->IsScheduled(stream_id);
std::optional<SendOrder> send_order = scheduler->GetPriorityFor(stream_id);
if (!send_order.has_value()) {
return absl::InternalError(
"Stream registered at the top level scheduler, but not at the "
"per-group one");
}
QUICHE_RETURN_IF_ERROR(Unregister(stream_id));
QUICHE_RETURN_IF_ERROR(
Register(stream_id, StreamPriority{new_send_group, *send_order}));
if (is_scheduled) {
QUICHE_RETURN_IF_ERROR(Schedule(stream_id));
}
return absl::OkStatus();
}
std::optional<StreamPriority> PriorityScheduler::GetPriorityFor(
StreamId stream_id) const {
auto it = stream_to_group_map_.find(stream_id);
if (it == stream_to_group_map_.end()) {
return std::nullopt;
}
const auto& [group_id, group_scheduler] = *it->second;
std::optional<SendOrder> send_order =
group_scheduler.GetPriorityFor(stream_id);
if (!send_order.has_value()) {
return std::nullopt;
}
return StreamPriority{group_id, *send_order};
}
absl::StatusOr<bool> PriorityScheduler::ShouldYield(StreamId stream_id) const {
auto it = stream_to_group_map_.find(stream_id);
if (it == stream_to_group_map_.end()) {
return absl::NotFoundError("Stream ID not registered");
}
const auto& [group_id, group_scheduler] = *it->second;
absl::StatusOr<bool> per_group_result = active_groups_.ShouldYield(group_id);
QUICHE_RETURN_IF_ERROR(per_group_result.status());
if (*per_group_result) {
return true;
}
return group_scheduler.ShouldYield(stream_id);
}
absl::StatusOr<StreamId> PriorityScheduler::PopFront() {
absl::StatusOr<SendGroupId> group_id = active_groups_.PopFront();
QUICHE_RETURN_IF_ERROR(group_id.status());
auto it = per_group_schedulers_.find(*group_id);
if (it == per_group_schedulers_.end()) {
return absl::InternalError(
"Scheduled a group with no per-group scheduler attached");
}
PerGroupScheduler& scheduler = it->second;
absl::StatusOr<StreamId> result = scheduler.PopFront();
if (!result.ok()) {
return absl::InternalError("Inactive group found in top-level schedule");
}
// Reschedule the group if it has more active streams in it.
if (scheduler.HasScheduled()) {
QUICHE_RETURN_IF_ERROR(active_groups_.Schedule(*group_id));
}
return result;
}
absl::Status PriorityScheduler::Schedule(StreamId stream_id) {
auto it = stream_to_group_map_.find(stream_id);
if (it == stream_to_group_map_.end()) {
return absl::NotFoundError("Stream ID not registered");
}
auto& [group_id, group_scheduler] = *it->second;
QUICHE_RETURN_IF_ERROR(active_groups_.Schedule(group_id));
return group_scheduler.Schedule(stream_id);
}
bool PriorityScheduler::IsScheduled(StreamId stream_id) const {
const PerGroupScheduler* scheduler = SchedulerForStream(stream_id);
if (scheduler == nullptr) {
return false;
}
return scheduler->IsScheduled(stream_id);
}
} // namespace webtransport