Limit Joining FETCH to largest_object at time of SUBSCRIBE. Currently, MoqtOutgoingQueue (our only TrackPublisher implementation that supports FETCH) uses the current largest object for Joining Fetch, instead of the LargestObject at the time of SUBSCRIBE. This change moves the responsibility to the session, which knows what the original LargestObject is. MoqtPublisher::RelativeFetch() and AbsouluteFetch() are now only called if the SUBSCRIBE is pending. Otherwise, the session calls for StandaloneFetch. For MoqtOutgoingQueue, the SUBSCRIBE is never pending so this is always an error. This is a prerequisite to implementing the Joining FETCH aspects of REWIND. PiperOrigin-RevId: 899551017
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc index 00cbc1c..b528528 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -158,45 +158,19 @@ } std::unique_ptr<MoqtFetchTask> MoqtOutgoingQueue::RelativeFetch( - uint64_t group_diff, MoqtDeliveryOrder order) { - if (queue_.empty()) { - return std::make_unique<MoqtFailedFetch>( - absl::NotFoundError("No objects available on the track")); - } - - uint64_t start_group = (group_diff > first_group_in_queue()) - ? 0 - : current_group_id_ - group_diff; - start_group = std::max(start_group, first_group_in_queue()); - Location start = Location(start_group, 0); - Location end = Location(current_group_id_, queue_.back().size() - 1); - - std::vector<Location> objects = GetCachedObjectsInRange(start, end); - if (order == MoqtDeliveryOrder::kDescending) { - ObjectsInDescendingOrder(objects); - } - return std::make_unique<FetchTask>(this, std::move(objects)); + uint64_t /*group_diff*/, MoqtDeliveryOrder /*order*/) { + QUICHE_BUG(MoqtOutgoingQueue_RelativeFetch) + << "Calling RelativeFetch() on an established subscription"; + return std::make_unique<MoqtFailedFetch>(absl::InternalError( + "RelativeFetch called on an established subscription")); } std::unique_ptr<MoqtFetchTask> MoqtOutgoingQueue::AbsoluteFetch( - uint64_t group, MoqtDeliveryOrder order) { - if (queue_.empty()) { - return std::make_unique<MoqtFailedFetch>( - absl::NotFoundError("No objects available on the track")); - } - - Location start(std::max(group, first_group_in_queue()), 0); - Location end = Location(current_group_id_, queue_.back().size() - 1); - if (start > end) { - return std::make_unique<MoqtFailedFetch>( - absl::NotFoundError("All of the requested objects are in the future")); - } - - std::vector<Location> objects = GetCachedObjectsInRange(start, end); - if (order == MoqtDeliveryOrder::kDescending) { - ObjectsInDescendingOrder(objects); - } - return std::make_unique<FetchTask>(this, std::move(objects)); + uint64_t /*group*/, MoqtDeliveryOrder /*order*/) { + QUICHE_BUG(MoqtOutgoingQueue_AbsoluteFetch) + << "Calling AbsoluteFetch() on an established subscription"; + return std::make_unique<MoqtFailedFetch>(absl::InternalError( + "AbsoluteFetch called on an established subscription")); } MoqtFetchTask::GetNextObjectResult MoqtOutgoingQueue::FetchTask::GetNextObject(
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h index d1dca0f..2cc9309 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.h +++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -76,6 +76,8 @@ std::unique_ptr<MoqtFetchTask> StandaloneFetch( Location start, Location end, MoqtDeliveryOrder order) override; + // Joining Fetch functions should never be called because subscriptions are + // never pending in MoqtOutgoingQueue. std::unique_ptr<MoqtFetchTask> RelativeFetch( uint64_t group_diff, MoqtDeliveryOrder order) override; std::unique_ptr<MoqtFetchTask> AbsoluteFetch(
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc index dd79dcb..60df583 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -47,6 +47,7 @@ public MoqtObjectListener { public: TestMoqtOutgoingQueue() : MoqtOutgoingQueue(FullTrackName{"test", "track"}) { + EXPECT_CALL(*this, OnSubscribeAccepted).WillOnce(Return()); AddObjectListener(this); } @@ -364,40 +365,16 @@ TEST(MoqtOutgoingQueue, RelativeJoiningFetch) { TestMoqtOutgoingQueue queue; - queue.AddObject(quiche::QuicheMemSlice::Copy("a"), true); // 0, 0 - queue.AddObject(quiche::QuicheMemSlice::Copy("b"), true); // 1, 0 - // Request before group zero. - EXPECT_THAT( - FetchToVector(queue.RelativeFetch(4, MoqtDeliveryOrder::kDescending)), - IsOkAndHolds(ElementsAre("b", "a"))); - queue.AddObject(quiche::QuicheMemSlice::Copy("c"), true); // 2, 0 - queue.AddObject(quiche::QuicheMemSlice::Copy("d"), false); // 2, 1 - queue.AddObject(quiche::QuicheMemSlice::Copy("e"), true); // 3, 0 - queue.AddObject(quiche::QuicheMemSlice::Copy("f"), false); // 3, 1 - queue.AddObject(quiche::QuicheMemSlice::Copy("g"), true); // 4, 0 - // Early groups are already destroyed. - EXPECT_THAT( - FetchToVector(queue.RelativeFetch(4, MoqtDeliveryOrder::kDescending)), - IsOkAndHolds(ElementsAre("g", "e", "f", "c", "d"))); + EXPECT_QUICHE_BUG( + queue.RelativeFetch(1, MoqtDeliveryOrder::kAscending), + "Calling RelativeFetch\\(\\) on an established subscription"); } TEST(MoqtOutgoingQueue, AbsoluteJoiningFetch) { TestMoqtOutgoingQueue queue; - queue.AddObject(quiche::QuicheMemSlice::Copy("a"), true); // 0, 0 - queue.AddObject(quiche::QuicheMemSlice::Copy("b"), true); // 1, 0 - // Request too far in the future - EXPECT_THAT( - FetchToVector(queue.AbsoluteFetch(4, MoqtDeliveryOrder::kDescending)), - StatusIs(absl::StatusCode::kNotFound)); - queue.AddObject(quiche::QuicheMemSlice::Copy("c"), true); // 2, 0 - queue.AddObject(quiche::QuicheMemSlice::Copy("d"), false); // 2, 1 - queue.AddObject(quiche::QuicheMemSlice::Copy("e"), true); // 3, 0 - queue.AddObject(quiche::QuicheMemSlice::Copy("f"), false); // 3, 1 - queue.AddObject(quiche::QuicheMemSlice::Copy("g"), true); // 4, 0 - // Early groups are already destroyed. - EXPECT_THAT( - FetchToVector(queue.AbsoluteFetch(1, MoqtDeliveryOrder::kDescending)), - IsOkAndHolds(ElementsAre("g", "e", "f", "c", "d"))); + EXPECT_QUICHE_BUG( + queue.AbsoluteFetch(1, MoqtDeliveryOrder::kAscending), + "Calling AbsoluteFetch\\(\\) on an established subscription"); } TEST(MoqtOutgoingQueue, ObjectsGoneWhileFetching) {
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h index 25d75ea..2d13655 100644 --- a/quiche/quic/moqt/moqt_publisher.h +++ b/quiche/quic/moqt/moqt_publisher.h
@@ -102,9 +102,12 @@ virtual const TrackExtensions& extensions() const = 0; virtual std::optional<quic::QuicTimeDelta> expiration() const = 0; - // Performs a fetch for the specified range of objects. + // Performs a fetch for the specified range of objects. Should also be used + // for joining fetches where Largest Location is known. virtual std::unique_ptr<MoqtFetchTask> StandaloneFetch( Location start, Location end, MoqtDeliveryOrder order) = 0; + // Use only when the subscription is pending, so that Largest Location is + // unknown. virtual std::unique_ptr<MoqtFetchTask> RelativeFetch( uint64_t group_diff, MoqtDeliveryOrder order) = 0; virtual std::unique_ptr<MoqtFetchTask> AbsoluteFetch(
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index c69ac4d..69020ba 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -1487,22 +1487,50 @@ return; } track_name = it->second->publisher().GetTrackName(); - if (std::holds_alternative<JoiningFetchRelative>(message.fetch)) { - const JoiningFetchRelative& relative_fetch = - std::get<JoiningFetchRelative>(message.fetch); - QUIC_DLOG(INFO) << ENDPOINT << "Received a Relative Joining FETCH for " - << track_name; - fetch = it->second->publisher().RelativeFetch( - relative_fetch.joining_start, message.parameters.group_order.value_or( - MoqtDeliveryOrder::kAscending)); + if (it->second->established()) { + if (!it->second->parameters().largest_object.has_value()) { + // Nothing to Fetch. + SendRequestError(message.request_id, RequestErrorCode::kDoesNotExist, + std::nullopt, "not found"); + return; + } + const Location largest_location = + *it->second->parameters().largest_object; + uint64_t start_group; + if (std::holds_alternative<JoiningFetchRelative>(message.fetch)) { + const JoiningFetchRelative& relative_fetch = + std::get<JoiningFetchRelative>(message.fetch); + start_group = + (relative_fetch.joining_start > largest_location.group) + ? 0 + : (largest_location.group - relative_fetch.joining_start); + } else { + const JoiningFetchAbsolute& absolute_fetch = + std::get<JoiningFetchAbsolute>(message.fetch); + start_group = absolute_fetch.joining_start; + if (start_group > largest_location.group) { + SendRequestError(message.request_id, RequestErrorCode::kInvalidRange, + std::nullopt, "invalid range"); + return; + } + } + fetch = it->second->publisher().StandaloneFetch( + Location{start_group, 0}, largest_location, + message.parameters.group_order.value_or( + MoqtDeliveryOrder::kAscending)); } else { - const JoiningFetchAbsolute& absolute_fetch = - std::get<JoiningFetchAbsolute>(message.fetch); - QUIC_DLOG(INFO) << ENDPOINT << "Received a Absolute Joining FETCH for " - << track_name; - fetch = it->second->publisher().AbsoluteFetch( - absolute_fetch.joining_start, message.parameters.group_order.value_or( - MoqtDeliveryOrder::kAscending)); + // Subscription is in PENDING state. + if (std::holds_alternative<JoiningFetchRelative>(message.fetch)) { + fetch = it->second->publisher().RelativeFetch( + std::get<JoiningFetchRelative>(message.fetch).joining_start, + message.parameters.group_order.value_or( + MoqtDeliveryOrder::kAscending)); + } else { + fetch = it->second->publisher().AbsoluteFetch( + std::get<JoiningFetchAbsolute>(message.fetch).joining_start, + message.parameters.group_order.value_or( + MoqtDeliveryOrder::kAscending)); + } } } if (!fetch->GetStatus().ok()) { @@ -1928,6 +1956,8 @@ void MoqtSession::PublishedSubscription::OnSubscribeAccepted() { ControlStream* stream = session_->GetControlStream(); + QUICHE_DCHECK(!established_); + established_ = true; parameters_.largest_object = track_publisher_->largest_location(); if (parameters_.subscription_filter.has_value()) { parameters_.subscription_filter->OnLargestObject(
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 8bb9060..70a00a7 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -466,6 +466,8 @@ return default_publisher_priority_.value_or(kDefaultPublisherPriority); } + bool established() const { return established_; } + private: friend class test::MoqtSessionPeer; SendStreamMap& stream_map(); @@ -484,6 +486,8 @@ MoqtSession* session_; std::shared_ptr<MoqtTrackPublisher> track_publisher_; uint64_t request_id_; + // Subscription is in the ESTABLISHED state. + bool established_ = false; bool can_have_joining_fetch_ = false; const uint64_t track_alias_; // These are (mostly) the parameters from the SUBSCRIBE message. However,
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 22c7d4b..48d3260 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -2666,7 +2666,7 @@ MoqtFetch fetch = DefaultFetch(); fetch.request_id = 3; fetch.fetch = JoiningFetchRelative(1, 2); - EXPECT_CALL(*track, RelativeFetch(2, _)) + EXPECT_CALL(*track, StandaloneFetch(Location(2, 0), Location(4, 10), _)) .WillOnce(Return(std::make_unique<MockFetchTask>())); stream_input->OnFetchMessage(fetch); } @@ -2693,7 +2693,7 @@ MoqtFetch fetch = DefaultFetch(); fetch.request_id = 3; fetch.fetch = JoiningFetchAbsolute(1, 2); - EXPECT_CALL(*track, AbsoluteFetch(2, _)) + EXPECT_CALL(*track, StandaloneFetch(Location(2, 0), Location(4, 10), _)) .WillOnce(Return(std::make_unique<MockFetchTask>())); stream_input->OnFetchMessage(fetch); }