Order MoQT Queued streams by send_order.

PiperOrigin-RevId: 673155138
diff --git a/quiche/quic/moqt/moqt_priority.cc b/quiche/quic/moqt/moqt_priority.cc
index 5528079..73e0171 100644
--- a/quiche/quic/moqt/moqt_priority.cc
+++ b/quiche/quic/moqt/moqt_priority.cc
@@ -64,6 +64,15 @@
   return track_bits | (group_id << 20) | object_id;
 }
 
+webtransport::SendOrder UpdateSendOrderForSubscriberPriority(
+    const webtransport::SendOrder send_order,
+    MoqtPriority subscriber_priority) {
+  webtransport::SendOrder new_send_order = OnlyLowestNBits<54>(send_order);
+  const int64_t sub_bits = Flip<8>(subscriber_priority) << 54;
+  new_send_order |= sub_bits;
+  return new_send_order;
+}
+
 const webtransport::SendOrder kMoqtControlStreamSendOrder =
     std::numeric_limits<webtransport::SendOrder>::max();
 
diff --git a/quiche/quic/moqt/moqt_priority.h b/quiche/quic/moqt/moqt_priority.h
index 85449d9..0dfd490 100644
--- a/quiche/quic/moqt/moqt_priority.h
+++ b/quiche/quic/moqt/moqt_priority.h
@@ -32,6 +32,10 @@
     MoqtPriority subscriber_priority, MoqtPriority publisher_priority,
     uint64_t group_id, uint64_t object_id, MoqtDeliveryOrder delivery_order);
 
+// Returns |send_order| updated with the new |subscriber_priority|.
+QUICHE_EXPORT webtransport::SendOrder UpdateSendOrderForSubscriberPriority(
+    webtransport::SendOrder send_order, MoqtPriority subscriber_priority);
+
 // WebTransport send order set on the MoQT control stream.
 QUICHE_EXPORT extern const webtransport::SendOrder kMoqtControlStreamSendOrder;
 
diff --git a/quiche/quic/moqt/moqt_priority_test.cc b/quiche/quic/moqt/moqt_priority_test.cc
index d30c07c..f677eef 100644
--- a/quiche/quic/moqt/moqt_priority_test.cc
+++ b/quiche/quic/moqt/moqt_priority_test.cc
@@ -55,5 +55,13 @@
       SendOrderForStream(0x80, 0x80, 0, 0, MoqtDeliveryOrder::kDescending));
 }
 
+TEST(MoqtPriorityTest, UpdateSendOrderForSubscriberPriority) {
+  EXPECT_EQ(
+      UpdateSendOrderForSubscriberPriority(
+          SendOrderForStream(0x80, 0x80, 0, MoqtDeliveryOrder::kAscending),
+          0x10),
+      SendOrderForStream(0x10, 0x80, 0, MoqtDeliveryOrder::kAscending));
+}
+
 }  // namespace
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 2d9d073..c74bc39 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -15,6 +15,7 @@
 
 
 #include "absl/algorithm/container.h"
+#include "absl/container/btree_map.h"
 #include "absl/container/flat_hash_map.h"
 #include "absl/container/flat_hash_set.h"
 #include "absl/container/node_hash_map.h"
@@ -420,17 +421,6 @@
 
 webtransport::Stream* MoqtSession::OpenOrQueueDataStream(
     uint64_t subscription_id, FullSequence first_object) {
-  if (!session_->CanOpenNextOutgoingUnidirectionalStream()) {
-    queued_outgoing_data_streams_.push_back(
-        QueuedOutgoingDataStream{subscription_id, first_object});
-    // TODO: limit the number of streams in the queue.
-    return nullptr;
-  }
-  return OpenDataStream(subscription_id, first_object);
-}
-
-webtransport::Stream* MoqtSession::OpenDataStream(uint64_t subscription_id,
-                                                  FullSequence first_object) {
   auto it = published_subscriptions_.find(subscription_id);
   if (it == published_subscriptions_.end()) {
     // It is possible that the subscription has been discarded while the stream
@@ -438,7 +428,18 @@
     return nullptr;
   }
   PublishedSubscription& subscription = *it->second;
+  if (!session_->CanOpenNextOutgoingUnidirectionalStream()) {
+    subscription.AddQueuedOutgoingDataStream(first_object);
+    // The subscription will notify the session about how to update the
+    // session's queue.
+    // TODO: limit the number of streams in the queue.
+    return nullptr;
+  }
+  return OpenDataStream(subscription, first_object);
+}
 
