blob: 3db2a6ce92b7d502823379d6c1901338f5614d3a [file] [log] [blame]
// Copyright 2023 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef QUICHE_COMMON_BTREE_SCHEDULER_H_
#define QUICHE_COMMON_BTREE_SCHEDULER_H_
#include <cstddef>
#include <limits>
#include <optional>
#include <utility>
#include "absl/base/attributes.h"
#include "absl/container/btree_map.h"
#include "absl/container/node_hash_map.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/common/platform/api/quiche_logging.h"
namespace quiche {
// BTreeScheduler is a data structure that allows streams (and potentially other
// entities) to be scheduled according to the arbitrary priorities. The API for
// using the scheduler can be used as follows:
// - A stream has to be registered with a priority before being scheduled.
// - A stream can be unregistered, or can be re-prioritized.
// - A stream can be scheduled; that adds it into the queue.
// - PopFront() will return the stream with highest priority.
// - ShouldYield() will return if there is a stream with higher priority than
// the specified one.
//
// The prioritization works as following:
// - If two streams have different priorities, the higher priority stream goes
// first.
// - If two streams have the same priority, the one that got scheduled earlier
// goes first. Internally, this is implemented by assigning a monotonically
// decreasing sequence number to every newly scheduled stream.
//
// The Id type has to define operator==, be hashable via absl::Hash, and
// printable via operator<<; the Priority type has to define operator<.
template <typename Id, typename Priority>
class QUICHE_NO_EXPORT BTreeScheduler {
public:
// Returns true if there are any streams registered.
bool HasRegistered() const { return !streams_.empty(); }
// Returns true if there are any streams scheduled.
bool HasScheduled() const { return !schedule_.empty(); }
// Returns the number of currently scheduled streams.
size_t NumScheduled() const { return schedule_.size(); }
// Returns the total number of currently registered streams.
size_t NumRegistered() const { return streams_.size(); }
// Counts the number of scheduled entries in the range [min, max]. If either
// min or max is omitted, negative or positive infinity is assumed.
size_t NumScheduledInPriorityRange(std::optional<Priority> min,
std::optional<Priority> max) const;
// Returns true if there is a stream that would go before `id` in the
// schedule.
absl::StatusOr<bool> ShouldYield(Id id) const;
// Returns the priority for `id`, or nullopt if stream is not registered.
std::optional<Priority> GetPriorityFor(Id id) const {
auto it = streams_.find(id);
if (it == streams_.end()) {
return std::nullopt;
}
return it->second.priority;
}
// Pops the highest priority stream. Will fail if the schedule is empty.
absl::StatusOr<Id> PopFront();
// Registers the specified stream with the supplied priority. The stream must
// not be already registered.
absl::Status Register(Id stream_id, const Priority& priority);
// Unregisters a previously registered stream.
absl::Status Unregister(Id stream_id);
// Alters the priority of an already registered stream.
absl::Status UpdatePriority(Id stream_id, const Priority& new_priority);
// Adds the `stream` into the schedule if it's not already there.
absl::Status Schedule(Id stream_id);
// Returns true if `stream` is in the schedule.
bool IsScheduled(Id stream_id) const;
private:
// A record for a registered stream.
struct StreamEntry {
// The current priority of the stream.
ABSL_ATTRIBUTE_NO_UNIQUE_ADDRESS Priority priority;
// If present, the sequence number with which the stream is currently
// scheduled. If absent, indicates that the stream is not scheduled.
std::optional<int> current_sequence_number = std::nullopt;
bool scheduled() const { return current_sequence_number.has_value(); }
};
// The full entry for the stream (includes the ID that's used as a hashmap
// key).
using FullStreamEntry = std::pair<const Id, StreamEntry>;
// A key that is used to order entities within the schedule.
struct ScheduleKey {
// The main order key: the priority of the stream.
ABSL_ATTRIBUTE_NO_UNIQUE_ADDRESS Priority priority;
// The secondary order key: the sequence number.
int sequence_number;
// Orders schedule keys in order of decreasing priority.
bool operator<(const ScheduleKey& other) const {
return std::make_tuple(priority, sequence_number) >
std::make_tuple(other.priority, other.sequence_number);
}
// In order to find all entities with priority `p`, one can iterate between
// `lower_bound(MinForPriority(p))` and `upper_bound(MaxForPriority(p))`.
static ScheduleKey MinForPriority(Priority priority) {
return ScheduleKey{priority, std::numeric_limits<int>::max()};
}
static ScheduleKey MaxForPriority(Priority priority) {
return ScheduleKey{priority, std::numeric_limits<int>::min()};
}
};
using FullScheduleEntry = std::pair<const ScheduleKey, FullStreamEntry*>;
using ScheduleIterator =
typename absl::btree_map<ScheduleKey, FullStreamEntry*>::const_iterator;
// Convenience method to get the stream ID for a schedule entry.
static Id StreamId(const FullScheduleEntry& entry) {
return entry.second->first;
}
// Removes a stream from the schedule, and returns the old entry if it were
// present.
absl::StatusOr<FullScheduleEntry> DescheduleStream(const StreamEntry& entry);
// The map of currently registered streams.
absl::node_hash_map<Id, StreamEntry> streams_;
// The stream schedule, ordered starting from the highest priority stream.
absl::btree_map<ScheduleKey, FullStreamEntry*> schedule_;
// The counter that is used to ensure that streams with the same priority are
// handled in the FIFO order. Decreases with every write.
int current_write_sequence_number_ = 0;
};
template <typename Id, typename Priority>
size_t BTreeScheduler<Id, Priority>::NumScheduledInPriorityRange(
std::optional<Priority> min, std::optional<Priority> max) const {
if (min.has_value() && max.has_value()) {
QUICHE_DCHECK(*min <= *max);
}
// This is reversed, since the schedule is ordered in the descending priority
// order.
ScheduleIterator begin =
max.has_value() ? schedule_.lower_bound(ScheduleKey::MinForPriority(*max))
: schedule_.begin();
ScheduleIterator end =
min.has_value() ? schedule_.upper_bound(ScheduleKey::MaxForPriority(*min))
: schedule_.end();
return end - begin;
}
template <typename Id, typename Priority>
absl::Status BTreeScheduler<Id, Priority>::Register(Id stream_id,
const Priority& priority) {
auto [it, success] = streams_.insert({stream_id, StreamEntry{priority}});
if (!success) {
return absl::AlreadyExistsError("ID already registered");
}
return absl::OkStatus();
}
template <typename Id, typename Priority>
auto BTreeScheduler<Id, Priority>::DescheduleStream(const StreamEntry& entry)
-> absl::StatusOr<FullScheduleEntry> {
QUICHE_DCHECK(entry.scheduled());
auto it = schedule_.find(
ScheduleKey{entry.priority, *entry.current_sequence_number});
if (it == schedule_.end()) {
return absl::InternalError(
"Calling DescheduleStream() on an entry that is not in the schedule at "
"the expected key.");
}
FullScheduleEntry result = *it;
schedule_.erase(it);
return result;
}
template <typename Id, typename Priority>
absl::Status BTreeScheduler<Id, Priority>::Unregister(Id stream_id) {
auto it = streams_.find(stream_id);
if (it == streams_.end()) {
return absl::NotFoundError("Stream not registered");
}
const StreamEntry& stream = it->second;
if (stream.scheduled()) {
if (!DescheduleStream(stream).ok()) {
QUICHE_BUG(BTreeSchedule_Unregister_NotInSchedule)
<< "UnregisterStream() called on a stream ID " << stream_id
<< ", which is marked ready, but is not in the schedule";
}
}
streams_.erase(it);
return absl::OkStatus();
}
template <typename Id, typename Priority>
absl::Status BTreeScheduler<Id, Priority>::UpdatePriority(
Id stream_id, const Priority& new_priority) {
auto it = streams_.find(stream_id);
if (it == streams_.end()) {
return absl::NotFoundError("ID not registered");
}
StreamEntry& stream = it->second;
std::optional<int> sequence_number;
if (stream.scheduled()) {
absl::StatusOr<FullScheduleEntry> old_entry = DescheduleStream(stream);
if (old_entry.ok()) {
sequence_number = old_entry->first.sequence_number;
QUICHE_DCHECK_EQ(old_entry->second, &*it);
} else {
QUICHE_BUG(BTreeScheduler_Update_Not_In_Schedule)
<< "UpdatePriority() called on a stream ID " << stream_id
<< ", which is marked ready, but is not in the schedule";
}
}
stream.priority = new_priority;
if (sequence_number.has_value()) {
schedule_.insert({ScheduleKey{stream.priority, *sequence_number}, &*it});
}
return absl::OkStatus();
}
template <typename Id, typename Priority>
absl::StatusOr<bool> BTreeScheduler<Id, Priority>::ShouldYield(
Id stream_id) const {
const auto stream_it = streams_.find(stream_id);
if (stream_it == streams_.end()) {
return absl::NotFoundError("ID not registered");
}
const StreamEntry& stream = stream_it->second;
if (schedule_.empty()) {
return false;
}
const FullScheduleEntry& next = *schedule_.begin();
if (StreamId(next) == stream_id) {
return false;
}
return next.first.priority >= stream.priority;
}
template <typename Id, typename Priority>
absl::StatusOr<Id> BTreeScheduler<Id, Priority>::PopFront() {
if (schedule_.empty()) {
return absl::NotFoundError("No streams scheduled");
}
auto schedule_it = schedule_.begin();
QUICHE_DCHECK(schedule_it->second->second.scheduled());
schedule_it->second->second.current_sequence_number = std::nullopt;
Id result = StreamId(*schedule_it);
schedule_.erase(schedule_it);
return result;
}
template <typename Id, typename Priority>
absl::Status BTreeScheduler<Id, Priority>::Schedule(Id stream_id) {
const auto stream_it = streams_.find(stream_id);
if (stream_it == streams_.end()) {
return absl::NotFoundError("ID not registered");
}
if (stream_it->second.scheduled()) {
return absl::OkStatus();
}
auto [schedule_it, success] =
schedule_.insert({ScheduleKey{stream_it->second.priority,
--current_write_sequence_number_},
&*stream_it});
QUICHE_BUG_IF(WebTransportWriteBlockedList_AddStream_conflict, !success)
<< "Conflicting key in scheduler for stream " << stream_id;
stream_it->second.current_sequence_number =
schedule_it->first.sequence_number;
return absl::OkStatus();
}
template <typename Id, typename Priority>
bool BTreeScheduler<Id, Priority>::IsScheduled(Id stream_id) const {
const auto stream_it = streams_.find(stream_id);
if (stream_it == streams_.end()) {
return false;
}
return stream_it->second.scheduled();
}
} // namespace quiche
#endif // QUICHE_COMMON_BTREE_SCHEDULER_H_