Implement WebTransport session priority scheduler.

PiperOrigin-RevId: 596645681
diff --git a/build/source_list.bzl b/build/source_list.bzl
index fd8192c..dcdedb9 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -394,6 +394,7 @@
     "web_transport/encapsulated/encapsulated_web_transport.h",
     "web_transport/web_transport.h",
     "web_transport/web_transport_headers.h",
+    "web_transport/web_transport_priority_scheduler.h",
 ]
 quiche_core_srcs = [
     "common/capsule.cc",
@@ -683,6 +684,7 @@
     "web_transport/complete_buffer_visitor.cc",
     "web_transport/encapsulated/encapsulated_web_transport.cc",
     "web_transport/web_transport_headers.cc",
+    "web_transport/web_transport_priority_scheduler.cc",
 ]
 quiche_tool_support_hdrs = [
     "common/platform/api/quiche_command_line_flags.h",
@@ -1311,6 +1313,7 @@
     "spdy/core/spdy_protocol_test.cc",
     "web_transport/encapsulated/encapsulated_web_transport_test.cc",
     "web_transport/web_transport_headers_test.cc",
+    "web_transport/web_transport_priority_scheduler_test.cc",
 ]
 io_tests_hdrs = [
 ]
diff --git a/build/source_list.gni b/build/source_list.gni
index d3fb7a0..7d44368 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -394,6 +394,7 @@
     "src/quiche/web_transport/encapsulated/encapsulated_web_transport.h",
     "src/quiche/web_transport/web_transport.h",
     "src/quiche/web_transport/web_transport_headers.h",