+webtransport::Stream* MoqtSession::OpenDataStream(
+    PublishedSubscription& subscription, FullSequence first_object) {
   webtransport::Stream* new_stream =
       session_->OpenOutgoingUnidirectionalStream();
   if (new_stream == nullptr) {
@@ -447,24 +448,50 @@
     return nullptr;
   }
   new_stream->SetVisitor(std::make_unique<OutgoingDataStream>(
-      this, new_stream, subscription_id, subscription, first_object));
+      this, new_stream, subscription, first_object));
   subscription.OnDataStreamCreated(new_stream->GetStreamId(), first_object);
   return new_stream;
 }
 
 void MoqtSession::OnCanCreateNewOutgoingUnidirectionalStream() {
-  while (!queued_outgoing_data_streams_.empty() &&
+  while (!subscribes_with_queued_outgoing_data_streams_.empty() &&
          session_->CanOpenNextOutgoingUnidirectionalStream()) {
-    QueuedOutgoingDataStream next = queued_outgoing_data_streams_.front();
-    queued_outgoing_data_streams_.pop_front();
+    auto next = subscribes_with_queued_outgoing_data_streams_.rbegin();
+    auto subscription = published_subscriptions_.find(next->subscription_id);
+    if (subscription == published_subscriptions_.end()) {
+      // Subscription no longer exists; delete the entry.
+      subscribes_with_queued_outgoing_data_streams_.erase((++next).base());
+      continue;
+    }
+    // Open the stream. The second argument pops the item from the
+    // subscription's queue, which might update
+    // subscribes_with_queued_outgoing_data_streams_.
     webtransport::Stream* stream =
-        OpenDataStream(next.subscription_id, next.first_object);
+        OpenDataStream(*subscription->second,
+                       subscription->second->NextQueuedOutgoingDataStream());
     if (stream != nullptr) {
       stream->visitor()->OnCanWrite();
     }
   }
 }
 
+void MoqtSession::UpdateQueuedSendOrder(
+    uint64_t subscribe_id,
+    std::optional<webtransport::SendOrder> old_send_order,
+    std::optional<webtransport::SendOrder> new_send_order) {
+  if (old_send_order == new_send_order) {
+    return;
+  }
+  if (old_send_order.has_value()) {
+    subscribes_with_queued_outgoing_data_streams_.erase(
+        SubscriptionWithQueuedStream{*old_send_order, subscribe_id});
+  }
+  if (new_send_order.has_value()) {
+    subscribes_with_queued_outgoing_data_streams_.emplace(*new_send_order,
+                                                          subscribe_id);
+  }
+}
+
 std::pair<FullTrackName, RemoteTrack::Visitor*>
 MoqtSession::TrackPropertiesFromAlias(const MoqtObject& message) {
   auto it = remote_tracks_.find(message.track_alias);
@@ -935,6 +962,22 @@
   // TODO: send an error for invalid updates now that it's a part of draft-05.
 }
 
+void MoqtSession::PublishedSubscription::set_subscriber_priority(
+    MoqtPriority priority) {
+  if (priority == subscriber_priority_) {
+    return;
+  }
+  if (queued_outgoing_data_streams_.empty()) {
+    subscriber_priority_ = priority;
+    return;
+  }
+  webtransport::SendOrder old_send_order =
+      FinalizeSendOrder(queued_outgoing_data_streams_.rbegin()->first);
+  subscriber_priority_ = priority;
+  session_->UpdateQueuedSendOrder(subscription_id_, old_send_order,
+                                  FinalizeSendOrder(old_send_order));
+};
+
 void MoqtSession::PublishedSubscription::OnNewObjectAvailable(
     FullSequence sequence) {
   if (!window_.InWindow(sequence)) {
@@ -1003,6 +1046,80 @@
   return lazily_initialized_stream_map_->GetAllStreams();
 }
 
+webtransport::SendOrder MoqtSession::PublishedSubscription::GetSendOrder(
+    FullSequence sequence) const {
+  MoqtForwardingPreference forwarding_preference =
+      track_publisher_->GetForwardingPreference();
+
+  MoqtPriority publisher_priority = track_publisher_->GetPublisherPriority();
+  MoqtDeliveryOrder delivery_order = subscriber_delivery_order().value_or(
+      track_publisher_->GetDeliveryOrder());
+  switch (forwarding_preference) {
+    case MoqtForwardingPreference::kTrack:
+      return SendOrderForStream(subscriber_priority_, publisher_priority,
+                                /*group_id=*/0, delivery_order);
+      break;
+    case MoqtForwardingPreference::kGroup:
+      return SendOrderForStream(subscriber_priority_, publisher_priority,
+                                sequence.group, delivery_order);
+      break;
+    case MoqtForwardingPreference::kObject:
+      return SendOrderForStream(subscriber_priority_, publisher_priority,
+                                sequence.group, sequence.object,
+                                delivery_order);
+      break;
+    case MoqtForwardingPreference::kDatagram:
+      QUICHE_NOTREACHED();
+      return 0;
+  }
+}
+
+// Returns the highest send order in the subscription.
+void MoqtSession::PublishedSubscription::AddQueuedOutgoingDataStream(
+    FullSequence first_object) {
+  std::optional<webtransport::SendOrder> start_send_order =
+      queued_outgoing_data_streams_.empty()
+          ? std::optional<webtransport::SendOrder>()
+          : queued_outgoing_data_streams_.rbegin()->first;
+  webtransport::SendOrder send_order = GetSendOrder(first_object);
+  // Zero out the subscriber priority bits, since these will be added when
+  // updating the session.
+  queued_outgoing_data_streams_.emplace(
+      UpdateSendOrderForSubscriberPriority(send_order, 0), first_object);
+  if (!start_send_order.has_value()) {
+    session_->UpdateQueuedSendOrder(subscription_id_, std::nullopt, send_order);
+  } else if (*start_send_order < send_order) {
+    session_->UpdateQueuedSendOrder(
+        subscription_id_, FinalizeSendOrder(*start_send_order), send_order);
+  }
+}
+
+FullSequence
+MoqtSession::PublishedSubscription::NextQueuedOutgoingDataStream() {
+  QUICHE_DCHECK(!queued_outgoing_data_streams_.empty());
+  if (queued_outgoing_data_streams_.empty()) {
+    return FullSequence();
+  }
+  auto it = queued_outgoing_data_streams_.rbegin();
+  webtransport::SendOrder old_send_order = FinalizeSendOrder(it->first);
+  FullSequence first_object = it->second;
+  // converting a reverse iterator to an iterator involves incrementing it and
+  // then taking base().
+  queued_outgoing_data_streams_.erase((++it).base());
+  if (queued_outgoing_data_streams_.empty()) {
+    session_->UpdateQueuedSendOrder(subscription_id_, old_send_order,
+                                    std::nullopt);
+  } else {
+    webtransport::SendOrder new_send_order =
+        FinalizeSendOrder(queued_outgoing_data_streams_.rbegin()->first);
+    if (old_send_order != new_send_order) {
+      session_->UpdateQueuedSendOrder(subscription_id_, old_send_order,
+                                      new_send_order);
+    }
+  }
+  return first_object;
+}
+
 void MoqtSession::PublishedSubscription::OnDataStreamCreated(
     webtransport::StreamId id, FullSequence start_sequence) {
   stream_map().AddStream(start_sequence, id);
@@ -1023,11 +1140,10 @@
 
 MoqtSession::OutgoingDataStream::OutgoingDataStream(
     MoqtSession* session, webtransport::Stream* stream,
-    uint64_t subscription_id, PublishedSubscription& subscription,
-    FullSequence first_object)
+    PublishedSubscription& subscription, FullSequence first_object)
     : session_(session),
       stream_(stream),
-      subscription_id_(subscription_id),
+      subscription_id_(subscription.subscription_id()),
       next_object_(first_object),
       session_liveness_(session->liveness_token_) {
   UpdateSendOrder(subscription);
@@ -1209,41 +1325,9 @@
 
 void MoqtSession::OutgoingDataStream::UpdateSendOrder(
     PublishedSubscription& subscription) {
-  MoqtTrackPublisher& publisher = subscription.publisher();
-  MoqtForwardingPreference forwarding_preference =
-      publisher.GetForwardingPreference();
-
-  // Use `next_object_` here since the priority-relevant sequence numbers never
-  // change for a given stream.
-  FullSequence sequence = next_object_;
-
-  MoqtPriority subscriber_priority = subscription.subscriber_priority();
-  MoqtPriority publisher_priority = publisher.GetPublisherPriority();
-  MoqtDeliveryOrder delivery_order =
-      subscription.subscriber_delivery_order().value_or(
-          publisher.GetDeliveryOrder());
-  webtransport::SendOrder send_order;
-  switch (forwarding_preference) {
-    case MoqtForwardingPreference::kTrack:
-      send_order = SendOrderForStream(subscriber_priority, publisher_priority,
-                                      /*group_id=*/0, delivery_order);
-      break;
-    case MoqtForwardingPreference::kGroup:
-      send_order = SendOrderForStream(subscriber_priority, publisher_priority,
-                                      sequence.group, delivery_order);
-      break;
-    case MoqtForwardingPreference::kObject:
-      send_order =
-          SendOrderForStream(subscriber_priority, publisher_priority,
-                             sequence.group, sequence.object, delivery_order);
-      break;
-    case MoqtForwardingPreference::kDatagram:
-      QUICHE_NOTREACHED();
-      return;
-  }
   stream_->SetPriority(
       webtransport::StreamPriority{/*send_group_id=*/kMoqtSendGroupId,
-                                   /*send_order=*/send_order});
+                                   subscription.GetSendOrder(next_object_)});
 }
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 8ccbdd6..e5a90f5 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -12,6 +12,8 @@
 #include <utility>
 #include <vector>
 
+#include "absl/container/btree_map.h"
+#include "absl/container/btree_set.h"
 #include "absl/container/flat_hash_map.h"
 #include "absl/container/flat_hash_set.h"
 #include "absl/strings/string_view.h"
@@ -27,7 +29,6 @@
 #include "quiche/common/platform/api/quiche_export.h"
 #include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/quiche_callbacks.h"
-#include "quiche/common/quiche_circular_deque.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
@@ -66,6 +67,13 @@
       DefaultIncomingAnnounceCallback;
 };
 
