Update MoqtLiveRelayQueue for Peeps. Places incoming objects in subgroup queues and delivers them in subgroup order. PiperOrigin-RevId: 688128812
diff --git a/quiche/quic/moqt/moqt_cached_object.cc b/quiche/quic/moqt/moqt_cached_object.cc index 395875b..332edb4 100644 --- a/quiche/quic/moqt/moqt_cached_object.cc +++ b/quiche/quic/moqt/moqt_cached_object.cc
@@ -14,6 +14,7 @@ PublishedObject result; result.sequence = object.sequence; result.status = object.status; + result.publisher_priority = object.publisher_priority; if (object.payload != nullptr && !object.payload->empty()) { result.payload = quiche::QuicheMemSlice( object.payload->data(), object.payload->length(),
diff --git a/quiche/quic/moqt/moqt_cached_object.h b/quiche/quic/moqt/moqt_cached_object.h index c04b31e..be556e2 100644 --- a/quiche/quic/moqt/moqt_cached_object.h +++ b/quiche/quic/moqt/moqt_cached_object.h
@@ -8,6 +8,7 @@ #include <memory> #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/common/platform/api/quiche_mem_slice.h" @@ -18,6 +19,7 @@ struct CachedObject { FullSequence sequence; MoqtObjectStatus status; + MoqtPriority publisher_priority; std::shared_ptr<quiche::QuicheMemSlice> payload; };
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc index d398331..75eafe6 100644 --- a/quiche/quic/moqt/moqt_integration_test.cc +++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -204,16 +204,15 @@ [](FullTrackName, std::optional<MoqtAnnounceErrorReason>) {}); bool received_object = false; - EXPECT_CALL(server_visitor, OnObjectFragment(_, _, _, _, _, _, _, _)) - .WillOnce([&](const FullTrackName& full_track_name, - uint64_t group_sequence, uint64_t object_sequence, + EXPECT_CALL(server_visitor, OnObjectFragment(_, _, _, _, _, _, _)) + .WillOnce([&](const FullTrackName& full_track_name, FullSequence sequence, MoqtPriority /*publisher_priority*/, MoqtObjectStatus status, MoqtForwardingPreference forwarding_preference, absl::string_view object, bool end_of_message) { EXPECT_EQ(full_track_name, FullTrackName("test", "data")); - EXPECT_EQ(group_sequence, 0u); - EXPECT_EQ(object_sequence, 0u); + EXPECT_EQ(sequence.group, 0u); + EXPECT_EQ(sequence.object, 0u); EXPECT_EQ(status, MoqtObjectStatus::kNormal); EXPECT_EQ(forwarding_preference, MoqtForwardingPreference::kSubgroup); EXPECT_EQ(object, "object data"); @@ -250,13 +249,13 @@ client_->session()->SubscribeCurrentGroup(FullTrackName("test", name), &client_visitor); int received = 0; - EXPECT_CALL(client_visitor, - OnObjectFragment(_, 1, 0, _, MoqtObjectStatus::kNormal, _, - "object 4", true)) + EXPECT_CALL(client_visitor, OnObjectFragment(_, FullSequence{1, 0}, _, + MoqtObjectStatus::kNormal, _, + "object 4", true)) .WillOnce([&] { ++received; }); - EXPECT_CALL(client_visitor, - OnObjectFragment(_, 1, 1, _, MoqtObjectStatus::kNormal, _, - "object 5", true)) + EXPECT_CALL(client_visitor, OnObjectFragment(_, FullSequence{1, 1}, _, + MoqtObjectStatus::kNormal, _, + "object 5", true)) .WillOnce([&] { ++received; }); bool success = test_harness_.RunUntilWithDefaultTimeout( [&]() { return received >= 2; }); @@ -265,21 +264,21 @@ queue->AddObject(MemSliceFromString("object 6"), /*key=*/false); queue->AddObject(MemSliceFromString("object 7"), /*key=*/true); queue->AddObject(MemSliceFromString("object 8"), /*key=*/false); - EXPECT_CALL(client_visitor, - OnObjectFragment(_, 1, 2, _, MoqtObjectStatus::kNormal, _, - "object 6", true)) + EXPECT_CALL(client_visitor, OnObjectFragment(_, FullSequence{1, 2}, _, + MoqtObjectStatus::kNormal, _, + "object 6", true)) .WillOnce([&] { ++received; }); EXPECT_CALL(client_visitor, - OnObjectFragment(_, 1, 3, _, MoqtObjectStatus::kEndOfGroup, _, - "", true)) + OnObjectFragment(_, FullSequence{1, 3}, _, + MoqtObjectStatus::kEndOfGroup, _, "", true)) .WillOnce([&] { ++received; }); - EXPECT_CALL(client_visitor, - OnObjectFragment(_, 2, 0, _, MoqtObjectStatus::kNormal, _, - "object 7", true)) + EXPECT_CALL(client_visitor, OnObjectFragment(_, FullSequence{2, 0}, _, + MoqtObjectStatus::kNormal, _, + "object 7", true)) .WillOnce([&] { ++received; }); - EXPECT_CALL(client_visitor, - OnObjectFragment(_, 2, 1, _, MoqtObjectStatus::kNormal, _, - "object 8", true)) + EXPECT_CALL(client_visitor, OnObjectFragment(_, FullSequence{2, 1}, _, + MoqtObjectStatus::kNormal, _, + "object 8", true)) .WillOnce([&] { ++received; }); success = test_harness_.RunUntilWithDefaultTimeout( [&]() { return received >= 6; }); @@ -310,26 +309,36 @@ &client_visitor); int received = 0; // Those won't arrive since they have expired. - EXPECT_CALL(client_visitor, OnObjectFragment(_, 0, 0, _, _, _, _, true)) + EXPECT_CALL(client_visitor, + OnObjectFragment(_, FullSequence{0, 0}, _, _, _, _, true)) .Times(0); - EXPECT_CALL(client_visitor, OnObjectFragment(_, 0, 0, _, _, _, _, true)) + EXPECT_CALL(client_visitor, + OnObjectFragment(_, FullSequence{0, 0}, _, _, _, _, true)) .Times(0); - EXPECT_CALL(client_visitor, OnObjectFragment(_, 96, 0, _, _, _, _, true)) + EXPECT_CALL(client_visitor, + OnObjectFragment(_, FullSequence{96, 0}, _, _, _, _, true)) .Times(0); - EXPECT_CALL(client_visitor, OnObjectFragment(_, 96, 0, _, _, _, _, true)) + EXPECT_CALL(client_visitor, + OnObjectFragment(_, FullSequence{96, 0}, _, _, _, _, true)) .Times(0); // Those are within the "last three groups" window. - EXPECT_CALL(client_visitor, OnObjectFragment(_, 97, 0, _, _, _, _, true)) + EXPECT_CALL(client_visitor, + OnObjectFragment(_, FullSequence{97, 0}, _, _, _, _, true)) .WillOnce([&] { ++received; }); - EXPECT_CALL(client_visitor, OnObjectFragment(_, 97, 1, _, _, _, _, true)) + EXPECT_CALL(client_visitor, + OnObjectFragment(_, FullSequence{97, 1}, _, _, _, _, true)) .WillOnce([&] { ++received; }); - EXPECT_CALL(client_visitor, OnObjectFragment(_, 98, 0, _, _, _, _, true)) + EXPECT_CALL(client_visitor, + OnObjectFragment(_, FullSequence{98, 0}, _, _, _, _, true)) .WillOnce([&] { ++received; }); - EXPECT_CALL(client_visitor, OnObjectFragment(_, 98, 1, _, _, _, _, true)) + EXPECT_CALL(client_visitor, + OnObjectFragment(_, FullSequence{98, 1}, _, _, _, _, true)) .WillOnce([&] { ++received; }); - EXPECT_CALL(client_visitor, OnObjectFragment(_, 99, 0, _, _, _, _, true)) + EXPECT_CALL(client_visitor, + OnObjectFragment(_, FullSequence{99, 0}, _, _, _, _, true)) .WillOnce([&] { ++received; }); - EXPECT_CALL(client_visitor, OnObjectFragment(_, 99, 1, _, _, _, _, true)) + EXPECT_CALL(client_visitor, + OnObjectFragment(_, FullSequence{99, 1}, _, _, _, _, true)) .Times(0); // The current group should not be closed yet. bool success = test_harness_.RunUntilWithDefaultTimeout( [&]() { return received >= 5; });
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.cc b/quiche/quic/moqt/moqt_live_relay_queue.cc index 6cee208..3ca2532 100644 --- a/quiche/quic/moqt/moqt_live_relay_queue.cc +++ b/quiche/quic/moqt/moqt_live_relay_queue.cc
@@ -4,16 +4,15 @@ #include "quiche/quic/moqt/moqt_live_relay_queue.h" -#include <cstdint> #include <memory> #include <optional> -#include <tuple> #include <vector> #include "absl/status/statusor.h" #include "absl/strings/string_view.h" #include "quiche/quic/moqt/moqt_cached_object.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_subscribe_windows.h" #include "quiche/common/platform/api/quiche_logging.h" @@ -23,42 +22,22 @@ namespace moqt { -// TODO(martinduke): Accept subgroup ID. -// TODO(martinduke): Accept publisher priority. // TODO(martinduke): Unless Track Forwarding preference goes away, support it. -bool MoqtLiveRelayQueue::AddObject(uint64_t group_id, uint64_t object_id, - MoqtObjectStatus status, - absl::string_view object) { +bool MoqtLiveRelayQueue::AddRawObject(FullSequence sequence, + MoqtObjectStatus status, + MoqtPriority priority, + absl::string_view payload) { if (queue_.size() == kMaxQueuedGroups) { - if (queue_.begin()->first > group_id) { - QUICHE_DLOG(INFO) << "Skipping object from group " << group_id + if (queue_.begin()->first > sequence.group) { + QUICHE_DLOG(INFO) << "Skipping object from group " << sequence.group << " because it is too old."; return true; } - if (queue_.find(group_id) == queue_.end()) { + if (queue_.find(sequence.group) == queue_.end()) { // Erase the oldest group. queue_.erase(queue_.begin()); } } - QUICHE_CHECK(status == MoqtObjectStatus::kNormal || object.empty()); - return AddRawObject(FullSequence{group_id, object_id}, status, object); -} - -std::tuple<uint64_t, bool> MoqtLiveRelayQueue::NextObject(Group& group) const { - auto it = group.rbegin(); - if (it == group.rend()) { - return std::tuple<uint64_t, bool>(0, false); - } - return std::tuple<uint64_t, bool>( - it->second.sequence.object + 1, - (it->second.status == MoqtObjectStatus::kEndOfGroup || - it->second.status == MoqtObjectStatus::kGroupDoesNotExist || - it->second.status == MoqtObjectStatus::kEndOfTrack)); -} - -bool MoqtLiveRelayQueue::AddRawObject(FullSequence sequence, - MoqtObjectStatus status, - absl::string_view payload) { // Validate the input given previously received markers. if (end_of_track_.has_value() && sequence > *end_of_track_) { QUICHE_DLOG(INFO) << "Skipping object because it is after the end of the " @@ -79,34 +58,61 @@ return false; } auto group_it = queue_.try_emplace(sequence.group); + Group& group = group_it.first->second; if (!group_it.second) { // Group already exists. - auto [next_object_id, is_the_end] = NextObject(group_it.first->second); - if (next_object_id <= sequence.object && is_the_end) { + if (group.complete && sequence.object >= group.next_object) { QUICHE_DLOG(INFO) << "Skipping object because it is after the end of the " << "group"; return false; } if (status == MoqtObjectStatus::kEndOfGroup && - sequence.object < next_object_id) { + sequence.object < group.next_object) { QUICHE_DLOG(INFO) << "Skipping EndOfGroup because it is not the last " << "object in the group."; return false; } } + auto subgroup_it = group.subgroups.try_emplace( + SubgroupPriority{priority, sequence.subgroup}); + auto& object_queue = subgroup_it.first->second; + if (!object_queue.empty()) { // Check if the new object is valid + auto last_object = object_queue.rbegin(); + if (last_object->first >= sequence.object) { + QUICHE_DLOG(INFO) << "Skipping object because it does not increase the " + << "object ID monotonically in the subgroup."; + return false; + } + if (last_object->second.status == MoqtObjectStatus::kEndOfSubgroup) { + QUICHE_DLOG(INFO) << "Skipping object because it is after the end of the " + << "subgroup."; + return false; + } + } + // Object is valid. Update state. if (next_sequence_ <= sequence) { next_sequence_ = FullSequence{sequence.group, sequence.object + 1}; } + if (sequence.object >= group.next_object) { + group.next_object = sequence.object + 1; + } + switch (status) { + case MoqtObjectStatus::kEndOfTrack: + end_of_track_ = sequence; + break; + case MoqtObjectStatus::kEndOfGroup: + case MoqtObjectStatus::kGroupDoesNotExist: + group.complete = true; + break; + default: + break; + } std::shared_ptr<quiche::QuicheMemSlice> slice = payload.empty() ? nullptr : std::make_shared<quiche::QuicheMemSlice>(quiche::QuicheBuffer::Copy( quiche::SimpleBufferAllocator::Get(), payload)); - auto object_it = group_it.first->second.try_emplace(sequence.object, sequence, - status, slice); - if (!object_it.second) { - QUICHE_DLOG(ERROR) << "Sender is overwriting an existing object."; - return false; - } + object_queue.emplace(sequence.object, + CachedObject{sequence, status, priority, slice}); for (MoqtObjectListener* listener : listeners_) { listener->OnNewObjectAvailable(sequence); } @@ -117,10 +123,24 @@ FullSequence sequence) const { auto group_it = queue_.find(sequence.group); if (group_it == queue_.end()) { + // Group does not exist. return std::nullopt; } - auto object_it = group_it->second.find(sequence.object); - if (object_it == group_it->second.end()) { + const Group& group = group_it->second; + auto subgroup_it = group.subgroups.find( + SubgroupPriority{publisher_priority_, sequence.subgroup}); + if (subgroup_it == group.subgroups.end()) { + // Subgroup does not exist. + return std::nullopt; + } + const Subgroup& subgroup = subgroup_it->second; + if (subgroup.empty()) { + return std::nullopt; // There are no objects. + } + // Find an object with ID of at least sequence.object. + auto object_it = subgroup.lower_bound(sequence.object); + if (object_it == subgroup_it->second.end()) { + // No object after the last one received. return std::nullopt; } return CachedObjectToPublishedObject(object_it->second); @@ -131,9 +151,21 @@ std::vector<FullSequence> sequences; SubscribeWindow window(start, end); for (auto& group_it : queue_) { - for (auto& object_it : group_it.second) { - if (window.InWindow(object_it.second.sequence)) { - sequences.push_back(object_it.second.sequence); + if (group_it.first < start.group) { + continue; + } + if (group_it.first > end.group) { + return sequences; + } + for (auto& subgroup_it : group_it.second.subgroups) { + for (auto& object_it : subgroup_it.second) { + if (window.InWindow(object_it.second.sequence)) { + sequences.push_back(object_it.second.sequence); + } + if (group_it.first == end.group && + object_it.second.sequence.object >= end.object) { + break; + } } } }
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.h b/quiche/quic/moqt/moqt_live_relay_queue.h index ae2a58e..171cd8a 100644 --- a/quiche/quic/moqt/moqt_live_relay_queue.h +++ b/quiche/quic/moqt/moqt_live_relay_queue.h
@@ -8,7 +8,6 @@ #include <cstddef> #include <cstdint> #include <optional> -#include <tuple> #include <utility> #include <vector> @@ -51,8 +50,13 @@ // occur. A false return value might result in a session error on the // inbound session, but this queue is the only place that retains enough state // to check. - bool AddObject(uint64_t group_id, uint64_t object_id, MoqtObjectStatus status, - absl::string_view object); + bool AddObject(FullSequence sequence, MoqtObjectStatus status) { + return AddRawObject(sequence, status, publisher_priority_, ""); + } + bool AddObject(FullSequence sequence, absl::string_view object) { + return AddRawObject(sequence, MoqtObjectStatus::kNormal, + publisher_priority_, object); + } // MoqtTrackPublisher implementation. const FullTrackName& GetTrackName() const override { return track_; } @@ -85,14 +89,16 @@ static constexpr size_t kMaxQueuedGroups = 3; // Ordered by object id. - using Group = absl::btree_map<uint64_t, CachedObject>; + using Subgroup = absl::btree_map<uint64_t, CachedObject>; - // Returns the next expected object ID in |group|, and also |true| if the last - // object ends the group. - std::tuple<uint64_t, bool> NextObject(Group& group) const; + struct Group { + uint64_t next_object = 0; + bool complete = false; + absl::btree_map<SubgroupPriority, Subgroup> subgroups; + }; bool AddRawObject(FullSequence sequence, MoqtObjectStatus status, - absl::string_view payload); + MoqtPriority priority, absl::string_view payload); FullTrackName track_; MoqtForwardingPreference forwarding_preference_;
diff --git a/quiche/quic/moqt/moqt_live_relay_queue_test.cc b/quiche/quic/moqt/moqt_live_relay_queue_test.cc index 3734e48..228cc81 100644 --- a/quiche/quic/moqt/moqt_live_relay_queue_test.cc +++ b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
@@ -48,6 +48,10 @@ case MoqtObjectStatus::kEndOfTrack: CloseTrack(); break; + case moqt::MoqtObjectStatus::kEndOfSubgroup: + CloseStreamForSubgroup(object->sequence.group, + object->sequence.subgroup); + break; default: EXPECT_TRUE(false); } @@ -64,6 +68,8 @@ } MOCK_METHOD(void, CloseStreamForGroup, (uint64_t group_id), ()); + MOCK_METHOD(void, CloseStreamForSubgroup, + (uint64_t group_id, uint64_t subgroup_id), ()); MOCK_METHOD(void, PublishObject, (uint64_t group_id, uint64_t object_id, absl::string_view payload), @@ -83,10 +89,11 @@ EXPECT_CALL(queue, PublishObject(0, 2, "c")); EXPECT_CALL(queue, CloseStreamForGroup(0)); } - EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a")); - EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b")); - EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kNormal, "c")); - EXPECT_TRUE(queue.AddObject(0, 3, MoqtObjectStatus::kEndOfGroup, "")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c")); + EXPECT_TRUE( + queue.AddObject(FullSequence{0, 3}, MoqtObjectStatus::kEndOfGroup)); } TEST(MoqtLiveRelayQueue, SingleGroupPastSubscribeFromZero) { @@ -101,9 +108,9 @@ EXPECT_CALL(queue, PublishObject(0, 1, "b")); EXPECT_CALL(queue, PublishObject(0, 2, "c")); } - EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a")); - EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b")); - EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kNormal, "c")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c")); queue.CallSubscribeForPast(SubscribeWindow(0, 0)); } @@ -118,9 +125,9 @@ EXPECT_CALL(queue, PublishObject(0, 1, "b")); EXPECT_CALL(queue, PublishObject(0, 2, "c")); } - EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a")); - EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b")); - EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kNormal, "c")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c")); queue.CallSubscribeForPast(SubscribeWindow(0, 1)); } @@ -136,13 +143,14 @@ EXPECT_CALL(queue, PublishObject(1, 1, "e")); EXPECT_CALL(queue, PublishObject(1, 2, "f")); } - EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a")); - EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b")); - EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kNormal, "c")); - EXPECT_TRUE(queue.AddObject(0, 3, MoqtObjectStatus::kEndOfGroup, "")); - EXPECT_TRUE(queue.AddObject(1, 0, MoqtObjectStatus::kNormal, "d")); - EXPECT_TRUE(queue.AddObject(1, 1, MoqtObjectStatus::kNormal, "e")); - EXPECT_TRUE(queue.AddObject(1, 2, MoqtObjectStatus::kNormal, "f")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c")); + EXPECT_TRUE( + queue.AddObject(FullSequence{0, 3}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(FullSequence{1, 0}, "d")); + EXPECT_TRUE(queue.AddObject(FullSequence{1, 1}, "e")); + EXPECT_TRUE(queue.AddObject(FullSequence{1, 2}, "f")); } TEST(MoqtLiveRelayQueue, TwoGroupsPastSubscribe) { @@ -164,13 +172,14 @@ EXPECT_CALL(queue, PublishObject(1, 1, "e")); EXPECT_CALL(queue, PublishObject(1, 2, "f")); } - EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a")); - EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b")); - EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kNormal, "c")); - EXPECT_TRUE(queue.AddObject(0, 3, MoqtObjectStatus::kEndOfGroup, "")); - EXPECT_TRUE(queue.AddObject(1, 0, MoqtObjectStatus::kNormal, "d")); - EXPECT_TRUE(queue.AddObject(1, 1, MoqtObjectStatus::kNormal, "e")); - EXPECT_TRUE(queue.AddObject(1, 2, MoqtObjectStatus::kNormal, "f")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c")); + EXPECT_TRUE( + queue.AddObject(FullSequence{0, 3}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(FullSequence{1, 0}, "d")); + EXPECT_TRUE(queue.AddObject(FullSequence{1, 1}, "e")); + EXPECT_TRUE(queue.AddObject(FullSequence{1, 2}, "f")); queue.CallSubscribeForPast(SubscribeWindow(0, 1)); } @@ -194,20 +203,24 @@ EXPECT_CALL(queue, PublishObject(4, 0, "i")); EXPECT_CALL(queue, PublishObject(4, 1, "j")); } - EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a")); - EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b")); - EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kEndOfGroup, "")); - EXPECT_TRUE(queue.AddObject(1, 0, MoqtObjectStatus::kNormal, "c")); - EXPECT_TRUE(queue.AddObject(1, 1, MoqtObjectStatus::kNormal, "d")); - EXPECT_TRUE(queue.AddObject(1, 2, MoqtObjectStatus::kEndOfGroup, "")); - EXPECT_TRUE(queue.AddObject(2, 0, MoqtObjectStatus::kNormal, "e")); - EXPECT_TRUE(queue.AddObject(2, 1, MoqtObjectStatus::kNormal, "f")); - EXPECT_TRUE(queue.AddObject(2, 2, MoqtObjectStatus::kEndOfGroup, "")); - EXPECT_TRUE(queue.AddObject(3, 0, MoqtObjectStatus::kNormal, "g")); - EXPECT_TRUE(queue.AddObject(3, 1, MoqtObjectStatus::kNormal, "h")); - EXPECT_TRUE(queue.AddObject(3, 2, MoqtObjectStatus::kEndOfGroup, "")); - EXPECT_TRUE(queue.AddObject(4, 0, MoqtObjectStatus::kNormal, "i")); - EXPECT_TRUE(queue.AddObject(4, 1, MoqtObjectStatus::kNormal, "j")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b")); + EXPECT_TRUE( + queue.AddObject(FullSequence{0, 2}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(FullSequence{1, 0}, "c")); + EXPECT_TRUE(queue.AddObject(FullSequence{1, 1}, "d")); + EXPECT_TRUE( + queue.AddObject(FullSequence{1, 2}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(FullSequence{2, 0}, "e")); + EXPECT_TRUE(queue.AddObject(FullSequence{2, 1}, "f")); + EXPECT_TRUE( + queue.AddObject(FullSequence{2, 2}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(FullSequence{3, 0}, "g")); + EXPECT_TRUE(queue.AddObject(FullSequence{3, 1}, "h")); + EXPECT_TRUE( + queue.AddObject(FullSequence{3, 2}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(FullSequence{4, 0}, "i")); + EXPECT_TRUE(queue.AddObject(FullSequence{4, 1}, "j")); } TEST(MoqtLiveRelayQueue, FiveGroupsPastSubscribe) { @@ -239,20 +252,24 @@ EXPECT_CALL(queue, PublishObject(4, 0, "i")); EXPECT_CALL(queue, PublishObject(4, 1, "j")); } - EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a")); - EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b")); - EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kEndOfGroup, "")); - EXPECT_TRUE(queue.AddObject(1, 0, MoqtObjectStatus::kNormal, "c")); - EXPECT_TRUE(queue.AddObject(1, 1, MoqtObjectStatus::kNormal, "d")); - EXPECT_TRUE(queue.AddObject(1, 2, MoqtObjectStatus::kEndOfGroup, "")); - EXPECT_TRUE(queue.AddObject(2, 0, MoqtObjectStatus::kNormal, "e")); - EXPECT_TRUE(queue.AddObject(2, 1, MoqtObjectStatus::kNormal, "f")); - EXPECT_TRUE(queue.AddObject(2, 2, MoqtObjectStatus::kEndOfGroup, "")); - EXPECT_TRUE(queue.AddObject(3, 0, MoqtObjectStatus::kNormal, "g")); - EXPECT_TRUE(queue.AddObject(3, 1, MoqtObjectStatus::kNormal, "h")); - EXPECT_TRUE(queue.AddObject(3, 2, MoqtObjectStatus::kEndOfGroup, "")); - EXPECT_TRUE(queue.AddObject(4, 0, MoqtObjectStatus::kNormal, "i")); - EXPECT_TRUE(queue.AddObject(4, 1, MoqtObjectStatus::kNormal, "j")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b")); + EXPECT_TRUE( + queue.AddObject(FullSequence{0, 2}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(FullSequence{1, 0}, "c")); + EXPECT_TRUE(queue.AddObject(FullSequence{1, 1}, "d")); + EXPECT_TRUE( + queue.AddObject(FullSequence{1, 2}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(FullSequence{2, 0}, "e")); + EXPECT_TRUE(queue.AddObject(FullSequence{2, 1}, "f")); + EXPECT_TRUE( + queue.AddObject(FullSequence{2, 2}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(FullSequence{3, 0}, "g")); + EXPECT_TRUE(queue.AddObject(FullSequence{3, 1}, "h")); + EXPECT_TRUE( + queue.AddObject(FullSequence{3, 2}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(FullSequence{4, 0}, "i")); + EXPECT_TRUE(queue.AddObject(FullSequence{4, 1}, "j")); queue.CallSubscribeForPast(SubscribeWindow(0, 0)); } @@ -274,21 +291,25 @@ EXPECT_CALL(queue, PublishObject(4, 0, "i")); EXPECT_CALL(queue, PublishObject(4, 1, "j")); } - EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a")); - EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b")); - EXPECT_TRUE(queue.AddObject(1, 0, MoqtObjectStatus::kNormal, "c")); - EXPECT_TRUE(queue.AddObject(1, 1, MoqtObjectStatus::kNormal, "d")); - EXPECT_TRUE(queue.AddObject(1, 2, MoqtObjectStatus::kEndOfGroup, "")); - EXPECT_TRUE(queue.AddObject(2, 0, MoqtObjectStatus::kNormal, "e")); - EXPECT_TRUE(queue.AddObject(2, 1, MoqtObjectStatus::kNormal, "f")); - EXPECT_TRUE(queue.AddObject(2, 2, MoqtObjectStatus::kEndOfGroup, "")); - EXPECT_TRUE(queue.AddObject(3, 0, MoqtObjectStatus::kNormal, "g")); - EXPECT_TRUE(queue.AddObject(3, 1, MoqtObjectStatus::kNormal, "h")); - EXPECT_TRUE(queue.AddObject(3, 2, MoqtObjectStatus::kEndOfGroup, "")); - EXPECT_TRUE(queue.AddObject(4, 0, MoqtObjectStatus::kNormal, "i")); - EXPECT_TRUE(queue.AddObject(4, 1, MoqtObjectStatus::kNormal, "j")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b")); + EXPECT_TRUE(queue.AddObject(FullSequence{1, 0}, "c")); + EXPECT_TRUE(queue.AddObject(FullSequence{1, 1}, "d")); + EXPECT_TRUE( + queue.AddObject(FullSequence{1, 2}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(FullSequence{2, 0}, "e")); + EXPECT_TRUE(queue.AddObject(FullSequence{2, 1}, "f")); + EXPECT_TRUE( + queue.AddObject(FullSequence{2, 2}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(FullSequence{3, 0}, "g")); + EXPECT_TRUE(queue.AddObject(FullSequence{3, 1}, "h")); + EXPECT_TRUE( + queue.AddObject(FullSequence{3, 2}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(FullSequence{4, 0}, "i")); + EXPECT_TRUE(queue.AddObject(FullSequence{4, 1}, "j")); // This object will be ignored, but this is not an error. - EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kEndOfGroup, "")); + EXPECT_TRUE( + queue.AddObject(FullSequence{0, 2}, MoqtObjectStatus::kEndOfGroup)); } TEST(MoqtLiveRelayQueue, EndOfTrack) { @@ -299,10 +320,12 @@ EXPECT_CALL(queue, PublishObject(0, 2, "c")); EXPECT_CALL(queue, CloseTrack()); } - EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a")); - EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kNormal, "c")); - EXPECT_FALSE(queue.AddObject(0, 1, MoqtObjectStatus::kEndOfTrack, "")); - EXPECT_TRUE(queue.AddObject(0, 3, MoqtObjectStatus::kEndOfTrack, "")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c")); + EXPECT_FALSE( + queue.AddObject(FullSequence{0, 1}, MoqtObjectStatus::kEndOfTrack)); + EXPECT_TRUE( + queue.AddObject(FullSequence{0, 3}, MoqtObjectStatus::kEndOfTrack)); } TEST(MoqtLiveRelayQueue, EndOfGroup) { @@ -313,11 +336,13 @@ EXPECT_CALL(queue, PublishObject(0, 2, "c")); EXPECT_CALL(queue, CloseStreamForGroup(0)); } - EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a")); - EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kNormal, "c")); - EXPECT_FALSE(queue.AddObject(0, 1, MoqtObjectStatus::kEndOfGroup, "")); - EXPECT_TRUE(queue.AddObject(0, 3, MoqtObjectStatus::kEndOfGroup, "")); - EXPECT_FALSE(queue.AddObject(0, 4, MoqtObjectStatus::kNormal, "e")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c")); + EXPECT_FALSE( + queue.AddObject(FullSequence{0, 1}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE( + queue.AddObject(FullSequence{0, 3}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_FALSE(queue.AddObject(FullSequence{0, 4}, "e")); } TEST(MoqtLiveRelayQueue, GroupDoesNotExist) { @@ -326,8 +351,10 @@ testing::InSequence seq; EXPECT_CALL(queue, SkipGroup(0)); } - EXPECT_FALSE(queue.AddObject(0, 1, MoqtObjectStatus::kGroupDoesNotExist, "")); - EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kGroupDoesNotExist, "")); + EXPECT_FALSE(queue.AddObject(FullSequence{0, 1}, + MoqtObjectStatus::kGroupDoesNotExist)); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, + MoqtObjectStatus::kGroupDoesNotExist)); } TEST(MoqtLiveRelayQueue, OverwriteObject) { @@ -338,11 +365,66 @@ EXPECT_CALL(queue, PublishObject(0, 1, "b")); EXPECT_CALL(queue, PublishObject(0, 2, "c")); } - EXPECT_TRUE(queue.AddObject(0, 0, MoqtObjectStatus::kNormal, "a")); - EXPECT_TRUE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "b")); - EXPECT_TRUE(queue.AddObject(0, 2, MoqtObjectStatus::kNormal, "c")); - EXPECT_TRUE(queue.AddObject(0, 3, MoqtObjectStatus::kEndOfGroup, "")); - EXPECT_FALSE(queue.AddObject(0, 1, MoqtObjectStatus::kNormal, "invalid")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c")); + EXPECT_TRUE( + queue.AddObject(FullSequence{0, 3}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_FALSE(queue.AddObject(FullSequence{0, 1}, "invalid")); +} + +TEST(MoqtLiveRelayQueue, DifferentSubgroups) { + TestMoqtLiveRelayQueue queue; + { + testing::InSequence seq; + EXPECT_CALL(queue, PublishObject(0, 0, "a")); + EXPECT_CALL(queue, PublishObject(0, 1, "b")); + EXPECT_CALL(queue, PublishObject(0, 3, "d")); + EXPECT_CALL(queue, PublishObject(0, 2, "c")); + EXPECT_CALL(queue, CloseStreamForSubgroup(0, 0)); + EXPECT_CALL(queue, PublishObject(0, 5, "e")); + EXPECT_CALL(queue, PublishObject(0, 7, "f")); + EXPECT_CALL(queue, CloseStreamForSubgroup(0, 1)); + EXPECT_CALL(queue, CloseStreamForSubgroup(0, 2)); + + // Serve them back in strict subgroup order. + EXPECT_CALL(queue, PublishObject(0, 0, "a")); + EXPECT_CALL(queue, PublishObject(0, 3, "d")); + EXPECT_CALL(queue, CloseStreamForSubgroup(0, 0)); + EXPECT_CALL(queue, PublishObject(0, 1, "b")); + EXPECT_CALL(queue, PublishObject(0, 5, "e")); + EXPECT_CALL(queue, CloseStreamForSubgroup(0, 1)); + EXPECT_CALL(queue, PublishObject(0, 2, "c")); + EXPECT_CALL(queue, PublishObject(0, 7, "f")); + EXPECT_CALL(queue, CloseStreamForSubgroup(0, 2)); + } + EXPECT_TRUE(queue.AddObject(FullSequence{0, 0, 0}, "a")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 1, 1}, "b")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 0, 3}, "d")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 2, 2}, "c")); + EXPECT_TRUE( + queue.AddObject(FullSequence{0, 0, 4}, MoqtObjectStatus::kEndOfSubgroup)); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 1, 5}, "e")); + EXPECT_TRUE(queue.AddObject(FullSequence{0, 2, 7}, "f")); + EXPECT_TRUE( + queue.AddObject(FullSequence{0, 1, 6}, MoqtObjectStatus::kEndOfSubgroup)); + EXPECT_TRUE( + queue.AddObject(FullSequence{0, 2, 8}, MoqtObjectStatus::kEndOfSubgroup)); + queue.CallSubscribeForPast(SubscribeWindow(0, 0)); +} + +TEST(MoqtLiveRelayQueue, EndOfSubgroup) { + TestMoqtLiveRelayQueue queue; + { + testing::InSequence seq; + EXPECT_CALL(queue, PublishObject(0, 0, "a")); + EXPECT_CALL(queue, CloseStreamForSubgroup(0, 0)); + EXPECT_CALL(queue, PublishObject(0, 2, "b")).Times(0); + } + EXPECT_TRUE(queue.AddObject(FullSequence{0, 0, 0}, "a")); + EXPECT_TRUE( + queue.AddObject(FullSequence{0, 0, 1}, MoqtObjectStatus::kEndOfSubgroup)); + EXPECT_FALSE(queue.AddObject(FullSequence{0, 0, 2}, "b")); } } // namespace
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index 81ffb33..83502bc 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -242,6 +242,7 @@ bool operator>(const FullSequence& other) const { return !(*this <= other); } FullSequence& operator=(FullSequence other) { group = other.group; + subgroup = other.subgroup; object = other.object; return *this; } @@ -260,6 +261,21 @@ struct SubgroupPriority { uint8_t publisher_priority = 0xf0; uint64_t subgroup_id = 0; + + bool operator==(const SubgroupPriority& other) const { + return publisher_priority == other.publisher_priority && + subgroup_id == other.subgroup_id; + } + bool operator<(const SubgroupPriority& other) const { + return publisher_priority < other.publisher_priority || + (publisher_priority == other.publisher_priority && + subgroup_id < other.subgroup_id); + } + bool operator<=(const SubgroupPriority& other) const { + return (publisher_priority < other.publisher_priority || + (publisher_priority == other.publisher_priority && + subgroup_id <= other.subgroup_id)); + } }; template <typename H>
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc index 35d1f9d..0fe36c7 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -46,7 +46,7 @@ quiche::QuicheMemSlice payload) { FullSequence sequence{current_group_id_, queue_.back().size()}; queue_.back().push_back(CachedObject{ - sequence, status, + sequence, status, publisher_priority_, std::make_shared<quiche::QuicheMemSlice>(std::move(payload))}); for (MoqtObjectListener* listener : listeners_) { listener->OnNewObjectAvailable(sequence); @@ -58,7 +58,7 @@ if (sequence.group < first_group_in_queue()) { return PublishedObject{FullSequence{sequence.group, sequence.object}, MoqtObjectStatus::kGroupDoesNotExist, - quiche::QuicheMemSlice()}; + publisher_priority_, quiche::QuicheMemSlice()}; } if (sequence.group > current_group_id_) { return std::nullopt; @@ -71,7 +71,7 @@ } return PublishedObject{FullSequence{sequence.group, sequence.object}, MoqtObjectStatus::kObjectDoesNotExist, - quiche::QuicheMemSlice()}; + publisher_priority_, quiche::QuicheMemSlice()}; } QUICHE_DCHECK(sequence == group[sequence.object].sequence); return CachedObjectToPublishedObject(group[sequence.object]);
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h index 778236c..cb6c0bd 100644 --- a/quiche/quic/moqt/moqt_publisher.h +++ b/quiche/quic/moqt/moqt_publisher.h
@@ -21,6 +21,7 @@ struct PublishedObject { FullSequence sequence; MoqtObjectStatus status; + MoqtPriority publisher_priority; quiche::QuicheMemSlice payload; }; @@ -46,7 +47,8 @@ virtual const FullTrackName& GetTrackName() const = 0; // GetCachedObject lets the MoQT stack access the objects that are available - // in the track's built-in local cache. + // in the track's built-in local cache. Retrieves the first object ID >= + // sequence.object that matches (sequence.group, sequence.subgroup). // // This implementation of MoQT does not store any objects within the MoQT // stack itself, at least until the object is fully serialized and passed to @@ -64,7 +66,7 @@ FullSequence sequence) const = 0; // Returns a full list of objects available in the cache, to be used for - // SUBSCRIBEs with a backfill. + // SUBSCRIBEs with a backfill. Returned in order of worsening priority. virtual std::vector<FullSequence> GetCachedObjectsInRange( FullSequence start, FullSequence end) const = 0;
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 656b8d0..6104561 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -200,10 +200,10 @@ << payload.size(); auto [full_track_name, visitor] = TrackPropertiesFromAlias(message); if (visitor != nullptr) { - visitor->OnObjectFragment(full_track_name, message.group_id, - message.object_id, message.publisher_priority, - message.object_status, - message.forwarding_preference, payload, true); + visitor->OnObjectFragment( + full_track_name, FullSequence{message.group_id, 0, message.object_id}, + message.publisher_priority, message.object_status, + message.forwarding_preference, payload, true); } } @@ -917,7 +917,9 @@ auto [full_track_name, visitor] = session_->TrackPropertiesFromAlias(message); if (visitor != nullptr) { visitor->OnObjectFragment( - full_track_name, message.group_id, message.object_id, + full_track_name, + FullSequence{message.group_id, message.subgroup_id.value_or(0), + message.object_id}, message.publisher_priority, message.object_status, message.forwarding_preference, payload, end_of_message); } @@ -1236,7 +1238,7 @@ void MoqtSession::OutgoingDataStream::SendNextObject( PublishedSubscription& subscription, PublishedObject object) { - QUICHE_DCHECK(object.sequence == next_object_); + QUICHE_DCHECK(next_object_ <= object.sequence); QUICHE_DCHECK(stream_->CanWrite()); MoqtTrackPublisher& publisher = subscription.publisher(); @@ -1271,14 +1273,17 @@ ++next_object_.group; next_object_.object = 0; } else { - ++next_object_.object; + next_object_.object = header.object_id + 1; } fin = object.status == MoqtObjectStatus::kEndOfTrack || !subscription.InWindow(next_object_); break; case MoqtForwardingPreference::kSubgroup: - ++next_object_.object; + // TODO(martinduke): EndOfGroup and EndOfTrack implies the ability to + // close other streams/subgroups. PublishedObject should contain a boolean + // if the stream is safe to close. + next_object_.object = header.object_id + 1; fin = object.status == MoqtObjectStatus::kEndOfTrack || object.status == MoqtObjectStatus::kEndOfGroup || object.status == MoqtObjectStatus::kEndOfSubgroup ||
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index e90c3b1..eeb9324 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -306,6 +306,7 @@ } void set_subscriber_priority(MoqtPriority priority); + // This is only called for objects that have just arrived. void OnNewObjectAvailable(FullSequence sequence) override; void ProcessObjectAck(const MoqtObjectAck& message) { if (monitoring_interface_ == nullptr) { @@ -414,6 +415,9 @@ MoqtSession* session_; webtransport::Stream* stream_; uint64_t subscription_id_; + // A FullSequence with the minimum object ID that should go out next. The + // session doesn't know what the next object ID in the stream is because + // the next object could be in a different subgroup or simply be skipped. FullSequence next_object_; bool stream_header_written_ = false; // A weak pointer to an object owned by the session. Used to make sure the
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 106c172..d1f1969 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -725,7 +725,7 @@ std::unique_ptr<MoqtDataParserVisitor> object_stream = MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream); - EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _, _)).Times(1); + EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _)).Times(1); EXPECT_CALL(mock_stream, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); object_stream->OnObjectMessage(object, payload, true); @@ -751,7 +751,7 @@ std::unique_ptr<MoqtDataParserVisitor> object_stream = MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream); - EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _, _)).Times(1); + EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _)).Times(1); EXPECT_CALL(mock_stream, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); object_stream->OnObjectMessage(object, payload, false); @@ -782,7 +782,7 @@ std::unique_ptr<MoqtDataParserVisitor> object_stream = MoqtSessionPeer::CreateIncomingDataStream(&session, &mock_stream); - EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _, _)).Times(2); + EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _)).Times(2); EXPECT_CALL(mock_stream, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); object_stream->OnObjectMessage(object, payload, false); @@ -820,15 +820,14 @@ std::unique_ptr<MoqtDataParserVisitor> object_stream = MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream); - EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _, _)) - .WillOnce([&](const FullTrackName& full_track_name, - uint64_t group_sequence, uint64_t object_sequence, + EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _)) + .WillOnce([&](const FullTrackName& full_track_name, FullSequence sequence, MoqtPriority publisher_priority, MoqtObjectStatus status, MoqtForwardingPreference forwarding_preference, absl::string_view payload, bool end_of_message) { EXPECT_EQ(full_track_name, ftn); - EXPECT_EQ(group_sequence, object.group_id); - EXPECT_EQ(object_sequence, object.object_id); + EXPECT_EQ(sequence.group, object.group_id); + EXPECT_EQ(sequence.object, object.object_id); }); EXPECT_CALL(mock_stream, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); @@ -879,15 +878,14 @@ std::unique_ptr<MoqtDataParserVisitor> object_stream = MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream); - EXPECT_CALL(visitor, OnObjectFragment(_, _, _, _, _, _, _, _)) - .WillOnce([&](const FullTrackName& full_track_name, - uint64_t group_sequence, uint64_t object_sequence, + EXPECT_CALL(visitor, OnObjectFragment(_, _, _, _, _, _, _)) + .WillOnce([&](const FullTrackName& full_track_name, FullSequence sequence, MoqtPriority publisher_priority, MoqtObjectStatus status, MoqtForwardingPreference forwarding_preference, absl::string_view payload, bool end_of_message) { EXPECT_EQ(full_track_name, ftn); - EXPECT_EQ(group_sequence, object.group_id); - EXPECT_EQ(object_sequence, object.object_id); + EXPECT_EQ(sequence.group, object.group_id); + EXPECT_EQ(sequence.object, object.object_id); }); EXPECT_CALL(mock_stream, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); @@ -941,15 +939,14 @@ std::unique_ptr<MoqtDataParserVisitor> object_stream = MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream); - EXPECT_CALL(visitor, OnObjectFragment(_, _, _, _, _, _, _, _)) - .WillOnce([&](const FullTrackName& full_track_name, - uint64_t group_sequence, uint64_t object_sequence, + EXPECT_CALL(visitor, OnObjectFragment(_, _, _, _, _, _, _)) + .WillOnce([&](const FullTrackName& full_track_name, FullSequence sequence, MoqtPriority publisher_priority, MoqtObjectStatus status, MoqtForwardingPreference forwarding_preference, absl::string_view payload, bool end_of_message) { EXPECT_EQ(full_track_name, ftn); - EXPECT_EQ(group_sequence, object.group_id); - EXPECT_EQ(object_sequence, object.object_id); + EXPECT_EQ(sequence.group, object.group_id); + EXPECT_EQ(sequence.object, object.object_id); }); EXPECT_CALL(mock_stream, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); @@ -994,15 +991,14 @@ std::unique_ptr<MoqtDataParserVisitor> object_stream = MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream); - EXPECT_CALL(visitor, OnObjectFragment(_, _, _, _, _, _, _, _)) - .WillOnce([&](const FullTrackName& full_track_name, - uint64_t group_sequence, uint64_t object_sequence, + EXPECT_CALL(visitor, OnObjectFragment(_, _, _, _, _, _, _)) + .WillOnce([&](const FullTrackName& full_track_name, FullSequence sequence, MoqtPriority publisher_priority, MoqtObjectStatus status, MoqtForwardingPreference forwarding_preference, absl::string_view payload, bool end_of_message) { EXPECT_EQ(full_track_name, ftn); - EXPECT_EQ(group_sequence, object.group_id); - EXPECT_EQ(object_sequence, object.object_id); + EXPECT_EQ(sequence.group, object.group_id); + EXPECT_EQ(sequence.object, object.object_id); }); EXPECT_CALL(mock_stream, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); @@ -1068,7 +1064,7 @@ return absl::OkStatus(); }); EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([] { - return PublishedObject{FullSequence(5, 0), MoqtObjectStatus::kNormal, + return PublishedObject{FullSequence(5, 0), MoqtObjectStatus::kNormal, 127, MemSliceFromString("deadbeef")}; }); EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillRepeatedly([] { @@ -1114,7 +1110,7 @@ .WillRepeatedly(Return(&mock_stream)); EXPECT_CALL(mock_stream, Writev(_, _)).WillOnce(Return(absl::OkStatus())); EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([] { - return PublishedObject{FullSequence(5, 0), MoqtObjectStatus::kNormal, + return PublishedObject{FullSequence(5, 0), MoqtObjectStatus::kNormal, 128, MemSliceFromString("deadbeef")}; }); EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillRepeatedly([] { @@ -1152,7 +1148,7 @@ EXPECT_CALL(mock_stream, Writev(_, _)).WillOnce(Return(absl::OkStatus())); EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([] { - return PublishedObject{FullSequence(5, 0), MoqtObjectStatus::kNormal, + return PublishedObject{FullSequence(5, 0), MoqtObjectStatus::kNormal, 128, MemSliceFromString("deadbeef")}; }); EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillOnce([] { @@ -1299,7 +1295,7 @@ EXPECT_CALL(*track_publisher, GetCachedObject(FullSequence{5, 0})) .WillRepeatedly([] { return PublishedObject{FullSequence{5, 0}, MoqtObjectStatus::kNormal, - MemSliceFromString("deadbeef")}; + 128, MemSliceFromString("deadbeef")}; }); listener->OnNewObjectAvailable(FullSequence(5, 0)); EXPECT_TRUE(correct_message); @@ -1323,10 +1319,11 @@ }; char datagram[] = {0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x08, 0x64, 0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66}; - EXPECT_CALL(visitor_, - OnObjectFragment(ftn, object.group_id, object.object_id, - object.publisher_priority, object.object_status, - object.forwarding_preference, payload, true)) + EXPECT_CALL( + visitor_, + OnObjectFragment(ftn, FullSequence{object.group_id, object.object_id}, + object.publisher_priority, object.object_status, + object.forwarding_preference, payload, true)) .Times(1); session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram))); } @@ -1351,7 +1348,7 @@ std::unique_ptr<MoqtDataParserVisitor> object_stream = MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream); - EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _, _)).Times(1); + EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _)).Times(1); EXPECT_CALL(mock_stream, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); object_stream->OnObjectMessage(object, payload, true); @@ -1470,19 +1467,19 @@ EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 0))) .WillOnce( Return(PublishedObject{FullSequence(0, 0), MoqtObjectStatus::kNormal, - MemSliceFromString("deadbeef")})); + 127, MemSliceFromString("deadbeef")})); EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 1))) .WillOnce(Return(std::nullopt)); EXPECT_CALL(*track, GetCachedObject(FullSequence(1, 0))) .WillOnce( Return(PublishedObject{FullSequence(1, 0), MoqtObjectStatus::kNormal, - MemSliceFromString("deadbeef")})); + 127, MemSliceFromString("deadbeef")})); EXPECT_CALL(*track, GetCachedObject(FullSequence(1, 1))) .WillOnce(Return(std::nullopt)); EXPECT_CALL(*track, GetCachedObject(FullSequence(2, 0))) .WillOnce( Return(PublishedObject{FullSequence(2, 0), MoqtObjectStatus::kNormal, - MemSliceFromString("deadbeef")})); + 127, MemSliceFromString("deadbeef")})); EXPECT_CALL(*track, GetCachedObject(FullSequence(2, 1))) .WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true)); @@ -1580,7 +1577,7 @@ EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 0))) .WillOnce( Return(PublishedObject{FullSequence(0, 0), MoqtObjectStatus::kNormal, - MemSliceFromString("deadbeef")})); + 127, MemSliceFromString("deadbeef")})); EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 1))) .WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true)); @@ -1616,7 +1613,7 @@ EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 0))) .WillOnce( Return(PublishedObject{FullSequence(0, 0), MoqtObjectStatus::kNormal, - MemSliceFromString("deadbeef")})); + 127, MemSliceFromString("deadbeef")})); EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 1))) .WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true));
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h index 5f3a75c..923fa37 100644 --- a/quiche/quic/moqt/moqt_track.h +++ b/quiche/quic/moqt/moqt_track.h
@@ -38,9 +38,8 @@ virtual void OnCanAckObjects(MoqtObjectAckFunction ack_function) = 0; // Called when an object fragment (or an entire object) is received. virtual void OnObjectFragment( - const FullTrackName& full_track_name, uint64_t group_sequence, - uint64_t object_sequence, MoqtPriority publisher_priority, - MoqtObjectStatus object_status, + const FullTrackName& full_track_name, FullSequence sequence, + MoqtPriority publisher_priority, MoqtObjectStatus object_status, MoqtForwardingPreference forwarding_preference, absl::string_view object, bool end_of_message) = 0; // TODO(martinduke): Add final sequence numbers
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc index 68208b1..13e3497 100644 --- a/quiche/quic/moqt/tools/chat_client.cc +++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -124,9 +124,8 @@ } void ChatClient::RemoteTrackVisitor::OnObjectFragment( - const FullTrackName& full_track_name, uint64_t group_sequence, - uint64_t object_sequence, MoqtPriority /*publisher_priority*/, - MoqtObjectStatus /*status*/, + const FullTrackName& full_track_name, FullSequence sequence, + MoqtPriority /*publisher_priority*/, MoqtObjectStatus /*status*/, MoqtForwardingPreference /*forwarding_preference*/, absl::string_view object, bool end_of_message) { if (!end_of_message) { @@ -134,11 +133,11 @@ "buffering\n"; } if (full_track_name == client_->chat_strings_->GetCatalogName()) { - if (group_sequence < client_->catalog_group_) { + if (sequence.group < client_->catalog_group_) { std::cout << "Ignoring old catalog"; return; } - client_->ProcessCatalog(object, this, group_sequence, object_sequence); + client_->ProcessCatalog(object, this, sequence.group, sequence.object); return; } std::string username(
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h index 0b3ffc7..fe64809 100644 --- a/quiche/quic/moqt/tools/chat_client.h +++ b/quiche/quic/moqt/tools/chat_client.h
@@ -98,7 +98,7 @@ void OnCanAckObjects(MoqtObjectAckFunction) override {} void OnObjectFragment(const moqt::FullTrackName& full_track_name, - uint64_t group_sequence, uint64_t object_sequence, + FullSequence sequence, moqt::MoqtPriority publisher_priority, moqt::MoqtObjectStatus status, moqt::MoqtForwardingPreference forwarding_preference,
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc index 028786e..5b32185 100644 --- a/quiche/quic/moqt/tools/chat_server.cc +++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -92,9 +92,8 @@ } void ChatServer::RemoteTrackVisitor::OnObjectFragment( - const moqt::FullTrackName& full_track_name, uint64_t group_sequence, - uint64_t object_sequence, moqt::MoqtPriority /*publisher_priority*/, - moqt::MoqtObjectStatus status, + const moqt::FullTrackName& full_track_name, moqt::FullSequence sequence, + moqt::MoqtPriority /*publisher_priority*/, moqt::MoqtObjectStatus status, moqt::MoqtForwardingPreference /*forwarding_preference*/, absl::string_view object, bool end_of_message) { if (!end_of_message) { @@ -114,13 +113,13 @@ return; } if (status != MoqtObjectStatus::kNormal) { - it->second->AddObject(group_sequence, object_sequence, status, ""); + it->second->AddObject(sequence, status); return; } if (!server_->WriteToFile(username, object)) { std::cout << username << ": " << object << "\n\n"; } - it->second->AddObject(group_sequence, object_sequence, status, object); + it->second->AddObject(sequence, object); } ChatServer::ChatServer(std::unique_ptr<quic::ProofSource> proof_source,
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h index 7bb4f13..f1b2ae1 100644 --- a/quiche/quic/moqt/tools/chat_server.h +++ b/quiche/quic/moqt/tools/chat_server.h
@@ -42,8 +42,8 @@ std::optional<absl::string_view> reason_phrase) override; void OnCanAckObjects(MoqtObjectAckFunction) override {} void OnObjectFragment( - const moqt::FullTrackName& full_track_name, uint64_t group_sequence, - uint64_t object_sequence, moqt::MoqtPriority /*publisher_priority*/, + const moqt::FullTrackName& full_track_name, FullSequence sequence, + moqt::MoqtPriority /*publisher_priority*/, moqt::MoqtObjectStatus /*status*/, moqt::MoqtForwardingPreference /*forwarding_preference*/, absl::string_view object, bool end_of_message) override;
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc index bd0f72f..ddb51e6 100644 --- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc +++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -177,13 +177,13 @@ void OnCanAckObjects(MoqtObjectAckFunction) override {} void OnObjectFragment(const FullTrackName& full_track_name, - uint64_t group_sequence, uint64_t object_sequence, + FullSequence sequence, MoqtPriority /*publisher_priority*/, MoqtObjectStatus /*status*/, MoqtForwardingPreference /*forwarding_preference*/, absl::string_view object, bool /*end_of_message*/) override { - std::string file_name = absl::StrCat(group_sequence, "-", object_sequence, + std::string file_name = absl::StrCat(sequence.group, "-", sequence.object, ".", full_track_name.tuple().back()); std::string file_path = quiche::JoinPath(directory_, file_name); std::ofstream output(file_path, std::ios::binary | std::ios::ate);
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h index 371a0c9..5debaeb 100644 --- a/quiche/quic/moqt/tools/moqt_mock_visitor.h +++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -80,9 +80,8 @@ MOCK_METHOD(void, OnCanAckObjects, (MoqtObjectAckFunction ack_function), (override)); MOCK_METHOD(void, OnObjectFragment, - (const FullTrackName& full_track_name, uint64_t group_sequence, - uint64_t object_sequence, MoqtPriority publisher_priority, - MoqtObjectStatus status, + (const FullTrackName& full_track_name, FullSequence sequence, + MoqtPriority publisher_priority, MoqtObjectStatus status, MoqtForwardingPreference forwarding_preference, absl::string_view object, bool end_of_message), (override));
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc index 0ebb615..2d260ca 100644 --- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc +++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -202,7 +202,7 @@ } void OnObjectFragment(const FullTrackName& full_track_name, - uint64_t group_sequence, uint64_t object_sequence, + FullSequence sequence, MoqtPriority /*publisher_priority*/, MoqtObjectStatus status, MoqtForwardingPreference /*forwarding_preference*/, @@ -218,7 +218,6 @@ // TODO: this logic should be factored out. Also, this should take advantage // of the fact that in the current MoQT, the object size is known in // advance. - FullSequence sequence{group_sequence, object_sequence}; if (!end_of_message) { auto [it, unused] = partial_objects_.try_emplace(sequence); it->second.append(object);