Implement WebTransport session priority scheduler.
PiperOrigin-RevId: 596645681
diff --git a/build/source_list.bzl b/build/source_list.bzl
index fd8192c..dcdedb9 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -394,6 +394,7 @@
"web_transport/encapsulated/encapsulated_web_transport.h",
"web_transport/web_transport.h",
"web_transport/web_transport_headers.h",
+ "web_transport/web_transport_priority_scheduler.h",
]
quiche_core_srcs = [
"common/capsule.cc",
@@ -683,6 +684,7 @@
"web_transport/complete_buffer_visitor.cc",
"web_transport/encapsulated/encapsulated_web_transport.cc",
"web_transport/web_transport_headers.cc",
+ "web_transport/web_transport_priority_scheduler.cc",
]
quiche_tool_support_hdrs = [
"common/platform/api/quiche_command_line_flags.h",
@@ -1311,6 +1313,7 @@
"spdy/core/spdy_protocol_test.cc",
"web_transport/encapsulated/encapsulated_web_transport_test.cc",
"web_transport/web_transport_headers_test.cc",
+ "web_transport/web_transport_priority_scheduler_test.cc",
]
io_tests_hdrs = [
]
diff --git a/build/source_list.gni b/build/source_list.gni
index d3fb7a0..7d44368 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -394,6 +394,7 @@
"src/quiche/web_transport/encapsulated/encapsulated_web_transport.h",
"src/quiche/web_transport/web_transport.h",
"src/quiche/web_transport/web_transport_headers.h",
+ "src/quiche/web_transport/web_transport_priority_scheduler.h",
]
quiche_core_srcs = [
"src/quiche/common/capsule.cc",
@@ -683,6 +684,7 @@
"src/quiche/web_transport/complete_buffer_visitor.cc",
"src/quiche/web_transport/encapsulated/encapsulated_web_transport.cc",
"src/quiche/web_transport/web_transport_headers.cc",
+ "src/quiche/web_transport/web_transport_priority_scheduler.cc",
]
quiche_tool_support_hdrs = [
"src/quiche/common/platform/api/quiche_command_line_flags.h",
@@ -1312,6 +1314,7 @@
"src/quiche/spdy/core/spdy_protocol_test.cc",
"src/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc",
"src/quiche/web_transport/web_transport_headers_test.cc",
+ "src/quiche/web_transport/web_transport_priority_scheduler_test.cc",
]
io_tests_hdrs = [
diff --git a/build/source_list.json b/build/source_list.json
index 69eeccd..967ddc8 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -392,7 +392,8 @@
"quiche/web_transport/complete_buffer_visitor.h",
"quiche/web_transport/encapsulated/encapsulated_web_transport.h",
"quiche/web_transport/web_transport.h",
- "quiche/web_transport/web_transport_headers.h"
+ "quiche/web_transport/web_transport_headers.h",
+ "quiche/web_transport/web_transport_priority_scheduler.h"
],
"quiche_core_srcs": [
"quiche/common/capsule.cc",
@@ -681,7 +682,8 @@
"quiche/spdy/core/spdy_protocol.cc",
"quiche/web_transport/complete_buffer_visitor.cc",
"quiche/web_transport/encapsulated/encapsulated_web_transport.cc",
- "quiche/web_transport/web_transport_headers.cc"
+ "quiche/web_transport/web_transport_headers.cc",
+ "quiche/web_transport/web_transport_priority_scheduler.cc"
],
"quiche_tool_support_hdrs": [
"quiche/common/platform/api/quiche_command_line_flags.h",
@@ -1310,7 +1312,8 @@
"quiche/spdy/core/spdy_framer_test.cc",
"quiche/spdy/core/spdy_protocol_test.cc",
"quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc",
- "quiche/web_transport/web_transport_headers_test.cc"
+ "quiche/web_transport/web_transport_headers_test.cc",
+ "quiche/web_transport/web_transport_priority_scheduler_test.cc"
],
"io_tests_hdrs": [
diff --git a/quiche/common/btree_scheduler.h b/quiche/common/btree_scheduler.h
index 0f160c8..d2bb765 100644
--- a/quiche/common/btree_scheduler.h
+++ b/quiche/common/btree_scheduler.h
@@ -5,15 +5,19 @@
#ifndef QUICHE_COMMON_BTREE_SCHEDULER_H_
#define QUICHE_COMMON_BTREE_SCHEDULER_H_
+#include <cstddef>
#include <limits>
#include <optional>
+#include <utility>
+#include "absl/base/attributes.h"
#include "absl/container/btree_map.h"
#include "absl/container/node_hash_map.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/platform/api/quiche_logging.h"
namespace quiche {
@@ -39,6 +43,8 @@
template <typename Id, typename Priority>
class QUICHE_EXPORT BTreeScheduler {
public:
+ // Returns true if there are any streams registered.
+ bool HasRegistered() const { return !streams_.empty(); }
// Returns true if there are any streams scheduled.
bool HasScheduled() const { return !schedule_.empty(); }
// Returns the number of currently scheduled streams.
@@ -82,7 +88,7 @@
// A record for a registered stream.
struct StreamEntry {
// The current priority of the stream.
- Priority priority;
+ ABSL_ATTRIBUTE_NO_UNIQUE_ADDRESS Priority priority;
// If present, the sequence number with which the stream is currently
// scheduled. If absent, indicates that the stream is not scheduled.
std::optional<int> current_sequence_number;
@@ -96,7 +102,7 @@
// A key that is used to order entities within the schedule.
struct ScheduleKey {
// The main order key: the priority of the stream.
- Priority priority;
+ ABSL_ATTRIBUTE_NO_UNIQUE_ADDRESS Priority priority;
// The secondary order key: the sequence number.
int sequence_number;
diff --git a/quiche/web_transport/web_transport.h b/quiche/web_transport/web_transport.h
index dbf22f8..c008801 100644
--- a/quiche/web_transport/web_transport.h
+++ b/quiche/web_transport/web_transport.h
@@ -37,6 +37,25 @@
// Application-specific error code used for closing a WebTransport session.
using SessionErrorCode = uint32_t;
+// WebTransport priority as defined in
+// https://w3c.github.io/webtransport/#webtransportsendstream-write
+// The rules are as follows:
+// - Streams with the same priority are handled in FIFO order.
+// - Streams with the same group_id but different send_order are handled
+// strictly in order.
+// - Different group_ids are handled in the FIFO order.
+using SendGroupId = uint32_t;
+using SendOrder = int64_t;
+struct QUICHE_EXPORT StreamPriority {
+ SendGroupId send_group_id = 0;
+ SendOrder send_order = 0;
+
+ bool operator==(const StreamPriority& other) const {
+ return send_group_id == other.send_group_id &&
+ send_order == other.send_order;
+ }
+};
+
// An outcome of a datagram send call.
enum class DatagramStatusCode {
// Datagram has been successfully sent or placed into the datagram queue.
diff --git a/quiche/web_transport/web_transport_priority_scheduler.cc b/quiche/web_transport/web_transport_priority_scheduler.cc
new file mode 100644
index 0000000..9387b64
--- /dev/null
+++ b/quiche/web_transport/web_transport_priority_scheduler.cc
@@ -0,0 +1,166 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/web_transport/web_transport_priority_scheduler.h"
+
+#include <optional>
+#include <utility>
+
+#include "absl/cleanup/cleanup.h"
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "quiche/common/quiche_status_utils.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace webtransport {
+
+absl::Status PriorityScheduler::Register(StreamId stream_id,
+ const StreamPriority& priority) {
+ auto [it, success] = stream_to_group_map_.insert({stream_id, nullptr});
+ if (!success) {
+ return absl::AlreadyExistsError("Provided stream ID already registered");
+ }
+ // Avoid having any nullptr entries in the stream map if we error out further
+ // down below. This should not happen (all errors below are logical errors),
+ // but if that does happen, we will avoid crashing due to nullptr dereference.
+ auto cleanup_nullptr_map_entry =
+ absl::MakeCleanup([&] { stream_to_group_map_.erase(stream_id); });
+
+ auto [scheduler_it, scheduler_created] =
+ per_group_schedulers_.try_emplace(priority.send_group_id);
+ if (scheduler_created) {
+ // First element in the associated group; register the group in question.
+ QUICHE_RETURN_IF_ERROR(active_groups_.Register(priority.send_group_id, {}));
+ }
+
+ PerGroupScheduler& scheduler = scheduler_it->second;
+ QUICHE_RETURN_IF_ERROR(scheduler.Register(stream_id, priority.send_order));
+
+ it->second = &*scheduler_it;
+ std::move(cleanup_nullptr_map_entry).Cancel();
+ return absl::OkStatus();
+}
+
+absl::Status PriorityScheduler::Unregister(StreamId stream_id) {
+ auto it = stream_to_group_map_.find(stream_id);
+ if (it == stream_to_group_map_.end()) {
+ return absl::NotFoundError("Stream ID not registered");
+ }
+ SendGroupId group_id = it->second->first;
+ PerGroupScheduler* group_scheduler = &it->second->second;
+ stream_to_group_map_.erase(it);
+
+ QUICHE_RETURN_IF_ERROR(group_scheduler->Unregister(stream_id));
+ // Clean up the group if there are no more streams associated with it.
+ if (!group_scheduler->HasRegistered()) {
+ per_group_schedulers_.erase(group_id);
+ QUICHE_RETURN_IF_ERROR(active_groups_.Unregister(group_id));
+ }
+ return absl::OkStatus();
+}
+
+absl::Status PriorityScheduler::UpdateSendOrder(StreamId stream_id,
+ SendOrder new_send_order) {
+ PerGroupScheduler* scheduler = SchedulerForStream(stream_id);
+ if (scheduler == nullptr) {
+ return absl::NotFoundError("Stream ID not registered");
+ }
+ return scheduler->UpdatePriority(stream_id, new_send_order);
+}
+
+absl::Status PriorityScheduler::UpdateSendGroup(StreamId stream_id,
+ SendGroupId new_send_group) {
+ PerGroupScheduler* scheduler = SchedulerForStream(stream_id);
+ if (scheduler == nullptr) {
+ return absl::NotFoundError("Stream ID not registered");
+ }
+ bool is_scheduled = scheduler->IsScheduled(stream_id);
+ std::optional<SendOrder> send_order = scheduler->GetPriorityFor(stream_id);
+ if (!send_order.has_value()) {
+ return absl::InternalError(
+ "Stream registered at the top level scheduler, but not at the "
+ "per-group one");
+ }
+ QUICHE_RETURN_IF_ERROR(Unregister(stream_id));
+ QUICHE_RETURN_IF_ERROR(
+ Register(stream_id, StreamPriority{new_send_group, *send_order}));
+ if (is_scheduled) {
+ QUICHE_RETURN_IF_ERROR(Schedule(stream_id));
+ }
+ return absl::OkStatus();
+}
+
+std::optional<StreamPriority> PriorityScheduler::GetPriorityFor(
+ StreamId stream_id) const {
+ auto it = stream_to_group_map_.find(stream_id);
+ if (it == stream_to_group_map_.end()) {
+ return std::nullopt;
+ }
+ const auto& [group_id, group_scheduler] = *it->second;
+ std::optional<SendOrder> send_order =
+ group_scheduler.GetPriorityFor(stream_id);
+ if (!send_order.has_value()) {
+ return std::nullopt;
+ }
+ return StreamPriority{group_id, *send_order};
+}
+
+absl::StatusOr<bool> PriorityScheduler::ShouldYield(StreamId stream_id) const {
+ auto it = stream_to_group_map_.find(stream_id);
+ if (it == stream_to_group_map_.end()) {
+ return absl::NotFoundError("Stream ID not registered");
+ }
+ const auto& [group_id, group_scheduler] = *it->second;
+
+ absl::StatusOr<bool> per_group_result = active_groups_.ShouldYield(group_id);
+ QUICHE_RETURN_IF_ERROR(per_group_result.status());
+ if (*per_group_result) {
+ return true;
+ }
+
+ return group_scheduler.ShouldYield(stream_id);
+}
+
+absl::StatusOr<StreamId> PriorityScheduler::PopFront() {
+ absl::StatusOr<SendGroupId> group_id = active_groups_.PopFront();
+ QUICHE_RETURN_IF_ERROR(group_id.status());
+
+ auto it = per_group_schedulers_.find(*group_id);
+ if (it == per_group_schedulers_.end()) {
+ return absl::InternalError(
+ "Scheduled a group with no per-group scheduler attached");
+ }
+ PerGroupScheduler& scheduler = it->second;
+ absl::StatusOr<StreamId> result = scheduler.PopFront();
+ if (!result.ok()) {
+ return absl::InternalError("Inactive group found in top-level schedule");
+ }
+
+ // Reschedule the group if it has more active streams in it.
+ if (scheduler.HasScheduled()) {
+ QUICHE_RETURN_IF_ERROR(active_groups_.Schedule(*group_id));
+ }
+
+ return result;
+}
+
+absl::Status PriorityScheduler::Schedule(StreamId stream_id) {
+ auto it = stream_to_group_map_.find(stream_id);
+ if (it == stream_to_group_map_.end()) {
+ return absl::NotFoundError("Stream ID not registered");
+ }
+ auto& [group_id, group_scheduler] = *it->second;
+ QUICHE_RETURN_IF_ERROR(active_groups_.Schedule(group_id));
+ return group_scheduler.Schedule(stream_id);
+}
+
+bool PriorityScheduler::IsScheduled(StreamId stream_id) const {
+ const PerGroupScheduler* scheduler = SchedulerForStream(stream_id);
+ if (scheduler == nullptr) {
+ return false;
+ }
+ return scheduler->IsScheduled(stream_id);
+}
+
+} // namespace webtransport
diff --git a/quiche/web_transport/web_transport_priority_scheduler.h b/quiche/web_transport/web_transport_priority_scheduler.h
new file mode 100644
index 0000000..7e71ed0
--- /dev/null
+++ b/quiche/web_transport/web_transport_priority_scheduler.h
@@ -0,0 +1,109 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_WEB_TRANSPORT_WEB_TRANSPORT_PRIORITY_SCHEDULER_H_
+#define QUICHE_WEB_TRANSPORT_WEB_TRANSPORT_PRIORITY_SCHEDULER_H_
+
+#include <cstddef>
+#include <optional>
+#include <utility>
+
+#include "absl/container/flat_hash_map.h"
+#include "absl/container/node_hash_map.h"
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "quiche/common/btree_scheduler.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace webtransport {
+
+// Schedules the streams within a WebTransport session as defined by the W3C
+// WebTransport API. Unlike the W3C API, there is no need to track groups
+// manually: the group is created when a first stream with the associated group
+// ID is registered, and it is deleted when the last stream associated with the
+// group is unregistered.
+class QUICHE_EXPORT PriorityScheduler {
+ public:
+ // Returns true if there are any streams registered.
+ bool HasRegistered() const { return active_groups_.HasRegistered(); }
+ // Returns true if there are any streams scheduled.
+ bool HasScheduled() const { return active_groups_.HasScheduled(); }
+
+ // Returns the number of currently scheduled streams.
+ size_t NumScheduled() const {
+ size_t total = 0;
+ for (const auto& [group_id, group_scheduler] : per_group_schedulers_) {
+ total += group_scheduler.NumScheduled();
+ }
+ return total;
+ }
+
+ // Registers the specified stream with the supplied priority. The stream must
+ // not be already registered.
+ absl::Status Register(StreamId stream_id, const StreamPriority& priority);
+ // Unregisters a previously registered stream.
+ absl::Status Unregister(StreamId stream_id);
+ // Alters the priority of an already registered stream.
+ absl::Status UpdateSendOrder(StreamId stream_id, SendOrder new_send_order);
+ absl::Status UpdateSendGroup(StreamId stream_id, SendGroupId new_send_group);
+
+ // Returns true if there is a stream that would go before `id` in the
+ // schedule.
+ absl::StatusOr<bool> ShouldYield(StreamId id) const;
+
+ // Returns the priority for `id`, or nullopt if stream is not registered.
+ std::optional<StreamPriority> GetPriorityFor(StreamId stream_id) const;
+
+ // Pops the highest priority stream. Will fail if the schedule is empty.
+ absl::StatusOr<StreamId> PopFront();
+
+ // Adds `stream` to the schedule if it's not already there.
+ absl::Status Schedule(StreamId stream_id);
+ // Returns true if `stream` is in the schedule.
+ bool IsScheduled(StreamId stream_id) const;
+
+ private:
+ // All groups currently have the equal priority; this type represents the said
+ // single priority.
+ class SinglePriority {
+ public:
+ bool operator==(const SinglePriority&) const { return true; }
+ bool operator!=(const SinglePriority&) const { return false; }
+
+ bool operator<(const SinglePriority&) const { return false; }
+ bool operator>(const SinglePriority&) const { return false; }
+ bool operator<=(const SinglePriority&) const { return true; }
+ bool operator>=(const SinglePriority&) const { return true; }
+ };
+
+ using PerGroupScheduler = quiche::BTreeScheduler<StreamId, SendOrder>;
+ using GroupSchedulerPair = std::pair<const SendGroupId, PerGroupScheduler>;
+
+ // Round-robin schedule for the groups.
+ quiche::BTreeScheduler<SendGroupId, SinglePriority> active_groups_;
+ // Map group ID to the scheduler for the group in question.
+ absl::node_hash_map<SendGroupId, PerGroupScheduler> per_group_schedulers_;
+ // Map stream ID to a pointer to the entry in `per_group_schedulers_`.
+ absl::flat_hash_map<StreamId, GroupSchedulerPair*> stream_to_group_map_;
+
+ PerGroupScheduler* SchedulerForStream(StreamId stream_id) {
+ auto it = stream_to_group_map_.find(stream_id);
+ if (it == stream_to_group_map_.end()) {
+ return nullptr;
+ }
+ return &it->second->second;
+ }
+ const PerGroupScheduler* SchedulerForStream(StreamId stream_id) const {
+ auto it = stream_to_group_map_.find(stream_id);
+ if (it == stream_to_group_map_.end()) {
+ return nullptr;
+ }
+ return &it->second->second;
+ }
+};
+
+} // namespace webtransport
+
+#endif // QUICHE_WEB_TRANSPORT_WEB_TRANSPORT_PRIORITY_SCHEDULER_H_
diff --git a/quiche/web_transport/web_transport_priority_scheduler_test.cc b/quiche/web_transport/web_transport_priority_scheduler_test.cc
new file mode 100644
index 0000000..d89a52d
--- /dev/null
+++ b/quiche/web_transport/web_transport_priority_scheduler_test.cc
@@ -0,0 +1,260 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/web_transport/web_transport_priority_scheduler.h"
+
+#include <optional>
+#include <vector>
+
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/types/span.h"
+#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/test_tools/quiche_test_utils.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace webtransport {
+namespace {
+
+using ::quiche::test::IsOkAndHolds;
+using ::quiche::test::StatusIs;
+using ::testing::ElementsAre;
+
+void ScheduleIds(PriorityScheduler& scheduler, absl::Span<const StreamId> ids) {
+ for (StreamId id : ids) {
+ QUICHE_EXPECT_OK(scheduler.Schedule(id));
+ }
+}
+
+std::vector<StreamId> PopAll(PriorityScheduler& scheduler) {
+ std::vector<StreamId> result;
+ result.reserve(scheduler.NumScheduled());
+ for (;;) {
+ absl::StatusOr<StreamId> id = scheduler.PopFront();
+ if (!id.ok()) {
+ EXPECT_THAT(id, StatusIs(absl::StatusCode::kNotFound));
+ break;
+ }
+ result.push_back(*id);
+ }
+ return result;
+}
+
+TEST(WebTransportSchedulerTest, Register) {
+ PriorityScheduler scheduler;
+
+ // Register two streams in the same group.
+ QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{1, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{1, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(4, StreamPriority{0, 0}));
+
+ // Attempt re-registering.
+ EXPECT_THAT(scheduler.Register(4, StreamPriority{0, 0}),
+ StatusIs(absl::StatusCode::kAlreadyExists));
+ EXPECT_THAT(scheduler.Register(4, StreamPriority{1, 0}),
+ StatusIs(absl::StatusCode::kAlreadyExists));
+}
+
+TEST(WebTransportSchedulerTest, Unregister) {
+ PriorityScheduler scheduler;
+
+ EXPECT_FALSE(scheduler.HasRegistered());
+ QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+
+ EXPECT_TRUE(scheduler.HasRegistered());
+ QUICHE_EXPECT_OK(scheduler.Unregister(1));
+ EXPECT_TRUE(scheduler.HasRegistered());
+ QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+
+ ScheduleIds(scheduler, {0, 1});
+ QUICHE_EXPECT_OK(scheduler.Unregister(0));
+ QUICHE_EXPECT_OK(scheduler.Unregister(1));
+ EXPECT_FALSE(scheduler.HasRegistered());
+ QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+ EXPECT_TRUE(scheduler.HasRegistered());
+ EXPECT_FALSE(scheduler.HasScheduled());
+}
+
+TEST(WebTransportSchedulerTest, UpdatePriority) {
+ PriorityScheduler scheduler;
+
+ QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 10}));
+ QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 20}));
+ EXPECT_EQ(scheduler.GetPriorityFor(0), (StreamPriority{0, 10}));
+ EXPECT_EQ(scheduler.GetPriorityFor(1), (StreamPriority{0, 20}));
+
+ QUICHE_EXPECT_OK(scheduler.UpdateSendGroup(0, 1));
+ QUICHE_EXPECT_OK(scheduler.UpdateSendOrder(1, 40));
+ EXPECT_EQ(scheduler.GetPriorityFor(0), (StreamPriority{1, 10}));
+ EXPECT_EQ(scheduler.GetPriorityFor(1), (StreamPriority{0, 40}));
+
+ EXPECT_THAT(scheduler.UpdateSendGroup(1000, 1),
+ StatusIs(absl::StatusCode::kNotFound));
+ EXPECT_THAT(scheduler.UpdateSendOrder(1000, 1),
+ StatusIs(absl::StatusCode::kNotFound));
+ EXPECT_EQ(scheduler.GetPriorityFor(1000), std::nullopt);
+}
+
+TEST(WebTransportSchedulerTest, Schedule) {
+ PriorityScheduler scheduler;
+
+ QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+
+ EXPECT_FALSE(scheduler.IsScheduled(0));
+ EXPECT_FALSE(scheduler.IsScheduled(1));
+ EXPECT_FALSE(scheduler.IsScheduled(1000));
+
+ QUICHE_EXPECT_OK(scheduler.Schedule(0));
+ EXPECT_TRUE(scheduler.IsScheduled(0));
+ EXPECT_FALSE(scheduler.IsScheduled(1));
+
+ QUICHE_EXPECT_OK(scheduler.Schedule(1));
+ EXPECT_TRUE(scheduler.IsScheduled(0));
+ EXPECT_TRUE(scheduler.IsScheduled(1));
+
+ EXPECT_THAT(scheduler.Schedule(0), StatusIs(absl::StatusCode::kOk));
+ EXPECT_THAT(scheduler.Schedule(2), StatusIs(absl::StatusCode::kNotFound));
+}
+
+TEST(WebTransportSchedulerTest, SamePriority) {
+ PriorityScheduler scheduler;
+
+ QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{0, 0}));
+
+ ScheduleIds(scheduler, {0, 1, 2, 3});
+ EXPECT_EQ(scheduler.NumScheduled(), 4);
+ EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 1, 2, 3));
+ ScheduleIds(scheduler, {3, 1, 2});
+ EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 1, 2));
+}
+
+TEST(WebTransportSchedulerTest, SingleBucketOrdered) {
+ PriorityScheduler scheduler;
+
+ QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 1}));
+ QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{0, 2}));
+ QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{0, 3}));
+
+ ScheduleIds(scheduler, {0, 1, 2, 3});
+ EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 2, 1, 0));
+ ScheduleIds(scheduler, {3, 1, 2, 0});
+ EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 2, 1, 0));
+}
+
+TEST(WebTransportSchedulerTest, EveryStreamInItsOwnBucket) {
+ PriorityScheduler scheduler;
+
+ QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{1, 1}));
+ QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{2, 2}));
+ QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{3, 3}));
+
+ ScheduleIds(scheduler, {0, 1, 2, 3});
+ EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 1, 2, 3));
+ ScheduleIds(scheduler, {3, 1, 2});
+ EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 1, 2));
+}
+
+TEST(WebTransportSchedulerTest, TwoBucketsNoSendOrder) {
+ PriorityScheduler scheduler;
+
+ QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{1, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{1, 0}));
+
+ ScheduleIds(scheduler, {0, 1, 2, 3});
+ EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 2, 1, 3));
+ ScheduleIds(scheduler, {0, 2, 1, 3});
+ EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 2, 1, 3));
+ ScheduleIds(scheduler, {3, 2, 1, 0});
+ EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 1, 2, 0));
+
+ ScheduleIds(scheduler, {0, 2});
+ EXPECT_THAT(scheduler.PopFront(), IsOkAndHolds(0));
+ ScheduleIds(scheduler, {1, 3, 0});
+ EXPECT_THAT(PopAll(scheduler), ElementsAre(2, 1, 3, 0));
+}
+
+TEST(WebTransportSchedulerTest, TwoBucketsWithSendOrder) {
+ PriorityScheduler scheduler;
+
+ QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 10}));
+ QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{1, 20}));
+ QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{1, 30}));
+
+ ScheduleIds(scheduler, {0, 1, 2, 3});
+ EXPECT_THAT(PopAll(scheduler), ElementsAre(1, 3, 0, 2));
+ ScheduleIds(scheduler, {3, 2, 1, 0});
+ EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 1, 2, 0));
+}
+
+TEST(WebTransportSchedulerTest, ShouldYield) {
+ PriorityScheduler scheduler;
+
+ QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{0, 10}));
+ QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{1, 0}));
+
+ EXPECT_THAT(scheduler.ShouldYield(0), IsOkAndHolds(false));
+ EXPECT_THAT(scheduler.ShouldYield(1), IsOkAndHolds(false));
+ EXPECT_THAT(scheduler.ShouldYield(2), IsOkAndHolds(false));
+ EXPECT_THAT(scheduler.ShouldYield(3), IsOkAndHolds(false));
+ EXPECT_THAT(scheduler.ShouldYield(4), StatusIs(absl::StatusCode::kNotFound));
+
+ QUICHE_EXPECT_OK(scheduler.Schedule(0));
+ EXPECT_THAT(scheduler.ShouldYield(0), IsOkAndHolds(false));
+ EXPECT_THAT(scheduler.ShouldYield(1), IsOkAndHolds(true));
+ EXPECT_THAT(scheduler.ShouldYield(2), IsOkAndHolds(false));
+ EXPECT_THAT(scheduler.ShouldYield(3), IsOkAndHolds(true));
+ PopAll(scheduler);
+
+ QUICHE_EXPECT_OK(scheduler.Schedule(2));
+ EXPECT_THAT(scheduler.ShouldYield(0), IsOkAndHolds(true));
+ EXPECT_THAT(scheduler.ShouldYield(1), IsOkAndHolds(true));
+ EXPECT_THAT(scheduler.ShouldYield(2), IsOkAndHolds(false));
+ EXPECT_THAT(scheduler.ShouldYield(3), IsOkAndHolds(true));
+ PopAll(scheduler);
+
+ QUICHE_EXPECT_OK(scheduler.Schedule(3));
+ EXPECT_THAT(scheduler.ShouldYield(0), IsOkAndHolds(true));
+ EXPECT_THAT(scheduler.ShouldYield(1), IsOkAndHolds(true));
+ EXPECT_THAT(scheduler.ShouldYield(2), IsOkAndHolds(true));
+ EXPECT_THAT(scheduler.ShouldYield(3), IsOkAndHolds(false));
+ PopAll(scheduler);
+}
+
+TEST(WebTransportSchedulerTest, UpdatePriorityWhileScheduled) {
+ PriorityScheduler scheduler;
+
+ QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{1, 0}));
+ QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{1, 0}));
+
+ ScheduleIds(scheduler, {0, 1, 2, 3});
+ EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 2, 1, 3));
+
+ ScheduleIds(scheduler, {0, 1, 2, 3});
+ QUICHE_EXPECT_OK(scheduler.UpdateSendOrder(1, 10));
+ EXPECT_THAT(PopAll(scheduler), ElementsAre(1, 2, 0, 3));
+
+ ScheduleIds(scheduler, {0, 1, 2, 3});
+ QUICHE_EXPECT_OK(scheduler.UpdateSendGroup(1, 1));
+ EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 1, 2, 3));
+}
+
+} // namespace
+} // namespace webtransport