+struct SubscriptionWithQueuedStream {
+  webtransport::SendOrder send_order;
+  uint64_t subscription_id;
+
+  auto operator<=>(const SubscriptionWithQueuedStream& other) const = default;
+};
+
 // MoqtPublishingMonitorInterface allows a publisher monitor the delivery
 // progress for a single individual subscriber.
 class MoqtPublishingMonitorInterface {
@@ -154,6 +162,15 @@
 
   void Close() { session_->CloseSession(0, "Application closed"); }
 
+  // Tells the session that the highest send order for pending streams in a
+  // subscription has changed. If |old_send_order| is nullopt, this is the
+  // first pending stream. If |new_send_order| is nullopt, the subscription
+  // has no pending streams anymore.
+  void UpdateQueuedSendOrder(
+      uint64_t subscribe_id,
+      std::optional<webtransport::SendOrder> old_send_order,
+      std::optional<webtransport::SendOrder> new_send_order);
+
  private:
   friend class test::MoqtSessionPeer;
 
@@ -329,6 +346,7 @@
     PublishedSubscription& operator=(const PublishedSubscription&) = delete;
     PublishedSubscription& operator=(PublishedSubscription&&) = delete;
 
+    uint64_t subscription_id() const { return subscription_id_; }
     MoqtTrackPublisher& publisher() { return *track_publisher_; }
     uint64_t track_alias() const { return track_alias_; }
     std::optional<FullSequence> largest_sent() const { return largest_sent_; }
@@ -336,6 +354,7 @@
     std::optional<MoqtDeliveryOrder> subscriber_delivery_order() const {
       return subscriber_delivery_order_;
     }
+    void set_subscriber_priority(MoqtPriority priority);
 
     void OnNewObjectAvailable(FullSequence sequence) override;
     void ProcessObjectAck(const MoqtObjectAck& message) {
@@ -367,6 +386,15 @@
 
     std::vector<webtransport::StreamId> GetAllStreams() const;
 
+    webtransport::SendOrder GetSendOrder(FullSequence sequence) const;
+
+    void AddQueuedOutgoingDataStream(FullSequence first_object);
+    // Pops the pending outgoing data stream, with the highest send order.
+    // The session keeps track of which subscribes have pending streams. This
+    // function will trigger a QUICHE_DCHECK if called when there are no pending
+    // streams.
+    FullSequence NextQueuedOutgoingDataStream();
+
    private:
     SendStreamMap& stream_map();
     quic::Perspective perspective() const {
@@ -374,6 +402,11 @@
     }
 
     void SendDatagram(FullSequence sequence);
+    webtransport::SendOrder FinalizeSendOrder(
+        webtransport::SendOrder send_order) {
+      return UpdateSendOrderForSubscriberPriority(send_order,
+                                                  subscriber_priority_);
+    }
 
     uint64_t subscription_id_;
     MoqtSession* session_;
@@ -387,11 +420,15 @@
     std::optional<FullSequence> largest_sent_;
     // Should be almost always accessed via `stream_map()`.
     std::optional<SendStreamMap> lazily_initialized_stream_map_;
+    // Store the send order of queued outgoing data streams. Use a
+    // subscriber_priority_ of zero to avoid having to update it, and call
+    // FinalizeSendOrder() whenever delivering it to the MoqtSession.d
+    absl::btree_multimap<webtransport::SendOrder, FullSequence>
+        queued_outgoing_data_streams_;
   };
   class QUICHE_EXPORT OutgoingDataStream : public webtransport::StreamVisitor {
    public:
     OutgoingDataStream(MoqtSession* session, webtransport::Stream* stream,
-                       uint64_t subscription_id,
                        PublishedSubscription& subscription,
                        FullSequence first_object);
     ~OutgoingDataStream();
@@ -436,14 +473,6 @@
 
   // Private members of MoqtSession.
 
-  // QueuedOutgoingDataStream records an information necessary to create a
-  // stream that was attempted to be created before but was blocked due to flow
-  // control.
-  struct QueuedOutgoingDataStream {
-    uint64_t subscription_id;
-    FullSequence first_object;
-  };
-
   // Returns true if SUBSCRIBE_DONE was sent.
   bool SubscribeIsDone(uint64_t subscribe_id, SubscribeDoneCode code,
                        absl::string_view reason_phrase);
@@ -462,7 +491,7 @@
                                               FullSequence first_object);
   // Same as above, except the session is required to be not flow control
   // blocked.
-  webtransport::Stream* OpenDataStream(uint64_t subscription_id,
+  webtransport::Stream* OpenDataStream(PublishedSubscription& subscription,
                                        FullSequence first_object);
 
   // Get FullTrackName and visitor for a subscribe_id and track_alias. Returns
@@ -513,10 +542,9 @@
   // Subscriptions for local tracks by the remote peer, indexed by subscribe ID.
   absl::flat_hash_map<uint64_t, std::unique_ptr<PublishedSubscription>>
       published_subscriptions_;
-  // Keeps track of all the data streams that were supposed to be open, but were
-  // blocked by the flow control.
-  quiche::QuicheCircularDeque<QueuedOutgoingDataStream>
-      queued_outgoing_data_streams_;
+  // Keeps track of all subscribe IDs that have queued outgoing data streams.
+  absl::btree_set<SubscriptionWithQueuedStream>
+      subscribes_with_queued_outgoing_data_streams_;
   // This is only used to check for track_alias collisions.
   absl::flat_hash_set<uint64_t> used_track_aliases_;
   uint64_t next_local_track_alias_ = 0;
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 9547cf4..a129a9d 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -138,6 +138,17 @@
     return session->published_subscriptions_[subscribe_id].get();
   }
 
+  static void DeleteSubscription(MoqtSession* session, uint64_t subscribe_id) {
+    session->published_subscriptions_.erase(subscribe_id);
+  }
+
+  static void UpdateSubscriberPriority(MoqtSession* session,
+                                       uint64_t subscribe_id,
+                                       MoqtPriority priority) {
+    session->published_subscriptions_[subscribe_id]->set_subscriber_priority(
+        priority);
+  }
+
   static void set_peer_role(MoqtSession* session, MoqtRole role) {
     session->peer_role_ = role;
   }
@@ -1246,6 +1257,221 @@
   stream_input->OnAnnounceMessage(announce);
 }
 
