Implement WebTransport session priority scheduler. PiperOrigin-RevId: 596645681
diff --git a/build/source_list.bzl b/build/source_list.bzl index fd8192c..dcdedb9 100644 --- a/build/source_list.bzl +++ b/build/source_list.bzl
@@ -394,6 +394,7 @@ "web_transport/encapsulated/encapsulated_web_transport.h", "web_transport/web_transport.h", "web_transport/web_transport_headers.h", + "web_transport/web_transport_priority_scheduler.h", ] quiche_core_srcs = [ "common/capsule.cc", @@ -683,6 +684,7 @@ "web_transport/complete_buffer_visitor.cc", "web_transport/encapsulated/encapsulated_web_transport.cc", "web_transport/web_transport_headers.cc", + "web_transport/web_transport_priority_scheduler.cc", ] quiche_tool_support_hdrs = [ "common/platform/api/quiche_command_line_flags.h", @@ -1311,6 +1313,7 @@ "spdy/core/spdy_protocol_test.cc", "web_transport/encapsulated/encapsulated_web_transport_test.cc", "web_transport/web_transport_headers_test.cc", + "web_transport/web_transport_priority_scheduler_test.cc", ] io_tests_hdrs = [ ]
diff --git a/build/source_list.gni b/build/source_list.gni index d3fb7a0..7d44368 100644 --- a/build/source_list.gni +++ b/build/source_list.gni
@@ -394,6 +394,7 @@ "src/quiche/web_transport/encapsulated/encapsulated_web_transport.h", "src/quiche/web_transport/web_transport.h", "src/quiche/web_transport/web_transport_headers.h", + "src/quiche/web_transport/web_transport_priority_scheduler.h", ] quiche_core_srcs = [ "src/quiche/common/capsule.cc", @@ -683,6 +684,7 @@ "src/quiche/web_transport/complete_buffer_visitor.cc", "src/quiche/web_transport/encapsulated/encapsulated_web_transport.cc", "src/quiche/web_transport/web_transport_headers.cc", + "src/quiche/web_transport/web_transport_priority_scheduler.cc", ] quiche_tool_support_hdrs = [ "src/quiche/common/platform/api/quiche_command_line_flags.h", @@ -1312,6 +1314,7 @@ "src/quiche/spdy/core/spdy_protocol_test.cc", "src/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc", "src/quiche/web_transport/web_transport_headers_test.cc", + "src/quiche/web_transport/web_transport_priority_scheduler_test.cc", ] io_tests_hdrs = [
diff --git a/build/source_list.json b/build/source_list.json index 69eeccd..967ddc8 100644 --- a/build/source_list.json +++ b/build/source_list.json
@@ -392,7 +392,8 @@ "quiche/web_transport/complete_buffer_visitor.h", "quiche/web_transport/encapsulated/encapsulated_web_transport.h", "quiche/web_transport/web_transport.h", - "quiche/web_transport/web_transport_headers.h" + "quiche/web_transport/web_transport_headers.h", + "quiche/web_transport/web_transport_priority_scheduler.h" ], "quiche_core_srcs": [ "quiche/common/capsule.cc", @@ -681,7 +682,8 @@ "quiche/spdy/core/spdy_protocol.cc", "quiche/web_transport/complete_buffer_visitor.cc", "quiche/web_transport/encapsulated/encapsulated_web_transport.cc", - "quiche/web_transport/web_transport_headers.cc" + "quiche/web_transport/web_transport_headers.cc", + "quiche/web_transport/web_transport_priority_scheduler.cc" ], "quiche_tool_support_hdrs": [ "quiche/common/platform/api/quiche_command_line_flags.h", @@ -1310,7 +1312,8 @@ "quiche/spdy/core/spdy_framer_test.cc", "quiche/spdy/core/spdy_protocol_test.cc", "quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc", - "quiche/web_transport/web_transport_headers_test.cc" + "quiche/web_transport/web_transport_headers_test.cc", + "quiche/web_transport/web_transport_priority_scheduler_test.cc" ], "io_tests_hdrs": [
diff --git a/quiche/common/btree_scheduler.h b/quiche/common/btree_scheduler.h index 0f160c8..d2bb765 100644 --- a/quiche/common/btree_scheduler.h +++ b/quiche/common/btree_scheduler.h
@@ -5,15 +5,19 @@ #ifndef QUICHE_COMMON_BTREE_SCHEDULER_H_ #define QUICHE_COMMON_BTREE_SCHEDULER_H_ +#include <cstddef> #include <limits> #include <optional> +#include <utility> +#include "absl/base/attributes.h" #include "absl/container/btree_map.h" #include "absl/container/node_hash_map.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/platform/api/quiche_export.h" +#include "quiche/common/platform/api/quiche_logging.h" namespace quiche { @@ -39,6 +43,8 @@ template <typename Id, typename Priority> class QUICHE_EXPORT BTreeScheduler { public: + // Returns true if there are any streams registered. + bool HasRegistered() const { return !streams_.empty(); } // Returns true if there are any streams scheduled. bool HasScheduled() const { return !schedule_.empty(); } // Returns the number of currently scheduled streams. @@ -82,7 +88,7 @@ // A record for a registered stream. struct StreamEntry { // The current priority of the stream. - Priority priority; + ABSL_ATTRIBUTE_NO_UNIQUE_ADDRESS Priority priority; // If present, the sequence number with which the stream is currently // scheduled. If absent, indicates that the stream is not scheduled. std::optional<int> current_sequence_number; @@ -96,7 +102,7 @@ // A key that is used to order entities within the schedule. struct ScheduleKey { // The main order key: the priority of the stream. - Priority priority; + ABSL_ATTRIBUTE_NO_UNIQUE_ADDRESS Priority priority; // The secondary order key: the sequence number. int sequence_number;
diff --git a/quiche/web_transport/web_transport.h b/quiche/web_transport/web_transport.h index dbf22f8..c008801 100644 --- a/quiche/web_transport/web_transport.h +++ b/quiche/web_transport/web_transport.h
@@ -37,6 +37,25 @@ // Application-specific error code used for closing a WebTransport session. using SessionErrorCode = uint32_t; +// WebTransport priority as defined in +// https://w3c.github.io/webtransport/#webtransportsendstream-write +// The rules are as follows: +// - Streams with the same priority are handled in FIFO order. +// - Streams with the same group_id but different send_order are handled +// strictly in order. +// - Different group_ids are handled in the FIFO order. +using SendGroupId = uint32_t; +using SendOrder = int64_t; +struct QUICHE_EXPORT StreamPriority { + SendGroupId send_group_id = 0; + SendOrder send_order = 0; + + bool operator==(const StreamPriority& other) const { + return send_group_id == other.send_group_id && + send_order == other.send_order; + } +}; + // An outcome of a datagram send call. enum class DatagramStatusCode { // Datagram has been successfully sent or placed into the datagram queue.
diff --git a/quiche/web_transport/web_transport_priority_scheduler.cc b/quiche/web_transport/web_transport_priority_scheduler.cc new file mode 100644 index 0000000..9387b64 --- /dev/null +++ b/quiche/web_transport/web_transport_priority_scheduler.cc
@@ -0,0 +1,166 @@ +// Copyright 2023 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "quiche/web_transport/web_transport_priority_scheduler.h" + +#include <optional> +#include <utility> + +#include "absl/cleanup/cleanup.h" +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "quiche/common/quiche_status_utils.h" +#include "quiche/web_transport/web_transport.h" + +namespace webtransport { + +absl::Status PriorityScheduler::Register(StreamId stream_id, + const StreamPriority& priority) { + auto [it, success] = stream_to_group_map_.insert({stream_id, nullptr}); + if (!success) { + return absl::AlreadyExistsError("Provided stream ID already registered"); + } + // Avoid having any nullptr entries in the stream map if we error out further + // down below. This should not happen (all errors below are logical errors), + // but if that does happen, we will avoid crashing due to nullptr dereference. + auto cleanup_nullptr_map_entry = + absl::MakeCleanup([&] { stream_to_group_map_.erase(stream_id); }); + + auto [scheduler_it, scheduler_created] = + per_group_schedulers_.try_emplace(priority.send_group_id); + if (scheduler_created) { + // First element in the associated group; register the group in question. + QUICHE_RETURN_IF_ERROR(active_groups_.Register(priority.send_group_id, {})); + } + + PerGroupScheduler& scheduler = scheduler_it->second; + QUICHE_RETURN_IF_ERROR(scheduler.Register(stream_id, priority.send_order)); + + it->second = &*scheduler_it; + std::move(cleanup_nullptr_map_entry).Cancel(); + return absl::OkStatus(); +} + +absl::Status PriorityScheduler::Unregister(StreamId stream_id) { + auto it = stream_to_group_map_.find(stream_id); + if (it == stream_to_group_map_.end()) { + return absl::NotFoundError("Stream ID not registered"); + } + SendGroupId group_id = it->second->first; + PerGroupScheduler* group_scheduler = &it->second->second; + stream_to_group_map_.erase(it); + + QUICHE_RETURN_IF_ERROR(group_scheduler->Unregister(stream_id)); + // Clean up the group if there are no more streams associated with it. + if (!group_scheduler->HasRegistered()) { + per_group_schedulers_.erase(group_id); + QUICHE_RETURN_IF_ERROR(active_groups_.Unregister(group_id)); + } + return absl::OkStatus(); +} + +absl::Status PriorityScheduler::UpdateSendOrder(StreamId stream_id, + SendOrder new_send_order) { + PerGroupScheduler* scheduler = SchedulerForStream(stream_id); + if (scheduler == nullptr) { + return absl::NotFoundError("Stream ID not registered"); + } + return scheduler->UpdatePriority(stream_id, new_send_order); +} + +absl::Status PriorityScheduler::UpdateSendGroup(StreamId stream_id, + SendGroupId new_send_group) { + PerGroupScheduler* scheduler = SchedulerForStream(stream_id); + if (scheduler == nullptr) { + return absl::NotFoundError("Stream ID not registered"); + } + bool is_scheduled = scheduler->IsScheduled(stream_id); + std::optional<SendOrder> send_order = scheduler->GetPriorityFor(stream_id); + if (!send_order.has_value()) { + return absl::InternalError( + "Stream registered at the top level scheduler, but not at the " + "per-group one"); + } + QUICHE_RETURN_IF_ERROR(Unregister(stream_id)); + QUICHE_RETURN_IF_ERROR( + Register(stream_id, StreamPriority{new_send_group, *send_order})); + if (is_scheduled) { + QUICHE_RETURN_IF_ERROR(Schedule(stream_id)); + } + return absl::OkStatus(); +} + +std::optional<StreamPriority> PriorityScheduler::GetPriorityFor( + StreamId stream_id) const { + auto it = stream_to_group_map_.find(stream_id); + if (it == stream_to_group_map_.end()) { + return std::nullopt; + } + const auto& [group_id, group_scheduler] = *it->second; + std::optional<SendOrder> send_order = + group_scheduler.GetPriorityFor(stream_id); + if (!send_order.has_value()) { + return std::nullopt; + } + return StreamPriority{group_id, *send_order}; +} + +absl::StatusOr<bool> PriorityScheduler::ShouldYield(StreamId stream_id) const { + auto it = stream_to_group_map_.find(stream_id); + if (it == stream_to_group_map_.end()) { + return absl::NotFoundError("Stream ID not registered"); + } + const auto& [group_id, group_scheduler] = *it->second; + + absl::StatusOr<bool> per_group_result = active_groups_.ShouldYield(group_id); + QUICHE_RETURN_IF_ERROR(per_group_result.status()); + if (*per_group_result) { + return true; + } + + return group_scheduler.ShouldYield(stream_id); +} + +absl::StatusOr<StreamId> PriorityScheduler::PopFront() { + absl::StatusOr<SendGroupId> group_id = active_groups_.PopFront(); + QUICHE_RETURN_IF_ERROR(group_id.status()); + + auto it = per_group_schedulers_.find(*group_id); + if (it == per_group_schedulers_.end()) { + return absl::InternalError( + "Scheduled a group with no per-group scheduler attached"); + } + PerGroupScheduler& scheduler = it->second; + absl::StatusOr<StreamId> result = scheduler.PopFront(); + if (!result.ok()) { + return absl::InternalError("Inactive group found in top-level schedule"); + } + + // Reschedule the group if it has more active streams in it. + if (scheduler.HasScheduled()) { + QUICHE_RETURN_IF_ERROR(active_groups_.Schedule(*group_id)); + } + + return result; +} + +absl::Status PriorityScheduler::Schedule(StreamId stream_id) { + auto it = stream_to_group_map_.find(stream_id); + if (it == stream_to_group_map_.end()) { + return absl::NotFoundError("Stream ID not registered"); + } + auto& [group_id, group_scheduler] = *it->second; + QUICHE_RETURN_IF_ERROR(active_groups_.Schedule(group_id)); + return group_scheduler.Schedule(stream_id); +} + +bool PriorityScheduler::IsScheduled(StreamId stream_id) const { + const PerGroupScheduler* scheduler = SchedulerForStream(stream_id); + if (scheduler == nullptr) { + return false; + } + return scheduler->IsScheduled(stream_id); +} + +} // namespace webtransport
diff --git a/quiche/web_transport/web_transport_priority_scheduler.h b/quiche/web_transport/web_transport_priority_scheduler.h new file mode 100644 index 0000000..7e71ed0 --- /dev/null +++ b/quiche/web_transport/web_transport_priority_scheduler.h
@@ -0,0 +1,109 @@ +// Copyright 2023 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef QUICHE_WEB_TRANSPORT_WEB_TRANSPORT_PRIORITY_SCHEDULER_H_ +#define QUICHE_WEB_TRANSPORT_WEB_TRANSPORT_PRIORITY_SCHEDULER_H_ + +#include <cstddef> +#include <optional> +#include <utility> + +#include "absl/container/flat_hash_map.h" +#include "absl/container/node_hash_map.h" +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "quiche/common/btree_scheduler.h" +#include "quiche/common/platform/api/quiche_export.h" +#include "quiche/web_transport/web_transport.h" + +namespace webtransport { + +// Schedules the streams within a WebTransport session as defined by the W3C +// WebTransport API. Unlike the W3C API, there is no need to track groups +// manually: the group is created when a first stream with the associated group +// ID is registered, and it is deleted when the last stream associated with the +// group is unregistered. +class QUICHE_EXPORT PriorityScheduler { + public: + // Returns true if there are any streams registered. + bool HasRegistered() const { return active_groups_.HasRegistered(); } + // Returns true if there are any streams scheduled. + bool HasScheduled() const { return active_groups_.HasScheduled(); } + + // Returns the number of currently scheduled streams. + size_t NumScheduled() const { + size_t total = 0; + for (const auto& [group_id, group_scheduler] : per_group_schedulers_) { + total += group_scheduler.NumScheduled(); + } + return total; + } + + // Registers the specified stream with the supplied priority. The stream must + // not be already registered. + absl::Status Register(StreamId stream_id, const StreamPriority& priority); + // Unregisters a previously registered stream. + absl::Status Unregister(StreamId stream_id); + // Alters the priority of an already registered stream. + absl::Status UpdateSendOrder(StreamId stream_id, SendOrder new_send_order); + absl::Status UpdateSendGroup(StreamId stream_id, SendGroupId new_send_group); + + // Returns true if there is a stream that would go before `id` in the + // schedule. + absl::StatusOr<bool> ShouldYield(StreamId id) const; + + // Returns the priority for `id`, or nullopt if stream is not registered. + std::optional<StreamPriority> GetPriorityFor(StreamId stream_id) const; + + // Pops the highest priority stream. Will fail if the schedule is empty. + absl::StatusOr<StreamId> PopFront(); + + // Adds `stream` to the schedule if it's not already there. + absl::Status Schedule(StreamId stream_id); + // Returns true if `stream` is in the schedule. + bool IsScheduled(StreamId stream_id) const; + + private: + // All groups currently have the equal priority; this type represents the said + // single priority. + class SinglePriority { + public: + bool operator==(const SinglePriority&) const { return true; } + bool operator!=(const SinglePriority&) const { return false; } + + bool operator<(const SinglePriority&) const { return false; } + bool operator>(const SinglePriority&) const { return false; } + bool operator<=(const SinglePriority&) const { return true; } + bool operator>=(const SinglePriority&) const { return true; } + }; + + using PerGroupScheduler = quiche::BTreeScheduler<StreamId, SendOrder>; + using GroupSchedulerPair = std::pair<const SendGroupId, PerGroupScheduler>; + + // Round-robin schedule for the groups. + quiche::BTreeScheduler<SendGroupId, SinglePriority> active_groups_; + // Map group ID to the scheduler for the group in question. + absl::node_hash_map<SendGroupId, PerGroupScheduler> per_group_schedulers_; + // Map stream ID to a pointer to the entry in `per_group_schedulers_`. + absl::flat_hash_map<StreamId, GroupSchedulerPair*> stream_to_group_map_; + + PerGroupScheduler* SchedulerForStream(StreamId stream_id) { + auto it = stream_to_group_map_.find(stream_id); + if (it == stream_to_group_map_.end()) { + return nullptr; + } + return &it->second->second; + } + const PerGroupScheduler* SchedulerForStream(StreamId stream_id) const { + auto it = stream_to_group_map_.find(stream_id); + if (it == stream_to_group_map_.end()) { + return nullptr; + } + return &it->second->second; + } +}; + +} // namespace webtransport + +#endif // QUICHE_WEB_TRANSPORT_WEB_TRANSPORT_PRIORITY_SCHEDULER_H_
diff --git a/quiche/web_transport/web_transport_priority_scheduler_test.cc b/quiche/web_transport/web_transport_priority_scheduler_test.cc new file mode 100644 index 0000000..d89a52d --- /dev/null +++ b/quiche/web_transport/web_transport_priority_scheduler_test.cc
@@ -0,0 +1,260 @@ +// Copyright 2023 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "quiche/web_transport/web_transport_priority_scheduler.h" + +#include <optional> +#include <vector> + +#include "absl/status/status.h" +#include "absl/status/statusor.h" +#include "absl/types/span.h" +#include "quiche/common/platform/api/quiche_test.h" +#include "quiche/common/test_tools/quiche_test_utils.h" +#include "quiche/web_transport/web_transport.h" + +namespace webtransport { +namespace { + +using ::quiche::test::IsOkAndHolds; +using ::quiche::test::StatusIs; +using ::testing::ElementsAre; + +void ScheduleIds(PriorityScheduler& scheduler, absl::Span<const StreamId> ids) { + for (StreamId id : ids) { + QUICHE_EXPECT_OK(scheduler.Schedule(id)); + } +} + +std::vector<StreamId> PopAll(PriorityScheduler& scheduler) { + std::vector<StreamId> result; + result.reserve(scheduler.NumScheduled()); + for (;;) { + absl::StatusOr<StreamId> id = scheduler.PopFront(); + if (!id.ok()) { + EXPECT_THAT(id, StatusIs(absl::StatusCode::kNotFound)); + break; + } + result.push_back(*id); + } + return result; +} + +TEST(WebTransportSchedulerTest, Register) { + PriorityScheduler scheduler; + + // Register two streams in the same group. + QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{1, 0})); + QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{1, 0})); + QUICHE_EXPECT_OK(scheduler.Register(4, StreamPriority{0, 0})); + + // Attempt re-registering. + EXPECT_THAT(scheduler.Register(4, StreamPriority{0, 0}), + StatusIs(absl::StatusCode::kAlreadyExists)); + EXPECT_THAT(scheduler.Register(4, StreamPriority{1, 0}), + StatusIs(absl::StatusCode::kAlreadyExists)); +} + +TEST(WebTransportSchedulerTest, Unregister) { + PriorityScheduler scheduler; + + EXPECT_FALSE(scheduler.HasRegistered()); + QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0})); + + EXPECT_TRUE(scheduler.HasRegistered()); + QUICHE_EXPECT_OK(scheduler.Unregister(1)); + EXPECT_TRUE(scheduler.HasRegistered()); + QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0})); + + ScheduleIds(scheduler, {0, 1}); + QUICHE_EXPECT_OK(scheduler.Unregister(0)); + QUICHE_EXPECT_OK(scheduler.Unregister(1)); + EXPECT_FALSE(scheduler.HasRegistered()); + QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0})); + EXPECT_TRUE(scheduler.HasRegistered()); + EXPECT_FALSE(scheduler.HasScheduled()); +} + +TEST(WebTransportSchedulerTest, UpdatePriority) { + PriorityScheduler scheduler; + + QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 10})); + QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 20})); + EXPECT_EQ(scheduler.GetPriorityFor(0), (StreamPriority{0, 10})); + EXPECT_EQ(scheduler.GetPriorityFor(1), (StreamPriority{0, 20})); + + QUICHE_EXPECT_OK(scheduler.UpdateSendGroup(0, 1)); + QUICHE_EXPECT_OK(scheduler.UpdateSendOrder(1, 40)); + EXPECT_EQ(scheduler.GetPriorityFor(0), (StreamPriority{1, 10})); + EXPECT_EQ(scheduler.GetPriorityFor(1), (StreamPriority{0, 40})); + + EXPECT_THAT(scheduler.UpdateSendGroup(1000, 1), + StatusIs(absl::StatusCode::kNotFound)); + EXPECT_THAT(scheduler.UpdateSendOrder(1000, 1), + StatusIs(absl::StatusCode::kNotFound)); + EXPECT_EQ(scheduler.GetPriorityFor(1000), std::nullopt); +} + +TEST(WebTransportSchedulerTest, Schedule) { + PriorityScheduler scheduler; + + QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0})); + + EXPECT_FALSE(scheduler.IsScheduled(0)); + EXPECT_FALSE(scheduler.IsScheduled(1)); + EXPECT_FALSE(scheduler.IsScheduled(1000)); + + QUICHE_EXPECT_OK(scheduler.Schedule(0)); + EXPECT_TRUE(scheduler.IsScheduled(0)); + EXPECT_FALSE(scheduler.IsScheduled(1)); + + QUICHE_EXPECT_OK(scheduler.Schedule(1)); + EXPECT_TRUE(scheduler.IsScheduled(0)); + EXPECT_TRUE(scheduler.IsScheduled(1)); + + EXPECT_THAT(scheduler.Schedule(0), StatusIs(absl::StatusCode::kOk)); + EXPECT_THAT(scheduler.Schedule(2), StatusIs(absl::StatusCode::kNotFound)); +} + +TEST(WebTransportSchedulerTest, SamePriority) { + PriorityScheduler scheduler; + + QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{0, 0})); + + ScheduleIds(scheduler, {0, 1, 2, 3}); + EXPECT_EQ(scheduler.NumScheduled(), 4); + EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 1, 2, 3)); + ScheduleIds(scheduler, {3, 1, 2}); + EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 1, 2)); +} + +TEST(WebTransportSchedulerTest, SingleBucketOrdered) { + PriorityScheduler scheduler; + + QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 1})); + QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{0, 2})); + QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{0, 3})); + + ScheduleIds(scheduler, {0, 1, 2, 3}); + EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 2, 1, 0)); + ScheduleIds(scheduler, {3, 1, 2, 0}); + EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 2, 1, 0)); +} + +TEST(WebTransportSchedulerTest, EveryStreamInItsOwnBucket) { + PriorityScheduler scheduler; + + QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{1, 1})); + QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{2, 2})); + QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{3, 3})); + + ScheduleIds(scheduler, {0, 1, 2, 3}); + EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 1, 2, 3)); + ScheduleIds(scheduler, {3, 1, 2}); + EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 1, 2)); +} + +TEST(WebTransportSchedulerTest, TwoBucketsNoSendOrder) { + PriorityScheduler scheduler; + + QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{1, 0})); + QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{1, 0})); + + ScheduleIds(scheduler, {0, 1, 2, 3}); + EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 2, 1, 3)); + ScheduleIds(scheduler, {0, 2, 1, 3}); + EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 2, 1, 3)); + ScheduleIds(scheduler, {3, 2, 1, 0}); + EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 1, 2, 0)); + + ScheduleIds(scheduler, {0, 2}); + EXPECT_THAT(scheduler.PopFront(), IsOkAndHolds(0)); + ScheduleIds(scheduler, {1, 3, 0}); + EXPECT_THAT(PopAll(scheduler), ElementsAre(2, 1, 3, 0)); +} + +TEST(WebTransportSchedulerTest, TwoBucketsWithSendOrder) { + PriorityScheduler scheduler; + + QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 10})); + QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{1, 20})); + QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{1, 30})); + + ScheduleIds(scheduler, {0, 1, 2, 3}); + EXPECT_THAT(PopAll(scheduler), ElementsAre(1, 3, 0, 2)); + ScheduleIds(scheduler, {3, 2, 1, 0}); + EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 1, 2, 0)); +} + +TEST(WebTransportSchedulerTest, ShouldYield) { + PriorityScheduler scheduler; + + QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{0, 10})); + QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{1, 0})); + + EXPECT_THAT(scheduler.ShouldYield(0), IsOkAndHolds(false)); + EXPECT_THAT(scheduler.ShouldYield(1), IsOkAndHolds(false)); + EXPECT_THAT(scheduler.ShouldYield(2), IsOkAndHolds(false)); + EXPECT_THAT(scheduler.ShouldYield(3), IsOkAndHolds(false)); + EXPECT_THAT(scheduler.ShouldYield(4), StatusIs(absl::StatusCode::kNotFound)); + + QUICHE_EXPECT_OK(scheduler.Schedule(0)); + EXPECT_THAT(scheduler.ShouldYield(0), IsOkAndHolds(false)); + EXPECT_THAT(scheduler.ShouldYield(1), IsOkAndHolds(true)); + EXPECT_THAT(scheduler.ShouldYield(2), IsOkAndHolds(false)); + EXPECT_THAT(scheduler.ShouldYield(3), IsOkAndHolds(true)); + PopAll(scheduler); + + QUICHE_EXPECT_OK(scheduler.Schedule(2)); + EXPECT_THAT(scheduler.ShouldYield(0), IsOkAndHolds(true)); + EXPECT_THAT(scheduler.ShouldYield(1), IsOkAndHolds(true)); + EXPECT_THAT(scheduler.ShouldYield(2), IsOkAndHolds(false)); + EXPECT_THAT(scheduler.ShouldYield(3), IsOkAndHolds(true)); + PopAll(scheduler); + + QUICHE_EXPECT_OK(scheduler.Schedule(3)); + EXPECT_THAT(scheduler.ShouldYield(0), IsOkAndHolds(true)); + EXPECT_THAT(scheduler.ShouldYield(1), IsOkAndHolds(true)); + EXPECT_THAT(scheduler.ShouldYield(2), IsOkAndHolds(true)); + EXPECT_THAT(scheduler.ShouldYield(3), IsOkAndHolds(false)); + PopAll(scheduler); +} + +TEST(WebTransportSchedulerTest, UpdatePriorityWhileScheduled) { + PriorityScheduler scheduler; + + QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0})); + QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{1, 0})); + QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{1, 0})); + + ScheduleIds(scheduler, {0, 1, 2, 3}); + EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 2, 1, 3)); + + ScheduleIds(scheduler, {0, 1, 2, 3}); + QUICHE_EXPECT_OK(scheduler.UpdateSendOrder(1, 10)); + EXPECT_THAT(PopAll(scheduler), ElementsAre(1, 2, 0, 3)); + + ScheduleIds(scheduler, {0, 1, 2, 3}); + QUICHE_EXPECT_OK(scheduler.UpdateSendGroup(1, 1)); + EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 1, 2, 3)); +} + +} // namespace +} // namespace webtransport