+    "src/quiche/web_transport/web_transport_priority_scheduler.h",
 ]
 quiche_core_srcs = [
     "src/quiche/common/capsule.cc",
@@ -683,6 +684,7 @@
     "src/quiche/web_transport/complete_buffer_visitor.cc",
     "src/quiche/web_transport/encapsulated/encapsulated_web_transport.cc",
     "src/quiche/web_transport/web_transport_headers.cc",
+    "src/quiche/web_transport/web_transport_priority_scheduler.cc",
 ]
 quiche_tool_support_hdrs = [
     "src/quiche/common/platform/api/quiche_command_line_flags.h",
@@ -1312,6 +1314,7 @@
     "src/quiche/spdy/core/spdy_protocol_test.cc",
     "src/quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc",
     "src/quiche/web_transport/web_transport_headers_test.cc",
+    "src/quiche/web_transport/web_transport_priority_scheduler_test.cc",
 ]
 io_tests_hdrs = [
 
diff --git a/build/source_list.json b/build/source_list.json
index 69eeccd..967ddc8 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -392,7 +392,8 @@
     "quiche/web_transport/complete_buffer_visitor.h",
     "quiche/web_transport/encapsulated/encapsulated_web_transport.h",
     "quiche/web_transport/web_transport.h",
-    "quiche/web_transport/web_transport_headers.h"
+    "quiche/web_transport/web_transport_headers.h",
+    "quiche/web_transport/web_transport_priority_scheduler.h"
   ],
   "quiche_core_srcs": [
     "quiche/common/capsule.cc",
@@ -681,7 +682,8 @@
     "quiche/spdy/core/spdy_protocol.cc",
     "quiche/web_transport/complete_buffer_visitor.cc",
     "quiche/web_transport/encapsulated/encapsulated_web_transport.cc",
-    "quiche/web_transport/web_transport_headers.cc"
+    "quiche/web_transport/web_transport_headers.cc",
+    "quiche/web_transport/web_transport_priority_scheduler.cc"
   ],
   "quiche_tool_support_hdrs": [
     "quiche/common/platform/api/quiche_command_line_flags.h",
@@ -1310,7 +1312,8 @@
     "quiche/spdy/core/spdy_framer_test.cc",
     "quiche/spdy/core/spdy_protocol_test.cc",
     "quiche/web_transport/encapsulated/encapsulated_web_transport_test.cc",
-    "quiche/web_transport/web_transport_headers_test.cc"
+    "quiche/web_transport/web_transport_headers_test.cc",
+    "quiche/web_transport/web_transport_priority_scheduler_test.cc"
   ],
   "io_tests_hdrs": [
 
diff --git a/quiche/common/btree_scheduler.h b/quiche/common/btree_scheduler.h
index 0f160c8..d2bb765 100644
--- a/quiche/common/btree_scheduler.h
+++ b/quiche/common/btree_scheduler.h
@@ -5,15 +5,19 @@
 #ifndef QUICHE_COMMON_BTREE_SCHEDULER_H_
 #define QUICHE_COMMON_BTREE_SCHEDULER_H_
 
+#include <cstddef>
 #include <limits>
 #include <optional>
+#include <utility>
 
+#include "absl/base/attributes.h"
 #include "absl/container/btree_map.h"
 #include "absl/container/node_hash_map.h"
 #include "absl/status/status.h"
 #include "absl/status/statusor.h"
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/platform/api/quiche_logging.h"
 
 namespace quiche {
 
@@ -39,6 +43,8 @@
 template <typename Id, typename Priority>
 class QUICHE_EXPORT BTreeScheduler {
  public:
+  // Returns true if there are any streams registered.
+  bool HasRegistered() const { return !streams_.empty(); }
   // Returns true if there are any streams scheduled.
   bool HasScheduled() const { return !schedule_.empty(); }
   // Returns the number of currently scheduled streams.
@@ -82,7 +88,7 @@
   // A record for a registered stream.
   struct StreamEntry {
     // The current priority of the stream.
-    Priority priority;
+    ABSL_ATTRIBUTE_NO_UNIQUE_ADDRESS Priority priority;
     // If present, the sequence number with which the stream is currently
     // scheduled.  If absent, indicates that the stream is not scheduled.
     std::optional<int> current_sequence_number;
@@ -96,7 +102,7 @@
   // A key that is used to order entities within the schedule.
   struct ScheduleKey {
     // The main order key: the priority of the stream.
-    Priority priority;
+    ABSL_ATTRIBUTE_NO_UNIQUE_ADDRESS Priority priority;
     // The secondary order key: the sequence number.
     int sequence_number;
 
diff --git a/quiche/web_transport/web_transport.h b/quiche/web_transport/web_transport.h
index dbf22f8..c008801 100644
--- a/quiche/web_transport/web_transport.h
+++ b/quiche/web_transport/web_transport.h
@@ -37,6 +37,25 @@
 // Application-specific error code used for closing a WebTransport session.
 using SessionErrorCode = uint32_t;
 
+// WebTransport priority as defined in
+// https://w3c.github.io/webtransport/#webtransportsendstream-write
+// The rules are as follows:
+// - Streams with the same priority are handled in FIFO order.
+// - Streams with the same group_id but different send_order are handled
+//   strictly in order.
+// - Different group_ids are handled in the FIFO order.
+using SendGroupId = uint32_t;
+using SendOrder = int64_t;
+struct QUICHE_EXPORT StreamPriority {
+  SendGroupId send_group_id = 0;
+  SendOrder send_order = 0;
+
+  bool operator==(const StreamPriority& other) const {
+    return send_group_id == other.send_group_id &&
+           send_order == other.send_order;
+  }
+};
+
 // An outcome of a datagram send call.
 enum class DatagramStatusCode {
   // Datagram has been successfully sent or placed into the datagram queue.
diff --git a/quiche/web_transport/web_transport_priority_scheduler.cc b/quiche/web_transport/web_transport_priority_scheduler.cc
new file mode 100644
index 0000000..9387b64
--- /dev/null
+++ b/quiche/web_transport/web_transport_priority_scheduler.cc
@@ -0,0 +1,166 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/web_transport/web_transport_priority_scheduler.h"
+
+#include <optional>
+#include <utility>
+
+#include "absl/cleanup/cleanup.h"
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "quiche/common/quiche_status_utils.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace webtransport {
+
+absl::Status PriorityScheduler::Register(StreamId stream_id,
+                                         const StreamPriority& priority) {
+  auto [it, success] = stream_to_group_map_.insert({stream_id, nullptr});
+  if (!success) {
+    return absl::AlreadyExistsError("Provided stream ID already registered");
+  }
+  // Avoid having any nullptr entries in the stream map if we error out further
+  // down below. This should not happen (all errors below are logical errors),
+  // but if that does happen, we will avoid crashing due to nullptr dereference.
+  auto cleanup_nullptr_map_entry =
+      absl::MakeCleanup([&] { stream_to_group_map_.erase(stream_id); });
+
+  auto [scheduler_it, scheduler_created] =
+      per_group_schedulers_.try_emplace(priority.send_group_id);
+  if (scheduler_created) {
+    // First element in the associated group; register the group in question.
+    QUICHE_RETURN_IF_ERROR(active_groups_.Register(priority.send_group_id, {}));
+  }
+
+  PerGroupScheduler& scheduler = scheduler_it->second;
+  QUICHE_RETURN_IF_ERROR(scheduler.Register(stream_id, priority.send_order));
+
+  it->second = &*scheduler_it;
+  std::move(cleanup_nullptr_map_entry).Cancel();
+  return absl::OkStatus();
+}
+
+absl::Status PriorityScheduler::Unregister(StreamId stream_id) {
+  auto it = stream_to_group_map_.find(stream_id);
+  if (it == stream_to_group_map_.end()) {
+    return absl::NotFoundError("Stream ID not registered");
+  }
+  SendGroupId group_id = it->second->first;
+  PerGroupScheduler* group_scheduler = &it->second->second;
+  stream_to_group_map_.erase(it);
+
+  QUICHE_RETURN_IF_ERROR(group_scheduler->Unregister(stream_id));
+  // Clean up the group if there are no more streams associated with it.
+  if (!group_scheduler->HasRegistered()) {
+    per_group_schedulers_.erase(group_id);
+    QUICHE_RETURN_IF_ERROR(active_groups_.Unregister(group_id));
+  }
+  return absl::OkStatus();
+}
+
+absl::Status PriorityScheduler::UpdateSendOrder(StreamId stream_id,
+                                                SendOrder new_send_order) {
+  PerGroupScheduler* scheduler = SchedulerForStream(stream_id);
+  if (scheduler == nullptr) {
+    return absl::NotFoundError("Stream ID not registered");
+  }
+  return scheduler->UpdatePriority(stream_id, new_send_order);
+}
+
+absl::Status PriorityScheduler::UpdateSendGroup(StreamId stream_id,
+                                                SendGroupId new_send_group) {
+  PerGroupScheduler* scheduler = SchedulerForStream(stream_id);
+  if (scheduler == nullptr) {
+    return absl::NotFoundError("Stream ID not registered");
+  }
+  bool is_scheduled = scheduler->IsScheduled(stream_id);
+  std::optional<SendOrder> send_order = scheduler->GetPriorityFor(stream_id);
+  if (!send_order.has_value()) {
+    return absl::InternalError(
+        "Stream registered at the top level scheduler, but not at the "
+        "per-group one");
+  }
+  QUICHE_RETURN_IF_ERROR(Unregister(stream_id));
+  QUICHE_RETURN_IF_ERROR(
+      Register(stream_id, StreamPriority{new_send_group, *send_order}));
+  if (is_scheduled) {
+    QUICHE_RETURN_IF_ERROR(Schedule(stream_id));
+  }
+  return absl::OkStatus();
+}
+
+std::optional<StreamPriority> PriorityScheduler::GetPriorityFor(
+    StreamId stream_id) const {
+  auto it = stream_to_group_map_.find(stream_id);
+  if (it == stream_to_group_map_.end()) {
+    return std::nullopt;
+  }
+  const auto& [group_id, group_scheduler] = *it->second;
+  std::optional<SendOrder> send_order =
+      group_scheduler.GetPriorityFor(stream_id);
+  if (!send_order.has_value()) {
+    return std::nullopt;
+  }
+  return StreamPriority{group_id, *send_order};
+}
+
+absl::StatusOr<bool> PriorityScheduler::ShouldYield(StreamId stream_id) const {
+  auto it = stream_to_group_map_.find(stream_id);
+  if (it == stream_to_group_map_.end()) {
+    return absl::NotFoundError("Stream ID not registered");
+  }
+  const auto& [group_id, group_scheduler] = *it->second;
+
+  absl::StatusOr<bool> per_group_result = active_groups_.ShouldYield(group_id);
+  QUICHE_RETURN_IF_ERROR(per_group_result.status());
+  if (*per_group_result) {
+    return true;
+  }
+
+  return group_scheduler.ShouldYield(stream_id);
+}
+
+absl::StatusOr<StreamId> PriorityScheduler::PopFront() {
+  absl::StatusOr<SendGroupId> group_id = active_groups_.PopFront();
+  QUICHE_RETURN_IF_ERROR(group_id.status());
+
+  auto it = per_group_schedulers_.find(*group_id);
+  if (it == per_group_schedulers_.end()) {
+    return absl::InternalError(
+        "Scheduled a group with no per-group scheduler attached");
+  }
+  PerGroupScheduler& scheduler = it->second;
+  absl::StatusOr<StreamId> result = scheduler.PopFront();
+  if (!result.ok()) {
+    return absl::InternalError("Inactive group found in top-level schedule");
+  }
+
+  // Reschedule the group if it has more active streams in it.
+  if (scheduler.HasScheduled()) {
+    QUICHE_RETURN_IF_ERROR(active_groups_.Schedule(*group_id));
+  }
+
+  return result;
+}
+
+absl::Status PriorityScheduler::Schedule(StreamId stream_id) {
+  auto it = stream_to_group_map_.find(stream_id);
+  if (it == stream_to_group_map_.end()) {
+    return absl::NotFoundError("Stream ID not registered");
+  }
+  auto& [group_id, group_scheduler] = *it->second;
+  QUICHE_RETURN_IF_ERROR(active_groups_.Schedule(group_id));
+  return group_scheduler.Schedule(stream_id);
+}
+
+bool PriorityScheduler::IsScheduled(StreamId stream_id) const {
+  const PerGroupScheduler* scheduler = SchedulerForStream(stream_id);
+  if (scheduler == nullptr) {
+    return false;
+  }
+  return scheduler->IsScheduled(stream_id);
+}
+
+}  // namespace webtransport
diff --git a/quiche/web_transport/web_transport_priority_scheduler.h b/quiche/web_transport/web_transport_priority_scheduler.h
new file mode 100644
index 0000000..7e71ed0
--- /dev/null
+++ b/quiche/web_transport/web_transport_priority_scheduler.h
@@ -0,0 +1,109 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_WEB_TRANSPORT_WEB_TRANSPORT_PRIORITY_SCHEDULER_H_
+#define QUICHE_WEB_TRANSPORT_WEB_TRANSPORT_PRIORITY_SCHEDULER_H_
+
+#include <cstddef>
+#include <optional>
+#include <utility>
+
+#include "absl/container/flat_hash_map.h"
+#include "absl/container/node_hash_map.h"
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "quiche/common/btree_scheduler.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace webtransport {
+
+// Schedules the streams within a WebTransport session as defined by the W3C
+// WebTransport API. Unlike the W3C API, there is no need to track groups
+// manually: the group is created when a first stream with the associated group
+// ID is registered, and it is deleted when the last stream associated with the
+// group is unregistered.
+class QUICHE_EXPORT PriorityScheduler {
+ public:
+  // Returns true if there are any streams registered.
+  bool HasRegistered() const { return active_groups_.HasRegistered(); }
+  // Returns true if there are any streams scheduled.
+  bool HasScheduled() const { return active_groups_.HasScheduled(); }
+
+  // Returns the number of currently scheduled streams.
+  size_t NumScheduled() const {
+    size_t total = 0;
+    for (const auto& [group_id, group_scheduler] : per_group_schedulers_) {
+      total += group_scheduler.NumScheduled();
+    }
+    return total;
+  }
+
+  // Registers the specified stream with the supplied priority.  The stream must
+  // not be already registered.
+  absl::Status Register(StreamId stream_id, const StreamPriority& priority);
+  // Unregisters a previously registered stream.
+  absl::Status Unregister(StreamId stream_id);
+  // Alters the priority of an already registered stream.
+  absl::Status UpdateSendOrder(StreamId stream_id, SendOrder new_send_order);
+  absl::Status UpdateSendGroup(StreamId stream_id, SendGroupId new_send_group);
+
+  // Returns true if there is a stream that would go before `id` in the
+  // schedule.
+  absl::StatusOr<bool> ShouldYield(StreamId id) const;
+
+  // Returns the priority for `id`, or nullopt if stream is not registered.
+  std::optional<StreamPriority> GetPriorityFor(StreamId stream_id) const;
+
+  // Pops the highest priority stream.  Will fail if the schedule is empty.
+  absl::StatusOr<StreamId> PopFront();
+
+  // Adds `stream` to the schedule if it's not already there.
+  absl::Status Schedule(StreamId stream_id);
+  // Returns true if `stream` is in the schedule.
+  bool IsScheduled(StreamId stream_id) const;
+
+ private:
+  // All groups currently have the equal priority; this type represents the said
+  // single priority.
+  class SinglePriority {
+   public:
+    bool operator==(const SinglePriority&) const { return true; }
+    bool operator!=(const SinglePriority&) const { return false; }
+
+    bool operator<(const SinglePriority&) const { return false; }
+    bool operator>(const SinglePriority&) const { return false; }
+    bool operator<=(const SinglePriority&) const { return true; }
+    bool operator>=(const SinglePriority&) const { return true; }
+  };
+
+  using PerGroupScheduler = quiche::BTreeScheduler<StreamId, SendOrder>;
+  using GroupSchedulerPair = std::pair<const SendGroupId, PerGroupScheduler>;
+
+  // Round-robin schedule for the groups.
+  quiche::BTreeScheduler<SendGroupId, SinglePriority> active_groups_;
+  // Map group ID to the scheduler for the group in question.
+  absl::node_hash_map<SendGroupId, PerGroupScheduler> per_group_schedulers_;
+  // Map stream ID to a pointer to the entry in `per_group_schedulers_`.
+  absl::flat_hash_map<StreamId, GroupSchedulerPair*> stream_to_group_map_;
+
+  PerGroupScheduler* SchedulerForStream(StreamId stream_id) {
+    auto it = stream_to_group_map_.find(stream_id);
+    if (it == stream_to_group_map_.end()) {
+      return nullptr;
+    }
+    return &it->second->second;
+  }
+  const PerGroupScheduler* SchedulerForStream(StreamId stream_id) const {
+    auto it = stream_to_group_map_.find(stream_id);
+    if (it == stream_to_group_map_.end()) {
+      return nullptr;
+    }
+    return &it->second->second;
+  }
+};
+
+}  // namespace webtransport
+
+#endif  // QUICHE_WEB_TRANSPORT_WEB_TRANSPORT_PRIORITY_SCHEDULER_H_
diff --git a/quiche/web_transport/web_transport_priority_scheduler_test.cc b/quiche/web_transport/web_transport_priority_scheduler_test.cc
new file mode 100644
index 0000000..d89a52d
--- /dev/null
+++ b/quiche/web_transport/web_transport_priority_scheduler_test.cc
@@ -0,0 +1,260 @@
+// Copyright 2023 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/web_transport/web_transport_priority_scheduler.h"
+
+#include <optional>
+#include <vector>
+
+#include "absl/status/status.h"
+#include "absl/status/statusor.h"
+#include "absl/types/span.h"
+#include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/test_tools/quiche_test_utils.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace webtransport {
+namespace {
+
+using ::quiche::test::IsOkAndHolds;
+using ::quiche::test::StatusIs;
+using ::testing::ElementsAre;
+
+void ScheduleIds(PriorityScheduler& scheduler, absl::Span<const StreamId> ids) {
+  for (StreamId id : ids) {
+    QUICHE_EXPECT_OK(scheduler.Schedule(id));
+  }
+}
+
+std::vector<StreamId> PopAll(PriorityScheduler& scheduler) {
+  std::vector<StreamId> result;
+  result.reserve(scheduler.NumScheduled());
+  for (;;) {
+    absl::StatusOr<StreamId> id = scheduler.PopFront();
+    if (!id.ok()) {
+      EXPECT_THAT(id, StatusIs(absl::StatusCode::kNotFound));
+      break;
+    }
+    result.push_back(*id);
+  }
+  return result;
+}
+
+TEST(WebTransportSchedulerTest, Register) {
+  PriorityScheduler scheduler;
+
+  // Register two streams in the same group.
+  QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{1, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{1, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(4, StreamPriority{0, 0}));
+
+  // Attempt re-registering.
+  EXPECT_THAT(scheduler.Register(4, StreamPriority{0, 0}),
+              StatusIs(absl::StatusCode::kAlreadyExists));
+  EXPECT_THAT(scheduler.Register(4, StreamPriority{1, 0}),
+              StatusIs(absl::StatusCode::kAlreadyExists));
+}
+
+TEST(WebTransportSchedulerTest, Unregister) {
+  PriorityScheduler scheduler;
+
+  EXPECT_FALSE(scheduler.HasRegistered());
+  QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+
+  EXPECT_TRUE(scheduler.HasRegistered());
+  QUICHE_EXPECT_OK(scheduler.Unregister(1));
+  EXPECT_TRUE(scheduler.HasRegistered());
+  QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+
+  ScheduleIds(scheduler, {0, 1});
+  QUICHE_EXPECT_OK(scheduler.Unregister(0));
+  QUICHE_EXPECT_OK(scheduler.Unregister(1));
+  EXPECT_FALSE(scheduler.HasRegistered());
+  QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+  EXPECT_TRUE(scheduler.HasRegistered());
+  EXPECT_FALSE(scheduler.HasScheduled());
+}
+
+TEST(WebTransportSchedulerTest, UpdatePriority) {
+  PriorityScheduler scheduler;
+
+  QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 10}));
+  QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 20}));
+  EXPECT_EQ(scheduler.GetPriorityFor(0), (StreamPriority{0, 10}));
+  EXPECT_EQ(scheduler.GetPriorityFor(1), (StreamPriority{0, 20}));
+
+  QUICHE_EXPECT_OK(scheduler.UpdateSendGroup(0, 1));
+  QUICHE_EXPECT_OK(scheduler.UpdateSendOrder(1, 40));
+  EXPECT_EQ(scheduler.GetPriorityFor(0), (StreamPriority{1, 10}));
+  EXPECT_EQ(scheduler.GetPriorityFor(1), (StreamPriority{0, 40}));
+
+  EXPECT_THAT(scheduler.UpdateSendGroup(1000, 1),
+              StatusIs(absl::StatusCode::kNotFound));
+  EXPECT_THAT(scheduler.UpdateSendOrder(1000, 1),
+              StatusIs(absl::StatusCode::kNotFound));
+  EXPECT_EQ(scheduler.GetPriorityFor(1000), std::nullopt);
+}
+
+TEST(WebTransportSchedulerTest, Schedule) {
+  PriorityScheduler scheduler;
+
+  QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+
+  EXPECT_FALSE(scheduler.IsScheduled(0));
+  EXPECT_FALSE(scheduler.IsScheduled(1));
+  EXPECT_FALSE(scheduler.IsScheduled(1000));
+
+  QUICHE_EXPECT_OK(scheduler.Schedule(0));
+  EXPECT_TRUE(scheduler.IsScheduled(0));
+  EXPECT_FALSE(scheduler.IsScheduled(1));
+
+  QUICHE_EXPECT_OK(scheduler.Schedule(1));
+  EXPECT_TRUE(scheduler.IsScheduled(0));
+  EXPECT_TRUE(scheduler.IsScheduled(1));
+
+  EXPECT_THAT(scheduler.Schedule(0), StatusIs(absl::StatusCode::kOk));
+  EXPECT_THAT(scheduler.Schedule(2), StatusIs(absl::StatusCode::kNotFound));
+}
+
+TEST(WebTransportSchedulerTest, SamePriority) {
+  PriorityScheduler scheduler;
+
+  QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{0, 0}));
+
+  ScheduleIds(scheduler, {0, 1, 2, 3});
+  EXPECT_EQ(scheduler.NumScheduled(), 4);
+  EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 1, 2, 3));
+  ScheduleIds(scheduler, {3, 1, 2});
+  EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 1, 2));
+}
+
+TEST(WebTransportSchedulerTest, SingleBucketOrdered) {
+  PriorityScheduler scheduler;
+
+  QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 1}));
+  QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{0, 2}));
+  QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{0, 3}));
+
+  ScheduleIds(scheduler, {0, 1, 2, 3});
+  EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 2, 1, 0));
+  ScheduleIds(scheduler, {3, 1, 2, 0});
+  EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 2, 1, 0));
+}
+
+TEST(WebTransportSchedulerTest, EveryStreamInItsOwnBucket) {
+  PriorityScheduler scheduler;
+
+  QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{1, 1}));
+  QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{2, 2}));
+  QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{3, 3}));
+
+  ScheduleIds(scheduler, {0, 1, 2, 3});
+  EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 1, 2, 3));
+  ScheduleIds(scheduler, {3, 1, 2});
+  EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 1, 2));
+}
+
+TEST(WebTransportSchedulerTest, TwoBucketsNoSendOrder) {
+  PriorityScheduler scheduler;
+
+  QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{1, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{1, 0}));
+
+  ScheduleIds(scheduler, {0, 1, 2, 3});
+  EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 2, 1, 3));
+  ScheduleIds(scheduler, {0, 2, 1, 3});
+  EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 2, 1, 3));
+  ScheduleIds(scheduler, {3, 2, 1, 0});
+  EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 1, 2, 0));
+
+  ScheduleIds(scheduler, {0, 2});
+  EXPECT_THAT(scheduler.PopFront(), IsOkAndHolds(0));
+  ScheduleIds(scheduler, {1, 3, 0});
+  EXPECT_THAT(PopAll(scheduler), ElementsAre(2, 1, 3, 0));
+}
+
+TEST(WebTransportSchedulerTest, TwoBucketsWithSendOrder) {
+  PriorityScheduler scheduler;
+
+  QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 10}));
+  QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{1, 20}));
+  QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{1, 30}));
+
+  ScheduleIds(scheduler, {0, 1, 2, 3});
+  EXPECT_THAT(PopAll(scheduler), ElementsAre(1, 3, 0, 2));
+  ScheduleIds(scheduler, {3, 2, 1, 0});
+  EXPECT_THAT(PopAll(scheduler), ElementsAre(3, 1, 2, 0));
+}
+
+TEST(WebTransportSchedulerTest, ShouldYield) {
+  PriorityScheduler scheduler;
+
+  QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{0, 10}));
+  QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{1, 0}));
+
+  EXPECT_THAT(scheduler.ShouldYield(0), IsOkAndHolds(false));
+  EXPECT_THAT(scheduler.ShouldYield(1), IsOkAndHolds(false));
+  EXPECT_THAT(scheduler.ShouldYield(2), IsOkAndHolds(false));
+  EXPECT_THAT(scheduler.ShouldYield(3), IsOkAndHolds(false));
+  EXPECT_THAT(scheduler.ShouldYield(4), StatusIs(absl::StatusCode::kNotFound));
+
+  QUICHE_EXPECT_OK(scheduler.Schedule(0));
+  EXPECT_THAT(scheduler.ShouldYield(0), IsOkAndHolds(false));
+  EXPECT_THAT(scheduler.ShouldYield(1), IsOkAndHolds(true));
+  EXPECT_THAT(scheduler.ShouldYield(2), IsOkAndHolds(false));
+  EXPECT_THAT(scheduler.ShouldYield(3), IsOkAndHolds(true));
+  PopAll(scheduler);
+
+  QUICHE_EXPECT_OK(scheduler.Schedule(2));
+  EXPECT_THAT(scheduler.ShouldYield(0), IsOkAndHolds(true));
+  EXPECT_THAT(scheduler.ShouldYield(1), IsOkAndHolds(true));
+  EXPECT_THAT(scheduler.ShouldYield(2), IsOkAndHolds(false));
+  EXPECT_THAT(scheduler.ShouldYield(3), IsOkAndHolds(true));
+  PopAll(scheduler);
+
+  QUICHE_EXPECT_OK(scheduler.Schedule(3));
+  EXPECT_THAT(scheduler.ShouldYield(0), IsOkAndHolds(true));
+  EXPECT_THAT(scheduler.ShouldYield(1), IsOkAndHolds(true));
+  EXPECT_THAT(scheduler.ShouldYield(2), IsOkAndHolds(true));
+  EXPECT_THAT(scheduler.ShouldYield(3), IsOkAndHolds(false));
+  PopAll(scheduler);
+}
+
+TEST(WebTransportSchedulerTest, UpdatePriorityWhileScheduled) {
+  PriorityScheduler scheduler;
+
+  QUICHE_EXPECT_OK(scheduler.Register(0, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(1, StreamPriority{0, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(2, StreamPriority{1, 0}));
+  QUICHE_EXPECT_OK(scheduler.Register(3, StreamPriority{1, 0}));
+
+  ScheduleIds(scheduler, {0, 1, 2, 3});
+  EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 2, 1, 3));
+
+  ScheduleIds(scheduler, {0, 1, 2, 3});
+  QUICHE_EXPECT_OK(scheduler.UpdateSendOrder(1, 10));
+  EXPECT_THAT(PopAll(scheduler), ElementsAre(1, 2, 0, 3));
+
+  ScheduleIds(scheduler, {0, 1, 2, 3});
+  QUICHE_EXPECT_OK(scheduler.UpdateSendGroup(1, 1));
+  EXPECT_THAT(PopAll(scheduler), ElementsAre(0, 1, 2, 3));
+}
+
+}  // namespace
+}  // namespace webtransport