|  | // Copyright 2023 The Chromium Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style license that can be | 
|  | // found in the LICENSE file. | 
|  |  | 
|  | #ifndef QUICHE_COMMON_BTREE_SCHEDULER_H_ | 
|  | #define QUICHE_COMMON_BTREE_SCHEDULER_H_ | 
|  |  | 
#include <cstddef>
#include <limits>
#include <optional>
#include <tuple>
#include <utility>

#include "absl/base/attributes.h"
#include "absl/container/btree_map.h"
#include "absl/container/node_hash_map.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/common/platform/api/quiche_logging.h"
|  |  | 
|  | namespace quiche { | 
|  |  | 
// BTreeScheduler is a data structure that allows streams (and potentially other
// entities) to be scheduled according to arbitrary priorities.  The scheduler
// is used as follows:
//  - A stream has to be registered with a priority before being scheduled.
//  - A stream can be unregistered, or can be re-prioritized.
//  - A stream can be scheduled; that adds it into the queue.
//  - PopFront() will return the stream with the highest priority.
//  - ShouldYield() will report whether another scheduled stream would go
//    before the specified one.
//
// The prioritization works as follows:
//  - If two streams have different priorities, the higher priority stream goes
//    first.
//  - If two streams have the same priority, the one that got scheduled earlier
//    goes first. Internally, this is implemented by assigning a monotonically
//    decreasing sequence number to every newly scheduled stream.
//
// The Id type has to define operator==, be hashable via absl::Hash, and be
// printable via operator<<; the Priority type has to define operator<.
|  | template <typename Id, typename Priority> | 
|  | class QUICHE_NO_EXPORT BTreeScheduler { | 
|  | public: | 
|  | // Returns true if there are any streams registered. | 
|  | bool HasRegistered() const { return !streams_.empty(); } | 
|  | // Returns true if there are any streams scheduled. | 
|  | bool HasScheduled() const { return !schedule_.empty(); } | 
|  | // Returns the number of currently scheduled streams. | 
|  | size_t NumScheduled() const { return schedule_.size(); } | 
|  | // Returns the total number of currently registered streams. | 
|  | size_t NumRegistered() const { return streams_.size(); } | 
|  |  | 
|  | // Counts the number of scheduled entries in the range [min, max].  If either | 
|  | // min or max is omitted, negative or positive infinity is assumed. | 
|  | size_t NumScheduledInPriorityRange(std::optional<Priority> min, | 
|  | std::optional<Priority> max) const; | 
|  |  | 
|  | // Returns true if there is a stream that would go before `id` in the | 
|  | // schedule. | 
|  | absl::StatusOr<bool> ShouldYield(Id id) const; | 
|  |  | 
|  | // Returns the priority for `id`, or nullopt if stream is not registered. | 
|  | std::optional<Priority> GetPriorityFor(Id id) const { | 
|  | auto it = streams_.find(id); | 
|  | if (it == streams_.end()) { | 
|  | return std::nullopt; | 
|  | } | 
|  | return it->second.priority; | 
|  | } | 
|  |  | 
|  | // Pops the highest priority stream.  Will fail if the schedule is empty. | 
|  | absl::StatusOr<Id> PopFront(); | 
|  |  | 
|  | // Registers the specified stream with the supplied priority.  The stream must | 
|  | // not be already registered. | 
|  | absl::Status Register(Id stream_id, const Priority& priority); | 
|  | // Unregisters a previously registered stream. | 
|  | absl::Status Unregister(Id stream_id); | 
|  | // Alters the priority of an already registered stream. | 
|  | absl::Status UpdatePriority(Id stream_id, const Priority& new_priority); | 
|  |  | 
|  | // Adds the `stream` into the schedule if it's not already there. | 
|  | absl::Status Schedule(Id stream_id); | 
|  | // Deschedules a stream that is known to be currently scheduled. | 
|  | absl::Status Deschedule(Id stream_id); | 
|  | // Returns true if `stream` is in the schedule. | 
|  | bool IsScheduled(Id stream_id) const; | 
|  |  | 
|  | private: | 
|  | // A record for a registered stream. | 
|  | struct StreamEntry { | 
|  | // The current priority of the stream. | 
|  | ABSL_ATTRIBUTE_NO_UNIQUE_ADDRESS Priority priority; | 
|  | // If present, the sequence number with which the stream is currently | 
|  | // scheduled.  If absent, indicates that the stream is not scheduled. | 
|  | std::optional<int> current_sequence_number = std::nullopt; | 
|  |  | 
|  | bool scheduled() const { return current_sequence_number.has_value(); } | 
|  | }; | 
|  | // The full entry for the stream (includes the ID that's used as a hashmap | 
|  | // key). | 
|  | using FullStreamEntry = std::pair<const Id, StreamEntry>; | 
|  |  | 
|  | // A key that is used to order entities within the schedule. | 
|  | struct ScheduleKey { | 
|  | // The main order key: the priority of the stream. | 
|  | ABSL_ATTRIBUTE_NO_UNIQUE_ADDRESS Priority priority; | 
|  | // The secondary order key: the sequence number. | 
|  | int sequence_number; | 
|  |  | 
|  | // Orders schedule keys in order of decreasing priority. | 
|  | bool operator<(const ScheduleKey& other) const { | 
|  | return std::make_tuple(priority, sequence_number) > | 
|  | std::make_tuple(other.priority, other.sequence_number); | 
|  | } | 
|  |  | 
|  | // In order to find all entities with priority `p`, one can iterate between | 
|  | // `lower_bound(MinForPriority(p))` and `upper_bound(MaxForPriority(p))`. | 
|  | static ScheduleKey MinForPriority(Priority priority) { | 
|  | return ScheduleKey{priority, std::numeric_limits<int>::max()}; | 
|  | } | 
|  | static ScheduleKey MaxForPriority(Priority priority) { | 
|  | return ScheduleKey{priority, std::numeric_limits<int>::min()}; | 
|  | } | 
|  | }; | 
|  | using FullScheduleEntry = std::pair<const ScheduleKey, FullStreamEntry*>; | 
|  | using ScheduleIterator = | 
|  | typename absl::btree_map<ScheduleKey, FullStreamEntry*>::const_iterator; | 
|  |  | 
|  | // Convenience method to get the stream ID for a schedule entry. | 
|  | static Id StreamId(const FullScheduleEntry& entry) { | 
|  | return entry.second->first; | 
|  | } | 
|  |  | 
|  | // Removes a stream from the schedule, and returns the old entry if it were | 
|  | // present. | 
|  | absl::StatusOr<FullScheduleEntry> DescheduleStream(const StreamEntry& entry); | 
|  |  | 
|  | // The map of currently registered streams. | 
|  | absl::node_hash_map<Id, StreamEntry> streams_; | 
|  | // The stream schedule, ordered starting from the highest priority stream. | 
|  | absl::btree_map<ScheduleKey, FullStreamEntry*> schedule_; | 
|  |  | 
|  | // The counter that is used to ensure that streams with the same priority are | 
|  | // handled in the FIFO order.  Decreases with every write. | 
|  | int current_write_sequence_number_ = 0; | 
|  | }; | 
|  |  | 
|  | template <typename Id, typename Priority> | 
|  | size_t BTreeScheduler<Id, Priority>::NumScheduledInPriorityRange( | 
|  | std::optional<Priority> min, std::optional<Priority> max) const { | 
|  | if (min.has_value() && max.has_value()) { | 
|  | QUICHE_DCHECK(*min <= *max); | 
|  | } | 
|  | // This is reversed, since the schedule is ordered in the descending priority | 
|  | // order. | 
|  | ScheduleIterator begin = | 
|  | max.has_value() ? schedule_.lower_bound(ScheduleKey::MinForPriority(*max)) | 
|  | : schedule_.begin(); | 
|  | ScheduleIterator end = | 
|  | min.has_value() ? schedule_.upper_bound(ScheduleKey::MaxForPriority(*min)) | 
|  | : schedule_.end(); | 
|  | return end - begin; | 
|  | } | 
|  |  | 
|  | template <typename Id, typename Priority> | 
|  | absl::Status BTreeScheduler<Id, Priority>::Register(Id stream_id, | 
|  | const Priority& priority) { | 
|  | auto [it, success] = streams_.insert({stream_id, StreamEntry{priority}}); | 
|  | if (!success) { | 
|  | return absl::AlreadyExistsError("ID already registered"); | 
|  | } | 
|  | return absl::OkStatus(); | 
|  | } | 
|  |  | 
|  | template <typename Id, typename Priority> | 
|  | auto BTreeScheduler<Id, Priority>::DescheduleStream(const StreamEntry& entry) | 
|  | -> absl::StatusOr<FullScheduleEntry> { | 
|  | QUICHE_DCHECK(entry.scheduled()); | 
|  | auto it = schedule_.find( | 
|  | ScheduleKey{entry.priority, *entry.current_sequence_number}); | 
|  | if (it == schedule_.end()) { | 
|  | return absl::InternalError( | 
|  | "Calling DescheduleStream() on an entry that is not in the schedule at " | 
|  | "the expected key."); | 
|  | } | 
|  | FullScheduleEntry result = *it; | 
|  | schedule_.erase(it); | 
|  | return result; | 
|  | } | 
|  |  | 
|  | template <typename Id, typename Priority> | 
|  | absl::Status BTreeScheduler<Id, Priority>::Unregister(Id stream_id) { | 
|  | auto it = streams_.find(stream_id); | 
|  | if (it == streams_.end()) { | 
|  | return absl::NotFoundError("Stream not registered"); | 
|  | } | 
|  | const StreamEntry& stream = it->second; | 
|  |  | 
|  | if (stream.scheduled()) { | 
|  | if (!DescheduleStream(stream).ok()) { | 
|  | QUICHE_BUG(BTreeSchedule_Unregister_NotInSchedule) | 
|  | << "UnregisterStream() called on a stream ID " << stream_id | 
|  | << ", which is marked ready, but is not in the schedule"; | 
|  | } | 
|  | } | 
|  |  | 
|  | streams_.erase(it); | 
|  | return absl::OkStatus(); | 
|  | } | 
|  |  | 
|  | template <typename Id, typename Priority> | 
|  | absl::Status BTreeScheduler<Id, Priority>::UpdatePriority( | 
|  | Id stream_id, const Priority& new_priority) { | 
|  | auto it = streams_.find(stream_id); | 
|  | if (it == streams_.end()) { | 
|  | return absl::NotFoundError("ID not registered"); | 
|  | } | 
|  |  | 
|  | StreamEntry& stream = it->second; | 
|  | std::optional<int> sequence_number; | 
|  | if (stream.scheduled()) { | 
|  | absl::StatusOr<FullScheduleEntry> old_entry = DescheduleStream(stream); | 
|  | if (old_entry.ok()) { | 
|  | sequence_number = old_entry->first.sequence_number; | 
|  | QUICHE_DCHECK_EQ(old_entry->second, &*it); | 
|  | } else { | 
|  | QUICHE_BUG(BTreeScheduler_Update_Not_In_Schedule) | 
|  | << "UpdatePriority() called on a stream ID " << stream_id | 
|  | << ", which is marked ready, but is not in the schedule"; | 
|  | } | 
|  | } | 
|  |  | 
|  | stream.priority = new_priority; | 
|  | if (sequence_number.has_value()) { | 
|  | schedule_.insert({ScheduleKey{stream.priority, *sequence_number}, &*it}); | 
|  | } | 
|  | return absl::OkStatus(); | 
|  | } | 
|  |  | 
|  | template <typename Id, typename Priority> | 
|  | absl::StatusOr<bool> BTreeScheduler<Id, Priority>::ShouldYield( | 
|  | Id stream_id) const { | 
|  | const auto stream_it = streams_.find(stream_id); | 
|  | if (stream_it == streams_.end()) { | 
|  | return absl::NotFoundError("ID not registered"); | 
|  | } | 
|  | const StreamEntry& stream = stream_it->second; | 
|  |  | 
|  | if (schedule_.empty()) { | 
|  | return false; | 
|  | } | 
|  | const FullScheduleEntry& next = *schedule_.begin(); | 
|  | if (StreamId(next) == stream_id) { | 
|  | return false; | 
|  | } | 
|  | return next.first.priority >= stream.priority; | 
|  | } | 
|  |  | 
|  | template <typename Id, typename Priority> | 
|  | absl::StatusOr<Id> BTreeScheduler<Id, Priority>::PopFront() { | 
|  | if (schedule_.empty()) { | 
|  | return absl::NotFoundError("No streams scheduled"); | 
|  | } | 
|  | auto schedule_it = schedule_.begin(); | 
|  | QUICHE_DCHECK(schedule_it->second->second.scheduled()); | 
|  | schedule_it->second->second.current_sequence_number = std::nullopt; | 
|  |  | 
|  | Id result = StreamId(*schedule_it); | 
|  | schedule_.erase(schedule_it); | 
|  | return result; | 
|  | } | 
|  |  | 
|  | template <typename Id, typename Priority> | 
|  | absl::Status BTreeScheduler<Id, Priority>::Schedule(Id stream_id) { | 
|  | const auto stream_it = streams_.find(stream_id); | 
|  | if (stream_it == streams_.end()) { | 
|  | return absl::NotFoundError("ID not registered"); | 
|  | } | 
|  | if (stream_it->second.scheduled()) { | 
|  | return absl::OkStatus(); | 
|  | } | 
|  | auto [schedule_it, success] = | 
|  | schedule_.insert({ScheduleKey{stream_it->second.priority, | 
|  | --current_write_sequence_number_}, | 
|  | &*stream_it}); | 
|  | QUICHE_BUG_IF(WebTransportWriteBlockedList_AddStream_conflict, !success) | 
|  | << "Conflicting key in scheduler for stream " << stream_id; | 
|  | stream_it->second.current_sequence_number = | 
|  | schedule_it->first.sequence_number; | 
|  | return absl::OkStatus(); | 
|  | } | 
|  |  | 
|  | template <typename Id, typename Priority> | 
|  | bool BTreeScheduler<Id, Priority>::IsScheduled(Id stream_id) const { | 
|  | const auto stream_it = streams_.find(stream_id); | 
|  | if (stream_it == streams_.end()) { | 
|  | return false; | 
|  | } | 
|  | return stream_it->second.scheduled(); | 
|  | } | 
|  |  | 
|  | template <typename Id, typename Priority> | 
|  | absl::Status BTreeScheduler<Id, Priority>::Deschedule(Id stream_id) { | 
|  | auto it = streams_.find(stream_id); | 
|  | if (it == streams_.end()) { | 
|  | return absl::NotFoundError("Stream not registered"); | 
|  | } | 
|  | StreamEntry& stream = it->second; | 
|  |  | 
|  | if (!stream.scheduled()) { | 
|  | return absl::FailedPreconditionError("Stream not scheduled"); | 
|  | } | 
|  | if (!DescheduleStream(stream).ok()) { | 
|  | QUICHE_BUG(BTreeSchedule_Unregister_NotInSchedule) | 
|  | << "UnregisterStream() called on a stream ID " << stream_id | 
|  | << ", which is marked ready, but is not in the schedule"; | 
|  | } | 
|  | stream.current_sequence_number.reset(); | 
|  |  | 
|  | return absl::OkStatus(); | 
|  | } | 
|  |  | 
|  | }  // namespace quiche | 
|  |  | 
|  | #endif  // QUICHE_COMMON_BTREE_SCHEDULER_H_ |