+TEST_F(MoqtSessionTest, QueuedStreamsOpenedInOrder) {
+  FullTrackName ftn("foo", "bar");
+  auto track =
+      SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(0, 0));
+  EXPECT_CALL(*track, GetTrackStatus())
+      .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun));
+  MoqtObjectListener* subscription =
+      MoqtSessionPeer::AddSubscription(&session_, track, 0, 14, 0, 0);
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillOnce(Return(false))
+      .WillOnce(Return(false))
+      .WillOnce(Return(false));
+  EXPECT_CALL(*track, GetTrackStatus())
+      .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
+  subscription->OnNewObjectAvailable(FullSequence(1, 0));
+  subscription->OnNewObjectAvailable(FullSequence(0, 0));
+  subscription->OnNewObjectAvailable(FullSequence(2, 0));
+  // These should be opened in the sequence (0, 0), (1, 0), (2, 0).
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillRepeatedly(Return(true));
+  webtransport::test::MockStream mock_stream0, mock_stream1, mock_stream2;
+  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
+      .WillOnce(Return(&mock_stream0))
+      .WillOnce(Return(&mock_stream1))
+      .WillOnce(Return(&mock_stream2));
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor[3];
+  EXPECT_CALL(mock_stream0, SetVisitor(_))
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        stream_visitor[0] = std::move(visitor);
+      });
+  EXPECT_CALL(mock_stream1, SetVisitor(_))
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        stream_visitor[1] = std::move(visitor);
+      });
+  EXPECT_CALL(mock_stream2, SetVisitor(_))
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        stream_visitor[2] = std::move(visitor);
+      });
+  EXPECT_CALL(mock_stream0, GetStreamId()).WillRepeatedly(Return(0));
+  EXPECT_CALL(mock_stream1, GetStreamId()).WillRepeatedly(Return(1));
+  EXPECT_CALL(mock_stream2, GetStreamId()).WillRepeatedly(Return(2));
+  EXPECT_CALL(mock_stream0, visitor()).WillOnce([&]() {
+    return stream_visitor[0].get();
+  });
+  EXPECT_CALL(mock_stream1, visitor()).WillOnce([&]() {
+    return stream_visitor[1].get();
+  });
+  EXPECT_CALL(mock_stream2, visitor()).WillOnce([&]() {
+    return stream_visitor[2].get();
+  });
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 0)))
+      .WillOnce(
+          Return(PublishedObject{FullSequence(0, 0), MoqtObjectStatus::kNormal,
+                                 MemSliceFromString("deadbeef")}));
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 1)))
+      .WillOnce(Return(std::nullopt));
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(1, 0)))
+      .WillOnce(
+          Return(PublishedObject{FullSequence(1, 0), MoqtObjectStatus::kNormal,
+                                 MemSliceFromString("deadbeef")}));
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(1, 1)))
+      .WillOnce(Return(std::nullopt));
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(2, 0)))
+      .WillOnce(
+          Return(PublishedObject{FullSequence(2, 0), MoqtObjectStatus::kNormal,
+                                 MemSliceFromString("deadbeef")}));
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(2, 1)))
+      .WillOnce(Return(std::nullopt));
+  EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true));
+  EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true));
+  EXPECT_CALL(mock_stream2, CanWrite()).WillRepeatedly(Return(true));
+  EXPECT_CALL(mock_stream0, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        // The Group ID is the 5th byte of the stream.
+        EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 0);
+        return absl::OkStatus();
+      });
+  EXPECT_CALL(mock_stream1, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        // The Group ID is the 5th byte of the stream.
+        EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 1);
+        return absl::OkStatus();
+      });
+  EXPECT_CALL(mock_stream2, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        // The Group ID is the 5th byte of the stream.
+        EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 2);
+        return absl::OkStatus();
+      });
+  session_.OnCanCreateNewOutgoingUnidirectionalStream();
+}
+
+TEST_F(MoqtSessionTest, StreamQueuedForSubscriptionThatDoesntExist) {
+  FullTrackName ftn("foo", "bar");
+  auto track =
+      SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(0, 0));
+  EXPECT_CALL(*track, GetTrackStatus())
+      .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun));
+  MoqtObjectListener* subscription =
+      MoqtSessionPeer::AddSubscription(&session_, track, 0, 14, 0, 0);
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillOnce(Return(false));
+  EXPECT_CALL(*track, GetTrackStatus())
+      .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
+  subscription->OnNewObjectAvailable(FullSequence(0, 0));
+
+  // Delete the subscription, then grant stream credit.
+  MoqtSessionPeer::DeleteSubscription(&session_, 0);
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillRepeatedly(Return(true));
+  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream()).Times(0);
+  session_.OnCanCreateNewOutgoingUnidirectionalStream();
+}
+
+TEST_F(MoqtSessionTest, QueuedStreamPriorityChanged) {
+  FullTrackName ftn("foo", "bar");
+  auto track =
+      SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(0, 0));
+  EXPECT_CALL(*track, GetTrackStatus())
+      .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun));
+  // Create two identical subscriptions with different priorities.
+  MoqtObjectListener* subscription0 =
+      MoqtSessionPeer::AddSubscription(&session_, track, 0, 14, 0, 0);
+  MoqtObjectListener* subscription1 =
+      MoqtSessionPeer::AddSubscription(&session_, track, 1, 14, 0, 0);
+  MoqtSessionPeer::UpdateSubscriberPriority(&session_, 0, 1);
+  MoqtSessionPeer::UpdateSubscriberPriority(&session_, 1, 2);
+
+  // Two arriving objects will queue four streams.
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillOnce(Return(false))
+      .WillOnce(Return(false))
+      .WillOnce(Return(false))
+      .WillOnce(Return(false));
+  EXPECT_CALL(*track, GetTrackStatus())
+      .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
+  subscription0->OnNewObjectAvailable(FullSequence(0, 0));
+  subscription1->OnNewObjectAvailable(FullSequence(0, 0));
+  subscription0->OnNewObjectAvailable(FullSequence(1, 0));
+  subscription1->OnNewObjectAvailable(FullSequence(1, 0));
+
+  // Allow one stream to be opened. It will be group 0, subscription 0.
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillOnce(Return(true))
+      .WillOnce(Return(false));
+  webtransport::test::MockStream mock_stream0;
+  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
+      .WillOnce(Return(&mock_stream0));
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor0;
+  EXPECT_CALL(mock_stream0, SetVisitor(_))
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        stream_visitor0 = std::move(visitor);
+      });
+  EXPECT_CALL(mock_stream0, GetStreamId()).WillRepeatedly(Return(0));
+  EXPECT_CALL(mock_stream0, visitor()).WillOnce([&]() {
+    return stream_visitor0.get();
+  });
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 0)))
+      .WillOnce(
+          Return(PublishedObject{FullSequence(0, 0), MoqtObjectStatus::kNormal,
+                                 MemSliceFromString("deadbeef")}));
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 1)))
+      .WillOnce(Return(std::nullopt));
+  EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true));
+  EXPECT_CALL(mock_stream0, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        // Check subscribe ID is 0.
+        EXPECT_EQ(static_cast<const uint8_t>(data[0][2]), 0);
+        // Check Group ID is 0
+        EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 0);
+        return absl::OkStatus();
+      });
+  session_.OnCanCreateNewOutgoingUnidirectionalStream();
+
+  // Raise the priority of subscription 1 and allow another stream. It will be
+  // group 0, subscription 1.
+  MoqtSessionPeer::UpdateSubscriberPriority(&session_, 1, 0);
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillOnce(Return(true))
+      .WillRepeatedly(Return(false));
+  webtransport::test::MockStream mock_stream1;
+  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
+      .WillOnce(Return(&mock_stream1));
+  std::unique_ptr<webtransport::StreamVisitor> stream_visitor1;
+  EXPECT_CALL(mock_stream1, SetVisitor(_))
+      .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) {
+        stream_visitor1 = std::move(visitor);
+      });
+  EXPECT_CALL(mock_stream1, GetStreamId()).WillRepeatedly(Return(1));
+  EXPECT_CALL(mock_stream1, visitor()).WillOnce([&]() {
+    return stream_visitor1.get();
+  });
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 0)))
+      .WillOnce(
+          Return(PublishedObject{FullSequence(0, 0), MoqtObjectStatus::kNormal,
+                                 MemSliceFromString("deadbeef")}));
+  EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 1)))
+      .WillOnce(Return(std::nullopt));
+  EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true));
+  EXPECT_CALL(mock_stream1, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        // Check subscribe ID is 0.
+        EXPECT_EQ(static_cast<const uint8_t>(data[0][2]), 1);
+        // Check Group ID is 0
+        EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 0);
+        return absl::OkStatus();
+      });
+  session_.OnCanCreateNewOutgoingUnidirectionalStream();
+}
+
 // TODO: re-enable this test once this behavior is re-implemented.
 #if 0
 TEST_F(MoqtSessionTest, SubscribeUpdateClosesSubscription) {