| // Copyright 2024 The Chromium Authors. All rights reserved. | 
 | // Use of this source code is governed by a BSD-style license that can be | 
 | // found in the LICENSE file. | 
 |  | 
 | #include "quiche/quic/moqt/moqt_outgoing_queue.h" | 
 |  | 
 | #include <cstdint> | 
 | #include <memory> | 
 | #include <optional> | 
 | #include <string> | 
 | #include <utility> | 
 | #include <vector> | 
 |  | 
 | #include "absl/status/status.h" | 
 | #include "absl/status/statusor.h" | 
 | #include "absl/strings/string_view.h" | 
 | #include "quiche/quic/core/quic_default_clock.h" | 
 | #include "quiche/quic/core/quic_time.h" | 
 | #include "quiche/quic/moqt/moqt_messages.h" | 
 | #include "quiche/quic/moqt/moqt_priority.h" | 
 | #include "quiche/quic/moqt/moqt_publisher.h" | 
 | #include "quiche/quic/moqt/moqt_subscribe_windows.h" | 
 | #include "quiche/quic/test_tools/quic_test_utils.h" | 
 | #include "quiche/common/platform/api/quiche_expect_bug.h" | 
 | #include "quiche/common/platform/api/quiche_logging.h" | 
 | #include "quiche/common/platform/api/quiche_test.h" | 
 | #include "quiche/common/test_tools/quiche_test_utils.h" | 
 | #include "quiche/web_transport/web_transport.h" | 
 |  | 
 | namespace moqt { | 
 | namespace { | 
 |  | 
 | using ::quic::test::MemSliceFromString; | 
 | using ::quiche::test::IsOkAndHolds; | 
 | using ::quiche::test::StatusIs; | 
 | using ::testing::AnyOf; | 
 | using ::testing::ElementsAre; | 
 | using ::testing::IsEmpty; | 
 |  | 
 | class TestMoqtOutgoingQueue : public MoqtOutgoingQueue, | 
 |                               public MoqtObjectListener { | 
 |  public: | 
 |   TestMoqtOutgoingQueue() | 
 |       : MoqtOutgoingQueue(FullTrackName{"test", "track"}, | 
 |                           MoqtForwardingPreference::kSubgroup) { | 
 |     AddObjectListener(this); | 
 |   } | 
 |  | 
 |   void OnNewObjectAvailable(Location sequence) override { | 
 |     std::optional<PublishedObject> object = GetCachedObject(sequence); | 
 |     QUICHE_CHECK(object.has_value()); | 
 |     ASSERT_THAT(object->status, AnyOf(MoqtObjectStatus::kNormal, | 
 |                                       MoqtObjectStatus::kEndOfGroup)); | 
 |     if (object->status == MoqtObjectStatus::kNormal) { | 
 |       PublishObject(object->sequence.group, object->sequence.object, | 
 |                     object->payload.AsStringView()); | 
 |     } else { | 
 |       CloseStreamForGroup(object->sequence.group); | 
 |     } | 
 |   } | 
 |  | 
 |   void GetObjectsFromPast(const SubscribeWindow& window) { | 
 |     std::vector<Location> objects = | 
 |         GetCachedObjectsInRange(Location(0, 0), GetLargestLocation()); | 
 |     for (Location object : objects) { | 
 |       if (window.InWindow(object)) { | 
 |         OnNewObjectAvailable(object); | 
 |       } | 
 |     } | 
 |   } | 
 |  | 
 |   MOCK_METHOD(void, OnNewFinAvailable, (Location sequence)); | 
 |   MOCK_METHOD(void, OnSubgroupAbandoned, | 
 |               (Location sequence, webtransport::StreamErrorCode error_code)); | 
 |   MOCK_METHOD(void, OnGroupAbandoned, (uint64_t group_id)); | 
 |   MOCK_METHOD(void, CloseStreamForGroup, (uint64_t group_id), ()); | 
 |   MOCK_METHOD(void, PublishObject, | 
 |               (uint64_t group_id, uint64_t object_id, | 
 |                absl::string_view payload), | 
 |               ()); | 
 |   MOCK_METHOD(void, OnTrackPublisherGone, (), (override)); | 
 |   MOCK_METHOD(void, OnSubscribeAccepted, (), (override)); | 
 |   MOCK_METHOD(void, OnSubscribeRejected, | 
 |               (MoqtSubscribeErrorReason reason, | 
 |                std::optional<uint64_t> track_alias), | 
 |               (override)); | 
 | }; | 
 |  | 
 | absl::StatusOr<std::vector<std::string>> FetchToVector( | 
 |     std::unique_ptr<MoqtFetchTask> fetch) { | 
 |   std::vector<std::string> objects; | 
 |   for (;;) { | 
 |     PublishedObject object; | 
 |     MoqtFetchTask::GetNextObjectResult result = fetch->GetNextObject(object); | 
 |     switch (result) { | 
 |       case MoqtFetchTask::kSuccess: | 
 |         if (object.status == MoqtObjectStatus::kNormal) { | 
 |           objects.emplace_back(object.payload.AsStringView()); | 
 |         } else { | 
 |           EXPECT_EQ(object.status, MoqtObjectStatus::kEndOfGroup); | 
 |         } | 
 |         continue; | 
 |       case MoqtFetchTask::kPending: | 
 |         return absl::InternalError( | 
 |             "Unexpected kPending from MoqtOutgoingQueue"); | 
 |       case MoqtFetchTask::kEof: | 
 |         return objects; | 
 |       case MoqtFetchTask::kError: | 
 |         return fetch->GetStatus(); | 
 |     } | 
 |   } | 
 | } | 
 |  | 
 | TEST(MoqtOutgoingQueue, FirstObjectNotKeyframe) { | 
 |   TestMoqtOutgoingQueue queue; | 
 |   EXPECT_QUICHE_BUG(queue.AddObject(MemSliceFromString("a"), false), | 
 |                     "The first object"); | 
 | } | 
 |  | 
 | TEST(MoqtOutgoingQueue, SingleGroup) { | 
 |   TestMoqtOutgoingQueue queue; | 
 |   { | 
 |     testing::InSequence seq; | 
 |     EXPECT_CALL(queue, PublishObject(0, 0, "a")); | 
 |     EXPECT_CALL(queue, PublishObject(0, 1, "b")); | 
 |     EXPECT_CALL(queue, PublishObject(0, 2, "c")); | 
 |   } | 
 |   queue.AddObject(MemSliceFromString("a"), true); | 
 |   queue.AddObject(MemSliceFromString("b"), false); | 
 |   queue.AddObject(MemSliceFromString("c"), false); | 
 | } | 
 |  | 
 | TEST(MoqtOutgoingQueue, SingleGroupPastSubscribeFromZero) { | 
 |   TestMoqtOutgoingQueue queue; | 
 |   { | 
 |     testing::InSequence seq; | 
 |     EXPECT_CALL(queue, PublishObject(0, 0, "a")); | 
 |     EXPECT_CALL(queue, PublishObject(0, 1, "b")); | 
 |     EXPECT_CALL(queue, PublishObject(0, 2, "c")); | 
 |  | 
 |     EXPECT_CALL(queue, PublishObject(0, 0, "a")); | 
 |     EXPECT_CALL(queue, PublishObject(0, 1, "b")); | 
 |     EXPECT_CALL(queue, PublishObject(0, 2, "c")); | 
 |   } | 
 |   queue.AddObject(MemSliceFromString("a"), true); | 
 |   queue.AddObject(MemSliceFromString("b"), false); | 
 |   queue.AddObject(MemSliceFromString("c"), false); | 
 |   queue.GetObjectsFromPast(SubscribeWindow(Location(0, 0))); | 
 | } | 
 |  | 
 | TEST(MoqtOutgoingQueue, SingleGroupPastSubscribeFromMidGroup) { | 
 |   TestMoqtOutgoingQueue queue; | 
 |   { | 
 |     testing::InSequence seq; | 
 |     EXPECT_CALL(queue, PublishObject(0, 0, "a")); | 
 |     EXPECT_CALL(queue, PublishObject(0, 1, "b")); | 
 |     EXPECT_CALL(queue, PublishObject(0, 2, "c")); | 
 |  | 
 |     EXPECT_CALL(queue, PublishObject(0, 1, "b")); | 
 |     EXPECT_CALL(queue, PublishObject(0, 2, "c")); | 
 |   } | 
 |   queue.AddObject(MemSliceFromString("a"), true); | 
 |   queue.AddObject(MemSliceFromString("b"), false); | 
 |   queue.AddObject(MemSliceFromString("c"), false); | 
 |   queue.GetObjectsFromPast(SubscribeWindow(Location(0, 1))); | 
 | } | 
 |  | 
 | TEST(MoqtOutgoingQueue, TwoGroups) { | 
 |   TestMoqtOutgoingQueue queue; | 
 |   { | 
 |     testing::InSequence seq; | 
 |     EXPECT_CALL(queue, PublishObject(0, 0, "a")); | 
 |     EXPECT_CALL(queue, PublishObject(0, 1, "b")); | 
 |     EXPECT_CALL(queue, PublishObject(0, 2, "c")); | 
 |     EXPECT_CALL(queue, CloseStreamForGroup(0)); | 
 |     EXPECT_CALL(queue, PublishObject(1, 0, "d")); | 
 |     EXPECT_CALL(queue, PublishObject(1, 1, "e")); | 
 |     EXPECT_CALL(queue, PublishObject(1, 2, "f")); | 
 |   } | 
 |   queue.AddObject(MemSliceFromString("a"), true); | 
 |   queue.AddObject(MemSliceFromString("b"), false); | 
 |   queue.AddObject(MemSliceFromString("c"), false); | 
 |   queue.AddObject(MemSliceFromString("d"), true); | 
 |   queue.AddObject(MemSliceFromString("e"), false); | 
 |   queue.AddObject(MemSliceFromString("f"), false); | 
 | } | 
 |  | 
 | TEST(MoqtOutgoingQueue, TwoGroupsPastSubscribe) { | 
 |   TestMoqtOutgoingQueue queue; | 
 |   { | 
 |     testing::InSequence seq; | 
 |     EXPECT_CALL(queue, PublishObject(0, 0, "a")); | 
 |     EXPECT_CALL(queue, PublishObject(0, 1, "b")); | 
 |     EXPECT_CALL(queue, PublishObject(0, 2, "c")); | 
 |     EXPECT_CALL(queue, CloseStreamForGroup(0)); | 
 |     EXPECT_CALL(queue, PublishObject(1, 0, "d")); | 
 |     EXPECT_CALL(queue, PublishObject(1, 1, "e")); | 
 |     EXPECT_CALL(queue, PublishObject(1, 2, "f")); | 
 |  | 
 |     EXPECT_CALL(queue, PublishObject(0, 1, "b")); | 
 |     EXPECT_CALL(queue, PublishObject(0, 2, "c")); | 
 |     EXPECT_CALL(queue, CloseStreamForGroup(0)); | 
 |     EXPECT_CALL(queue, PublishObject(1, 0, "d")); | 
 |     EXPECT_CALL(queue, PublishObject(1, 1, "e")); | 
 |     EXPECT_CALL(queue, PublishObject(1, 2, "f")); | 
 |   } | 
 |   queue.AddObject(MemSliceFromString("a"), true); | 
 |   queue.AddObject(MemSliceFromString("b"), false); | 
 |   queue.AddObject(MemSliceFromString("c"), false); | 
 |   queue.AddObject(MemSliceFromString("d"), true); | 
 |   queue.AddObject(MemSliceFromString("e"), false); | 
 |   queue.AddObject(MemSliceFromString("f"), false); | 
 |   queue.GetObjectsFromPast(SubscribeWindow(Location(0, 1))); | 
 | } | 
 |  | 
 | TEST(MoqtOutgoingQueue, FiveGroups) { | 
 |   TestMoqtOutgoingQueue queue; | 
 |   { | 
 |     testing::InSequence seq; | 
 |     EXPECT_CALL(queue, PublishObject(0, 0, "a")); | 
 |     EXPECT_CALL(queue, PublishObject(0, 1, "b")); | 
 |     EXPECT_CALL(queue, CloseStreamForGroup(0)); | 
 |     EXPECT_CALL(queue, PublishObject(1, 0, "c")); | 
 |     EXPECT_CALL(queue, PublishObject(1, 1, "d")); | 
 |     EXPECT_CALL(queue, CloseStreamForGroup(1)); | 
 |     EXPECT_CALL(queue, PublishObject(2, 0, "e")); | 
 |     EXPECT_CALL(queue, PublishObject(2, 1, "f")); | 
 |     EXPECT_CALL(queue, CloseStreamForGroup(2)); | 
 |     EXPECT_CALL(queue, PublishObject(3, 0, "g")); | 
 |     EXPECT_CALL(queue, PublishObject(3, 1, "h")); | 
 |     EXPECT_CALL(queue, CloseStreamForGroup(3)); | 
 |     EXPECT_CALL(queue, PublishObject(4, 0, "i")); | 
 |     EXPECT_CALL(queue, PublishObject(4, 1, "j")); | 
 |   } | 
 |   queue.AddObject(MemSliceFromString("a"), true); | 
 |   queue.AddObject(MemSliceFromString("b"), false); | 
 |   queue.AddObject(MemSliceFromString("c"), true); | 
 |   queue.AddObject(MemSliceFromString("d"), false); | 
 |   queue.AddObject(MemSliceFromString("e"), true); | 
 |   queue.AddObject(MemSliceFromString("f"), false); | 
 |   queue.AddObject(MemSliceFromString("g"), true); | 
 |   queue.AddObject(MemSliceFromString("h"), false); | 
 |   queue.AddObject(MemSliceFromString("i"), true); | 
 |   queue.AddObject(MemSliceFromString("j"), false); | 
 | } | 
 |  | 
 | TEST(MoqtOutgoingQueue, FiveGroupsPastSubscribe) { | 
 |   TestMoqtOutgoingQueue queue; | 
 |   { | 
 |     testing::InSequence seq; | 
 |     EXPECT_CALL(queue, PublishObject(0, 0, "a")); | 
 |     EXPECT_CALL(queue, PublishObject(0, 1, "b")); | 
 |     EXPECT_CALL(queue, CloseStreamForGroup(0)); | 
 |     EXPECT_CALL(queue, PublishObject(1, 0, "c")); | 
 |     EXPECT_CALL(queue, PublishObject(1, 1, "d")); | 
 |     EXPECT_CALL(queue, CloseStreamForGroup(1)); | 
 |     EXPECT_CALL(queue, PublishObject(2, 0, "e")); | 
 |     EXPECT_CALL(queue, PublishObject(2, 1, "f")); | 
 |     EXPECT_CALL(queue, CloseStreamForGroup(2)); | 
 |     EXPECT_CALL(queue, PublishObject(3, 0, "g")); | 
 |     EXPECT_CALL(queue, PublishObject(3, 1, "h")); | 
 |     EXPECT_CALL(queue, CloseStreamForGroup(3)); | 
 |     EXPECT_CALL(queue, PublishObject(4, 0, "i")); | 
 |     EXPECT_CALL(queue, PublishObject(4, 1, "j")); | 
 |  | 
 |     // Past SUBSCRIBE would only get the three most recent groups. | 
 |     EXPECT_CALL(queue, PublishObject(2, 0, "e")); | 
 |     EXPECT_CALL(queue, PublishObject(2, 1, "f")); | 
 |     EXPECT_CALL(queue, CloseStreamForGroup(2)); | 
 |     EXPECT_CALL(queue, PublishObject(3, 0, "g")); | 
 |     EXPECT_CALL(queue, PublishObject(3, 1, "h")); | 
 |     EXPECT_CALL(queue, CloseStreamForGroup(3)); | 
 |     EXPECT_CALL(queue, PublishObject(4, 0, "i")); | 
 |     EXPECT_CALL(queue, PublishObject(4, 1, "j")); | 
 |   } | 
 |   queue.AddObject(MemSliceFromString("a"), true); | 
 |   queue.AddObject(MemSliceFromString("b"), false); | 
 |   queue.AddObject(MemSliceFromString("c"), true); | 
 |   queue.AddObject(MemSliceFromString("d"), false); | 
 |   queue.AddObject(MemSliceFromString("e"), true); | 
 |   queue.AddObject(MemSliceFromString("f"), false); | 
 |   queue.AddObject(MemSliceFromString("g"), true); | 
 |   queue.AddObject(MemSliceFromString("h"), false); | 
 |   queue.AddObject(MemSliceFromString("i"), true); | 
 |   queue.AddObject(MemSliceFromString("j"), false); | 
 |   queue.GetObjectsFromPast(SubscribeWindow(Location(0, 0))); | 
 | } | 
 |  | 
 | TEST(MoqtOutgoingQueue, Fetch) { | 
 |   TestMoqtOutgoingQueue queue; | 
 |   EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 0}, 2, 0, | 
 |                                         MoqtDeliveryOrder::kAscending)), | 
 |               StatusIs(absl::StatusCode::kNotFound)); | 
 |  | 
 |   queue.AddObject(MemSliceFromString("a"), true); | 
 |   queue.AddObject(MemSliceFromString("b"), false); | 
 |   queue.AddObject(MemSliceFromString("c"), true); | 
 |   queue.AddObject(MemSliceFromString("d"), false); | 
 |   queue.AddObject(MemSliceFromString("e"), true); | 
 |  | 
 |   EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 0}, 2, 0, | 
 |                                         MoqtDeliveryOrder::kAscending)), | 
 |               IsOkAndHolds(ElementsAre("a", "b", "c", "d", "e"))); | 
 |   EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 100}, 0, 1000, | 
 |                                         MoqtDeliveryOrder::kAscending)), | 
 |               IsOkAndHolds(IsEmpty())); | 
 |   EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 0}, 2, 0, | 
 |                                         MoqtDeliveryOrder::kDescending)), | 
 |               IsOkAndHolds(ElementsAre("e", "c", "d", "a", "b"))); | 
 |   EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 0}, 1, 0, | 
 |                                         MoqtDeliveryOrder::kAscending)), | 
 |               IsOkAndHolds(ElementsAre("a", "b", "c"))); | 
 |   EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 0}, 1, 0, | 
 |                                         MoqtDeliveryOrder::kAscending)), | 
 |               IsOkAndHolds(ElementsAre("a", "b", "c"))); | 
 |   EXPECT_THAT(FetchToVector(queue.Fetch(Location{1, 0}, 5, std::nullopt, | 
 |                                         MoqtDeliveryOrder::kAscending)), | 
 |               IsOkAndHolds(ElementsAre("c", "d", "e"))); | 
 |   EXPECT_THAT(FetchToVector(queue.Fetch(Location{3, 0}, 5, std::nullopt, | 
 |                                         MoqtDeliveryOrder::kAscending)), | 
 |               StatusIs(absl::StatusCode::kNotFound)); | 
 |  | 
 |   queue.AddObject(MemSliceFromString("f"), true); | 
 |   queue.AddObject(MemSliceFromString("g"), false); | 
 |   EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 0}, 0, 1, | 
 |                                         MoqtDeliveryOrder::kAscending)), | 
 |               StatusIs(absl::StatusCode::kNotFound)); | 
 |   EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 0}, 2, 0, | 
 |                                         MoqtDeliveryOrder::kAscending)), | 
 |               IsOkAndHolds(ElementsAre("c", "d", "e"))); | 
 |   EXPECT_THAT(FetchToVector(queue.Fetch(Location{1, 0}, 5, std::nullopt, | 
 |                                         MoqtDeliveryOrder::kAscending)), | 
 |               IsOkAndHolds(ElementsAre("c", "d", "e", "f", "g"))); | 
 |   EXPECT_THAT(FetchToVector(queue.Fetch(Location{3, 0}, 5, std::nullopt, | 
 |                                         MoqtDeliveryOrder::kAscending)), | 
 |               IsOkAndHolds(ElementsAre("f", "g"))); | 
 | } | 
 |  | 
 | TEST(MoqtOutgoingQueue, ObjectsGoneWhileFetching) { | 
 |   TestMoqtOutgoingQueue queue; | 
 |   queue.AddObject(MemSliceFromString("a"), true); | 
 |   queue.AddObject(MemSliceFromString("b"), true); | 
 |   queue.AddObject(MemSliceFromString("c"), true); | 
 |   queue.AddObject(MemSliceFromString("d"), true); | 
 |   queue.AddObject(MemSliceFromString("e"), true); | 
 |  | 
 |   EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 0}, 5, 0, | 
 |                                         MoqtDeliveryOrder::kAscending)), | 
 |               IsOkAndHolds(ElementsAre("c", "d", "e"))); | 
 |   std::unique_ptr<MoqtFetchTask> deferred_fetch = | 
 |       queue.Fetch(Location{0, 0}, 5, 0, MoqtDeliveryOrder::kAscending); | 
 |  | 
 |   queue.AddObject(MemSliceFromString("f"), true); | 
 |   queue.AddObject(MemSliceFromString("g"), true); | 
 |   queue.AddObject(MemSliceFromString("h"), true); | 
 |   queue.AddObject(MemSliceFromString("i"), true); | 
 |  | 
 |   EXPECT_THAT(FetchToVector(std::move(deferred_fetch)), | 
 |               IsOkAndHolds(IsEmpty())); | 
 | } | 
 |  | 
 | TEST(MoqtOutgoingQueue, ObjectIsTimestamped) { | 
 |   quic::QuicDefaultClock* clock = quic::QuicDefaultClock::Get(); | 
 |   quic::QuicTime test_start = clock->ApproximateNow(); | 
 |   TestMoqtOutgoingQueue queue; | 
 |   queue.AddObject(MemSliceFromString("a"), true); | 
 |   std::optional<PublishedObject> object = queue.GetCachedObject(Location{0, 0}); | 
 |   ASSERT_TRUE(object.has_value()); | 
 |   EXPECT_GE(object->arrival_time, test_start); | 
 | } | 
 |  | 
 | }  // namespace | 
 | }  // namespace moqt |