Order MoQT Queued streams by send_order. PiperOrigin-RevId: 673155138
diff --git a/quiche/quic/moqt/moqt_priority.cc b/quiche/quic/moqt/moqt_priority.cc index 5528079..73e0171 100644 --- a/quiche/quic/moqt/moqt_priority.cc +++ b/quiche/quic/moqt/moqt_priority.cc
@@ -64,6 +64,15 @@ return track_bits | (group_id << 20) | object_id; } +webtransport::SendOrder UpdateSendOrderForSubscriberPriority( + const webtransport::SendOrder send_order, + MoqtPriority subscriber_priority) { + webtransport::SendOrder new_send_order = OnlyLowestNBits<54>(send_order); + const int64_t sub_bits = Flip<8>(subscriber_priority) << 54; + new_send_order |= sub_bits; + return new_send_order; +} + const webtransport::SendOrder kMoqtControlStreamSendOrder = std::numeric_limits<webtransport::SendOrder>::max();
diff --git a/quiche/quic/moqt/moqt_priority.h b/quiche/quic/moqt/moqt_priority.h index 85449d9..0dfd490 100644 --- a/quiche/quic/moqt/moqt_priority.h +++ b/quiche/quic/moqt/moqt_priority.h
@@ -32,6 +32,10 @@ MoqtPriority subscriber_priority, MoqtPriority publisher_priority, uint64_t group_id, uint64_t object_id, MoqtDeliveryOrder delivery_order); +// Returns |send_order| updated with the new |subscriber_priority|. +QUICHE_EXPORT webtransport::SendOrder UpdateSendOrderForSubscriberPriority( + webtransport::SendOrder send_order, MoqtPriority subscriber_priority); + // WebTransport send order set on the MoQT control stream. QUICHE_EXPORT extern const webtransport::SendOrder kMoqtControlStreamSendOrder;
diff --git a/quiche/quic/moqt/moqt_priority_test.cc b/quiche/quic/moqt/moqt_priority_test.cc index d30c07c..f677eef 100644 --- a/quiche/quic/moqt/moqt_priority_test.cc +++ b/quiche/quic/moqt/moqt_priority_test.cc
@@ -55,5 +55,13 @@ SendOrderForStream(0x80, 0x80, 0, 0, MoqtDeliveryOrder::kDescending)); } +TEST(MoqtPriorityTest, UpdateSendOrderForSubscriberPriority) { + EXPECT_EQ( + UpdateSendOrderForSubscriberPriority( + SendOrderForStream(0x80, 0x80, 0, MoqtDeliveryOrder::kAscending), + 0x10), + SendOrderForStream(0x10, 0x80, 0, MoqtDeliveryOrder::kAscending)); +} + } // namespace } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 2d9d073..c74bc39 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -15,6 +15,7 @@ #include "absl/algorithm/container.h" +#include "absl/container/btree_map.h" #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" #include "absl/container/node_hash_map.h" @@ -420,17 +421,6 @@ webtransport::Stream* MoqtSession::OpenOrQueueDataStream( uint64_t subscription_id, FullSequence first_object) { - if (!session_->CanOpenNextOutgoingUnidirectionalStream()) { - queued_outgoing_data_streams_.push_back( - QueuedOutgoingDataStream{subscription_id, first_object}); - // TODO: limit the number of streams in the queue. - return nullptr; - } - return OpenDataStream(subscription_id, first_object); -} - -webtransport::Stream* MoqtSession::OpenDataStream(uint64_t subscription_id, - FullSequence first_object) { auto it = published_subscriptions_.find(subscription_id); if (it == published_subscriptions_.end()) { // It is possible that the subscription has been discarded while the stream @@ -438,7 +428,18 @@ return nullptr; } PublishedSubscription& subscription = *it->second; + if (!session_->CanOpenNextOutgoingUnidirectionalStream()) { + subscription.AddQueuedOutgoingDataStream(first_object); + // The subscription will notify the session about how to update the + // session's queue. + // TODO: limit the number of streams in the queue. + return nullptr; + } + return OpenDataStream(subscription, first_object); +} +webtransport::Stream* MoqtSession::OpenDataStream( + PublishedSubscription& subscription, FullSequence first_object) { webtransport::Stream* new_stream = session_->OpenOutgoingUnidirectionalStream(); if (new_stream == nullptr) { @@ -447,24 +448,50 @@ return nullptr; } new_stream->SetVisitor(std::make_unique<OutgoingDataStream>( - this, new_stream, subscription_id, subscription, first_object)); + this, new_stream, subscription, first_object)); subscription.OnDataStreamCreated(new_stream->GetStreamId(), first_object); return new_stream; } void MoqtSession::OnCanCreateNewOutgoingUnidirectionalStream() { - while (!queued_outgoing_data_streams_.empty() && + while (!subscribes_with_queued_outgoing_data_streams_.empty() && session_->CanOpenNextOutgoingUnidirectionalStream()) { - QueuedOutgoingDataStream next = queued_outgoing_data_streams_.front(); - queued_outgoing_data_streams_.pop_front(); + auto next = subscribes_with_queued_outgoing_data_streams_.rbegin(); + auto subscription = published_subscriptions_.find(next->subscription_id); + if (subscription == published_subscriptions_.end()) { + // Subscription no longer exists; delete the entry. + subscribes_with_queued_outgoing_data_streams_.erase((++next).base()); + continue; + } + // Open the stream. The second argument pops the item from the + // subscription's queue, which might update + // subscribes_with_queued_outgoing_data_streams_. webtransport::Stream* stream = - OpenDataStream(next.subscription_id, next.first_object); + OpenDataStream(*subscription->second, + subscription->second->NextQueuedOutgoingDataStream()); if (stream != nullptr) { stream->visitor()->OnCanWrite(); } } } +void MoqtSession::UpdateQueuedSendOrder( + uint64_t subscribe_id, + std::optional<webtransport::SendOrder> old_send_order, + std::optional<webtransport::SendOrder> new_send_order) { + if (old_send_order == new_send_order) { + return; + } + if (old_send_order.has_value()) { + subscribes_with_queued_outgoing_data_streams_.erase( + SubscriptionWithQueuedStream{*old_send_order, subscribe_id}); + } + if (new_send_order.has_value()) { + subscribes_with_queued_outgoing_data_streams_.emplace(*new_send_order, + subscribe_id); + } +} + std::pair<FullTrackName, RemoteTrack::Visitor*> MoqtSession::TrackPropertiesFromAlias(const MoqtObject& message) { auto it = remote_tracks_.find(message.track_alias); @@ -935,6 +962,22 @@ // TODO: send an error for invalid updates now that it's a part of draft-05. } +void MoqtSession::PublishedSubscription::set_subscriber_priority( + MoqtPriority priority) { + if (priority == subscriber_priority_) { + return; + } + if (queued_outgoing_data_streams_.empty()) { + subscriber_priority_ = priority; + return; + } + webtransport::SendOrder old_send_order = + FinalizeSendOrder(queued_outgoing_data_streams_.rbegin()->first); + subscriber_priority_ = priority; + session_->UpdateQueuedSendOrder(subscription_id_, old_send_order, + FinalizeSendOrder(old_send_order)); +}; + void MoqtSession::PublishedSubscription::OnNewObjectAvailable( FullSequence sequence) { if (!window_.InWindow(sequence)) { @@ -1003,6 +1046,80 @@ return lazily_initialized_stream_map_->GetAllStreams(); } +webtransport::SendOrder MoqtSession::PublishedSubscription::GetSendOrder( + FullSequence sequence) const { + MoqtForwardingPreference forwarding_preference = + track_publisher_->GetForwardingPreference(); + + MoqtPriority publisher_priority = track_publisher_->GetPublisherPriority(); + MoqtDeliveryOrder delivery_order = subscriber_delivery_order().value_or( + track_publisher_->GetDeliveryOrder()); + switch (forwarding_preference) { + case MoqtForwardingPreference::kTrack: + return SendOrderForStream(subscriber_priority_, publisher_priority, + /*group_id=*/0, delivery_order); + break; + case MoqtForwardingPreference::kGroup: + return SendOrderForStream(subscriber_priority_, publisher_priority, + sequence.group, delivery_order); + break; + case MoqtForwardingPreference::kObject: + return SendOrderForStream(subscriber_priority_, publisher_priority, + sequence.group, sequence.object, + delivery_order); + break; + case MoqtForwardingPreference::kDatagram: + QUICHE_NOTREACHED(); + return 0; + } +} + +// Returns the highest send order in the subscription. +void MoqtSession::PublishedSubscription::AddQueuedOutgoingDataStream( + FullSequence first_object) { + std::optional<webtransport::SendOrder> start_send_order = + queued_outgoing_data_streams_.empty() + ? std::optional<webtransport::SendOrder>() + : queued_outgoing_data_streams_.rbegin()->first; + webtransport::SendOrder send_order = GetSendOrder(first_object); + // Zero out the subscriber priority bits, since these will be added when + // updating the session. + queued_outgoing_data_streams_.emplace( + UpdateSendOrderForSubscriberPriority(send_order, 0), first_object); + if (!start_send_order.has_value()) { + session_->UpdateQueuedSendOrder(subscription_id_, std::nullopt, send_order); + } else if (*start_send_order < send_order) { + session_->UpdateQueuedSendOrder( + subscription_id_, FinalizeSendOrder(*start_send_order), send_order); + } +} + +FullSequence +MoqtSession::PublishedSubscription::NextQueuedOutgoingDataStream() { + QUICHE_DCHECK(!queued_outgoing_data_streams_.empty()); + if (queued_outgoing_data_streams_.empty()) { + return FullSequence(); + } + auto it = queued_outgoing_data_streams_.rbegin(); + webtransport::SendOrder old_send_order = FinalizeSendOrder(it->first); + FullSequence first_object = it->second; + // converting a reverse iterator to an iterator involves incrementing it and + // then taking base(). + queued_outgoing_data_streams_.erase((++it).base()); + if (queued_outgoing_data_streams_.empty()) { + session_->UpdateQueuedSendOrder(subscription_id_, old_send_order, + std::nullopt); + } else { + webtransport::SendOrder new_send_order = + FinalizeSendOrder(queued_outgoing_data_streams_.rbegin()->first); + if (old_send_order != new_send_order) { + session_->UpdateQueuedSendOrder(subscription_id_, old_send_order, + new_send_order); + } + } + return first_object; +} + void MoqtSession::PublishedSubscription::OnDataStreamCreated( webtransport::StreamId id, FullSequence start_sequence) { stream_map().AddStream(start_sequence, id); @@ -1023,11 +1140,10 @@ MoqtSession::OutgoingDataStream::OutgoingDataStream( MoqtSession* session, webtransport::Stream* stream, - uint64_t subscription_id, PublishedSubscription& subscription, - FullSequence first_object) + PublishedSubscription& subscription, FullSequence first_object) : session_(session), stream_(stream), - subscription_id_(subscription_id), + subscription_id_(subscription.subscription_id()), next_object_(first_object), session_liveness_(session->liveness_token_) { UpdateSendOrder(subscription); @@ -1209,41 +1325,9 @@ void MoqtSession::OutgoingDataStream::UpdateSendOrder( PublishedSubscription& subscription) { - MoqtTrackPublisher& publisher = subscription.publisher(); - MoqtForwardingPreference forwarding_preference = - publisher.GetForwardingPreference(); - - // Use `next_object_` here since the priority-relevant sequence numbers never - // change for a given stream. - FullSequence sequence = next_object_; - - MoqtPriority subscriber_priority = subscription.subscriber_priority(); - MoqtPriority publisher_priority = publisher.GetPublisherPriority(); - MoqtDeliveryOrder delivery_order = - subscription.subscriber_delivery_order().value_or( - publisher.GetDeliveryOrder()); - webtransport::SendOrder send_order; - switch (forwarding_preference) { - case MoqtForwardingPreference::kTrack: - send_order = SendOrderForStream(subscriber_priority, publisher_priority, - /*group_id=*/0, delivery_order); - break; - case MoqtForwardingPreference::kGroup: - send_order = SendOrderForStream(subscriber_priority, publisher_priority, - sequence.group, delivery_order); - break; - case MoqtForwardingPreference::kObject: - send_order = - SendOrderForStream(subscriber_priority, publisher_priority, - sequence.group, sequence.object, delivery_order); - break; - case MoqtForwardingPreference::kDatagram: - QUICHE_NOTREACHED(); - return; - } stream_->SetPriority( webtransport::StreamPriority{/*send_group_id=*/kMoqtSendGroupId, - /*send_order=*/send_order}); + subscription.GetSendOrder(next_object_)}); } } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 8ccbdd6..e5a90f5 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -12,6 +12,8 @@ #include <utility> #include <vector> +#include "absl/container/btree_map.h" +#include "absl/container/btree_set.h" #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" #include "absl/strings/string_view.h" @@ -27,7 +29,6 @@ #include "quiche/common/platform/api/quiche_export.h" #include "quiche/common/quiche_buffer_allocator.h" #include "quiche/common/quiche_callbacks.h" -#include "quiche/common/quiche_circular_deque.h" #include "quiche/web_transport/web_transport.h" namespace moqt { @@ -66,6 +67,13 @@ DefaultIncomingAnnounceCallback; }; +struct SubscriptionWithQueuedStream { + webtransport::SendOrder send_order; + uint64_t subscription_id; + + auto operator<=>(const SubscriptionWithQueuedStream& other) const = default; +}; + // MoqtPublishingMonitorInterface allows a publisher monitor the delivery // progress for a single individual subscriber. class MoqtPublishingMonitorInterface { @@ -154,6 +162,15 @@ void Close() { session_->CloseSession(0, "Application closed"); } + // Tells the session that the highest send order for pending streams in a + // subscription has changed. If |old_send_order| is nullopt, this is the + // first pending stream. If |new_send_order| is nullopt, the subscription + // has no pending streams anymore. + void UpdateQueuedSendOrder( + uint64_t subscribe_id, + std::optional<webtransport::SendOrder> old_send_order, + std::optional<webtransport::SendOrder> new_send_order); + private: friend class test::MoqtSessionPeer; @@ -329,6 +346,7 @@ PublishedSubscription& operator=(const PublishedSubscription&) = delete; PublishedSubscription& operator=(PublishedSubscription&&) = delete; + uint64_t subscription_id() const { return subscription_id_; } MoqtTrackPublisher& publisher() { return *track_publisher_; } uint64_t track_alias() const { return track_alias_; } std::optional<FullSequence> largest_sent() const { return largest_sent_; } @@ -336,6 +354,7 @@ std::optional<MoqtDeliveryOrder> subscriber_delivery_order() const { return subscriber_delivery_order_; } + void set_subscriber_priority(MoqtPriority priority); void OnNewObjectAvailable(FullSequence sequence) override; void ProcessObjectAck(const MoqtObjectAck& message) { @@ -367,6 +386,15 @@ std::vector<webtransport::StreamId> GetAllStreams() const; + webtransport::SendOrder GetSendOrder(FullSequence sequence) const; + + void AddQueuedOutgoingDataStream(FullSequence first_object); + // Pops the pending outgoing data stream, with the highest send order. + // The session keeps track of which subscribes have pending streams. This + // function will trigger a QUICHE_DCHECK if called when there are no pending + // streams. + FullSequence NextQueuedOutgoingDataStream(); + private: SendStreamMap& stream_map(); quic::Perspective perspective() const { @@ -374,6 +402,11 @@ } void SendDatagram(FullSequence sequence); + webtransport::SendOrder FinalizeSendOrder( + webtransport::SendOrder send_order) { + return UpdateSendOrderForSubscriberPriority(send_order, + subscriber_priority_); + } uint64_t subscription_id_; MoqtSession* session_; @@ -387,11 +420,15 @@ std::optional<FullSequence> largest_sent_; // Should be almost always accessed via `stream_map()`. std::optional<SendStreamMap> lazily_initialized_stream_map_; + // Store the send order of queued outgoing data streams. Use a + // subscriber_priority_ of zero to avoid having to update it, and call + // FinalizeSendOrder() whenever delivering it to the MoqtSession.d + absl::btree_multimap<webtransport::SendOrder, FullSequence> + queued_outgoing_data_streams_; }; class QUICHE_EXPORT OutgoingDataStream : public webtransport::StreamVisitor { public: OutgoingDataStream(MoqtSession* session, webtransport::Stream* stream, - uint64_t subscription_id, PublishedSubscription& subscription, FullSequence first_object); ~OutgoingDataStream(); @@ -436,14 +473,6 @@ // Private members of MoqtSession. - // QueuedOutgoingDataStream records an information necessary to create a - // stream that was attempted to be created before but was blocked due to flow - // control. - struct QueuedOutgoingDataStream { - uint64_t subscription_id; - FullSequence first_object; - }; - // Returns true if SUBSCRIBE_DONE was sent. bool SubscribeIsDone(uint64_t subscribe_id, SubscribeDoneCode code, absl::string_view reason_phrase); @@ -462,7 +491,7 @@ FullSequence first_object); // Same as above, except the session is required to be not flow control // blocked. - webtransport::Stream* OpenDataStream(uint64_t subscription_id, + webtransport::Stream* OpenDataStream(PublishedSubscription& subscription, FullSequence first_object); // Get FullTrackName and visitor for a subscribe_id and track_alias. Returns @@ -513,10 +542,9 @@ // Subscriptions for local tracks by the remote peer, indexed by subscribe ID. absl::flat_hash_map<uint64_t, std::unique_ptr<PublishedSubscription>> published_subscriptions_; - // Keeps track of all the data streams that were supposed to be open, but were - // blocked by the flow control. - quiche::QuicheCircularDeque<QueuedOutgoingDataStream> - queued_outgoing_data_streams_; + // Keeps track of all subscribe IDs that have queued outgoing data streams. + absl::btree_set<SubscriptionWithQueuedStream> + subscribes_with_queued_outgoing_data_streams_; // This is only used to check for track_alias collisions. absl::flat_hash_set<uint64_t> used_track_aliases_; uint64_t next_local_track_alias_ = 0;
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 9547cf4..a129a9d 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -138,6 +138,17 @@ return session->published_subscriptions_[subscribe_id].get(); } + static void DeleteSubscription(MoqtSession* session, uint64_t subscribe_id) { + session->published_subscriptions_.erase(subscribe_id); + } + + static void UpdateSubscriberPriority(MoqtSession* session, + uint64_t subscribe_id, + MoqtPriority priority) { + session->published_subscriptions_[subscribe_id]->set_subscriber_priority( + priority); + } + static void set_peer_role(MoqtSession* session, MoqtRole role) { session->peer_role_ = role; } @@ -1246,6 +1257,221 @@ stream_input->OnAnnounceMessage(announce); } +TEST_F(MoqtSessionTest, QueuedStreamsOpenedInOrder) { + FullTrackName ftn("foo", "bar"); + auto track = + SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(0, 0)); + EXPECT_CALL(*track, GetTrackStatus()) + .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun)); + MoqtObjectListener* subscription = + MoqtSessionPeer::AddSubscription(&session_, track, 0, 14, 0, 0); + EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) + .WillOnce(Return(false)) + .WillOnce(Return(false)) + .WillOnce(Return(false)); + EXPECT_CALL(*track, GetTrackStatus()) + .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress)); + subscription->OnNewObjectAvailable(FullSequence(1, 0)); + subscription->OnNewObjectAvailable(FullSequence(0, 0)); + subscription->OnNewObjectAvailable(FullSequence(2, 0)); + // These should be opened in the sequence (0, 0), (1, 0), (2, 0). + EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) + .WillRepeatedly(Return(true)); + webtransport::test::MockStream mock_stream0, mock_stream1, mock_stream2; + EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream()) + .WillOnce(Return(&mock_stream0)) + .WillOnce(Return(&mock_stream1)) + .WillOnce(Return(&mock_stream2)); + std::unique_ptr<webtransport::StreamVisitor> stream_visitor[3]; + EXPECT_CALL(mock_stream0, SetVisitor(_)) + .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) { + stream_visitor[0] = std::move(visitor); + }); + EXPECT_CALL(mock_stream1, SetVisitor(_)) + .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) { + stream_visitor[1] = std::move(visitor); + }); + EXPECT_CALL(mock_stream2, SetVisitor(_)) + .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) { + stream_visitor[2] = std::move(visitor); + }); + EXPECT_CALL(mock_stream0, GetStreamId()).WillRepeatedly(Return(0)); + EXPECT_CALL(mock_stream1, GetStreamId()).WillRepeatedly(Return(1)); + EXPECT_CALL(mock_stream2, GetStreamId()).WillRepeatedly(Return(2)); + EXPECT_CALL(mock_stream0, visitor()).WillOnce([&]() { + return stream_visitor[0].get(); + }); + EXPECT_CALL(mock_stream1, visitor()).WillOnce([&]() { + return stream_visitor[1].get(); + }); + EXPECT_CALL(mock_stream2, visitor()).WillOnce([&]() { + return stream_visitor[2].get(); + }); + EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 0))) + .WillOnce( + Return(PublishedObject{FullSequence(0, 0), MoqtObjectStatus::kNormal, + MemSliceFromString("deadbeef")})); + EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 1))) + .WillOnce(Return(std::nullopt)); + EXPECT_CALL(*track, GetCachedObject(FullSequence(1, 0))) + .WillOnce( + Return(PublishedObject{FullSequence(1, 0), MoqtObjectStatus::kNormal, + MemSliceFromString("deadbeef")})); + EXPECT_CALL(*track, GetCachedObject(FullSequence(1, 1))) + .WillOnce(Return(std::nullopt)); + EXPECT_CALL(*track, GetCachedObject(FullSequence(2, 0))) + .WillOnce( + Return(PublishedObject{FullSequence(2, 0), MoqtObjectStatus::kNormal, + MemSliceFromString("deadbeef")})); + EXPECT_CALL(*track, GetCachedObject(FullSequence(2, 1))) + .WillOnce(Return(std::nullopt)); + EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true)); + EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true)); + EXPECT_CALL(mock_stream2, CanWrite()).WillRepeatedly(Return(true)); + EXPECT_CALL(mock_stream0, Writev(_, _)) + .WillOnce([&](absl::Span<const absl::string_view> data, + const quiche::StreamWriteOptions& options) { + // The Group ID is the 5th byte of the stream. + EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 0); + return absl::OkStatus(); + }); + EXPECT_CALL(mock_stream1, Writev(_, _)) + .WillOnce([&](absl::Span<const absl::string_view> data, + const quiche::StreamWriteOptions& options) { + // The Group ID is the 5th byte of the stream. + EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 1); + return absl::OkStatus(); + }); + EXPECT_CALL(mock_stream2, Writev(_, _)) + .WillOnce([&](absl::Span<const absl::string_view> data, + const quiche::StreamWriteOptions& options) { + // The Group ID is the 5th byte of the stream. + EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 2); + return absl::OkStatus(); + }); + session_.OnCanCreateNewOutgoingUnidirectionalStream(); +} + +TEST_F(MoqtSessionTest, StreamQueuedForSubscriptionThatDoesntExist) { + FullTrackName ftn("foo", "bar"); + auto track = + SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(0, 0)); + EXPECT_CALL(*track, GetTrackStatus()) + .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun)); + MoqtObjectListener* subscription = + MoqtSessionPeer::AddSubscription(&session_, track, 0, 14, 0, 0); + EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) + .WillOnce(Return(false)); + EXPECT_CALL(*track, GetTrackStatus()) + .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress)); + subscription->OnNewObjectAvailable(FullSequence(0, 0)); + + // Delete the subscription, then grant stream credit. + MoqtSessionPeer::DeleteSubscription(&session_, 0); + EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) + .WillRepeatedly(Return(true)); + EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream()).Times(0); + session_.OnCanCreateNewOutgoingUnidirectionalStream(); +} + +TEST_F(MoqtSessionTest, QueuedStreamPriorityChanged) { + FullTrackName ftn("foo", "bar"); + auto track = + SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(0, 0)); + EXPECT_CALL(*track, GetTrackStatus()) + .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun)); + // Create two identical subscriptions with different priorities. + MoqtObjectListener* subscription0 = + MoqtSessionPeer::AddSubscription(&session_, track, 0, 14, 0, 0); + MoqtObjectListener* subscription1 = + MoqtSessionPeer::AddSubscription(&session_, track, 1, 14, 0, 0); + MoqtSessionPeer::UpdateSubscriberPriority(&session_, 0, 1); + MoqtSessionPeer::UpdateSubscriberPriority(&session_, 1, 2); + + // Two arriving objects will queue four streams. + EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) + .WillOnce(Return(false)) + .WillOnce(Return(false)) + .WillOnce(Return(false)) + .WillOnce(Return(false)); + EXPECT_CALL(*track, GetTrackStatus()) + .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress)); + subscription0->OnNewObjectAvailable(FullSequence(0, 0)); + subscription1->OnNewObjectAvailable(FullSequence(0, 0)); + subscription0->OnNewObjectAvailable(FullSequence(1, 0)); + subscription1->OnNewObjectAvailable(FullSequence(1, 0)); + + // Allow one stream to be opened. It will be group 0, subscription 0. + EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) + .WillOnce(Return(true)) + .WillOnce(Return(false)); + webtransport::test::MockStream mock_stream0; + EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream()) + .WillOnce(Return(&mock_stream0)); + std::unique_ptr<webtransport::StreamVisitor> stream_visitor0; + EXPECT_CALL(mock_stream0, SetVisitor(_)) + .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) { + stream_visitor0 = std::move(visitor); + }); + EXPECT_CALL(mock_stream0, GetStreamId()).WillRepeatedly(Return(0)); + EXPECT_CALL(mock_stream0, visitor()).WillOnce([&]() { + return stream_visitor0.get(); + }); + EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 0))) + .WillOnce( + Return(PublishedObject{FullSequence(0, 0), MoqtObjectStatus::kNormal, + MemSliceFromString("deadbeef")})); + EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 1))) + .WillOnce(Return(std::nullopt)); + EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true)); + EXPECT_CALL(mock_stream0, Writev(_, _)) + .WillOnce([&](absl::Span<const absl::string_view> data, + const quiche::StreamWriteOptions& options) { + // Check subscribe ID is 0. + EXPECT_EQ(static_cast<const uint8_t>(data[0][2]), 0); + // Check Group ID is 0 + EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 0); + return absl::OkStatus(); + }); + session_.OnCanCreateNewOutgoingUnidirectionalStream(); + + // Raise the priority of subscription 1 and allow another stream. It will be + // group 0, subscription 1. + MoqtSessionPeer::UpdateSubscriberPriority(&session_, 1, 0); + EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) + .WillOnce(Return(true)) + .WillRepeatedly(Return(false)); + webtransport::test::MockStream mock_stream1; + EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream()) + .WillOnce(Return(&mock_stream1)); + std::unique_ptr<webtransport::StreamVisitor> stream_visitor1; + EXPECT_CALL(mock_stream1, SetVisitor(_)) + .WillOnce([&](std::unique_ptr<webtransport::StreamVisitor> visitor) { + stream_visitor1 = std::move(visitor); + }); + EXPECT_CALL(mock_stream1, GetStreamId()).WillRepeatedly(Return(1)); + EXPECT_CALL(mock_stream1, visitor()).WillOnce([&]() { + return stream_visitor1.get(); + }); + EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 0))) + .WillOnce( + Return(PublishedObject{FullSequence(0, 0), MoqtObjectStatus::kNormal, + MemSliceFromString("deadbeef")})); + EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 1))) + .WillOnce(Return(std::nullopt)); + EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true)); + EXPECT_CALL(mock_stream1, Writev(_, _)) + .WillOnce([&](absl::Span<const absl::string_view> data, + const quiche::StreamWriteOptions& options) { + // Check subscribe ID is 0. + EXPECT_EQ(static_cast<const uint8_t>(data[0][2]), 1); + // Check Group ID is 0 + EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 0); + return absl::OkStatus(); + }); + session_.OnCanCreateNewOutgoingUnidirectionalStream(); +} + // TODO: re-enable this test once this behavior is re-implemented. #if 0 TEST_F(MoqtSessionTest, SubscribeUpdateClosesSubscription) {