Multiple MoQT refactors, merged into one CL since detangling them would require extra effort. * Location no longer has a subgroup ID in it. * A new type, DataStreamIndex, is introduced as a hashable (group, subgroup) tuple. * A new type, PublishedObjectMetadata, is introduced to pass around object metadata in a way that's consistent between the send and the receive code. * A bunch of other smaller changes. This potentially fixes at least two bugs we currently have in our code: - If a single subgroup were reset within a group, no other subgroup in the group would be able to send objects. - Subgroup IDs were not passed to the application on receipt. PiperOrigin-RevId: 776288849
diff --git a/build/source_list.bzl b/build/source_list.bzl index 0bbd9b2..43911dd 100644 --- a/build/source_list.bzl +++ b/build/source_list.bzl
@@ -1557,7 +1557,6 @@ "quic/moqt/moqt_session_interface.h", "quic/moqt/moqt_subscribe_windows.h", "quic/moqt/moqt_track.h", - "quic/moqt/test_tools/mock_moqt_session.h", "quic/moqt/test_tools/moqt_framer_utils.h", "quic/moqt/test_tools/moqt_parser_test_visitor.h", "quic/moqt/test_tools/moqt_session_peer.h", @@ -1597,8 +1596,6 @@ "quic/moqt/moqt_subscribe_windows_test.cc", "quic/moqt/moqt_track.cc", "quic/moqt/moqt_track_test.cc", - "quic/moqt/test_tools/mock_moqt_session.cc", - "quic/moqt/test_tools/mock_moqt_session_test.cc", "quic/moqt/test_tools/moqt_framer_utils.cc", "quic/moqt/test_tools/moqt_simulator_harness.cc", "quic/moqt/tools/chat_client.cc",
diff --git a/build/source_list.gni b/build/source_list.gni index 54bbd26..b2b99f0 100644 --- a/build/source_list.gni +++ b/build/source_list.gni
@@ -1561,7 +1561,6 @@ "src/quiche/quic/moqt/moqt_session_interface.h", "src/quiche/quic/moqt/moqt_subscribe_windows.h", "src/quiche/quic/moqt/moqt_track.h", - "src/quiche/quic/moqt/test_tools/mock_moqt_session.h", "src/quiche/quic/moqt/test_tools/moqt_framer_utils.h", "src/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h", "src/quiche/quic/moqt/test_tools/moqt_session_peer.h", @@ -1601,8 +1600,6 @@ "src/quiche/quic/moqt/moqt_subscribe_windows_test.cc", "src/quiche/quic/moqt/moqt_track.cc", "src/quiche/quic/moqt/moqt_track_test.cc", - "src/quiche/quic/moqt/test_tools/mock_moqt_session.cc", - "src/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc", "src/quiche/quic/moqt/test_tools/moqt_framer_utils.cc", "src/quiche/quic/moqt/test_tools/moqt_simulator_harness.cc", "src/quiche/quic/moqt/tools/chat_client.cc",
diff --git a/build/source_list.json b/build/source_list.json index 9830675..501ddf7 100644 --- a/build/source_list.json +++ b/build/source_list.json
@@ -1560,7 +1560,6 @@ "quiche/quic/moqt/moqt_session_interface.h", "quiche/quic/moqt/moqt_subscribe_windows.h", "quiche/quic/moqt/moqt_track.h", - "quiche/quic/moqt/test_tools/mock_moqt_session.h", "quiche/quic/moqt/test_tools/moqt_framer_utils.h", "quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h", "quiche/quic/moqt/test_tools/moqt_session_peer.h", @@ -1600,8 +1599,6 @@ "quiche/quic/moqt/moqt_subscribe_windows_test.cc", "quiche/quic/moqt/moqt_track.cc", "quiche/quic/moqt/moqt_track_test.cc", - "quiche/quic/moqt/test_tools/mock_moqt_session.cc", - "quiche/quic/moqt/test_tools/mock_moqt_session_test.cc", "quiche/quic/moqt/test_tools/moqt_framer_utils.cc", "quiche/quic/moqt/test_tools/moqt_simulator_harness.cc", "quiche/quic/moqt/tools/chat_client.cc",
diff --git a/quiche/quic/moqt/moqt_cached_object.cc b/quiche/quic/moqt/moqt_cached_object.cc index 33294bb..df71f1c 100644 --- a/quiche/quic/moqt/moqt_cached_object.cc +++ b/quiche/quic/moqt/moqt_cached_object.cc
@@ -13,15 +13,12 @@ moqt::PublishedObject CachedObjectToPublishedObject( const CachedObject& object) { PublishedObject result; - result.sequence = object.sequence; - result.status = object.status; - result.publisher_priority = object.publisher_priority; + result.metadata = object.metadata; if (object.payload != nullptr && !object.payload->empty()) { result.payload = quiche::QuicheMemSlice( object.payload->data(), object.payload->length(), [retained_pointer = object.payload](absl::string_view) {}); } - result.arrival_time = object.arrival_time; result.fin_after_this = object.fin_after_this; return result; }
diff --git a/quiche/quic/moqt/moqt_cached_object.h b/quiche/quic/moqt/moqt_cached_object.h index ac33096..ef5566a 100644 --- a/quiche/quic/moqt/moqt_cached_object.h +++ b/quiche/quic/moqt/moqt_cached_object.h
@@ -7,9 +7,6 @@ #include <memory> -#include "quiche/quic/core/quic_time.h" -#include "quiche/quic/moqt/moqt_messages.h" -#include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/common/quiche_mem_slice.h" @@ -18,11 +15,8 @@ // CachedObject is a version of PublishedObject with a reference counted // payload. struct CachedObject { - Location sequence; - MoqtObjectStatus status; - MoqtPriority publisher_priority; + PublishedObjectMetadata metadata; std::shared_ptr<quiche::QuicheMemSlice> payload; - quic::QuicTime arrival_time; bool fin_after_this; // This is the last object before FIN. };
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc index c1099ee..e01317e 100644 --- a/quiche/quic/moqt/moqt_integration_test.cc +++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -19,6 +19,7 @@ #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_session.h" +#include "quiche/quic/moqt/moqt_subscribe_windows.h" #include "quiche/quic/moqt/moqt_track.h" #include "quiche/quic/moqt/test_tools/moqt_session_peer.h" #include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h" @@ -93,6 +94,12 @@ std::unique_ptr<MoqtServerEndpoint> server_; }; +MATCHER_P2( + MetadataLocationAndStatus, location, status, + "Matches a PublishedObjectMetadata against Location and ObjectStatus") { + return arg.location == location && status == arg.status; +} + TEST_F(MoqtIntegrationTest, Handshake) { CreateDefaultEndpoints(); WireUpEndpoints(); @@ -283,15 +290,14 @@ queue->AddObject(MemSliceFromString("object data"), /*key=*/true); bool received_object = false; - EXPECT_CALL(server_visitor, OnObjectFragment(_, _, _, _, _, _)) - .WillOnce([&](const FullTrackName& full_track_name, Location sequence, - MoqtPriority /*publisher_priority*/, - MoqtObjectStatus status, absl::string_view object, - bool end_of_message) { + EXPECT_CALL(server_visitor, OnObjectFragment) + .WillOnce([&](const FullTrackName& full_track_name, + const PublishedObjectMetadata& metadata, + absl::string_view object, bool end_of_message) { EXPECT_EQ(full_track_name, FullTrackName("test", "data")); - EXPECT_EQ(sequence.group, 0u); - EXPECT_EQ(sequence.object, 0u); - EXPECT_EQ(status, MoqtObjectStatus::kNormal); + EXPECT_EQ(metadata.location.group, 0u); + EXPECT_EQ(metadata.location.object, 0u); + EXPECT_EQ(metadata.status, MoqtObjectStatus::kNormal); EXPECT_EQ(object, "object data"); EXPECT_TRUE(end_of_message); received_object = true; @@ -336,18 +342,25 @@ EXPECT_TRUE(success); int received = 0; - EXPECT_CALL(client_visitor, - OnObjectFragment(_, Location{0, 3}, _, - MoqtObjectStatus::kEndOfGroup, "", true)) + EXPECT_CALL( + client_visitor, + OnObjectFragment(_, + MetadataLocationAndStatus( + Location{0, 3}, MoqtObjectStatus::kEndOfGroup), + "", true)) .WillOnce([&] { ++received; }); EXPECT_CALL(client_visitor, - OnObjectFragment(_, Location{1, 0}, _, - MoqtObjectStatus::kNormal, "object 4", true)) + OnObjectFragment(_, + MetadataLocationAndStatus( + Location{1, 0}, MoqtObjectStatus::kNormal), + "object 4", true)) .WillOnce([&] { ++received; }); queue->AddObject(MemSliceFromString("object 4"), /*key=*/true); EXPECT_CALL(client_visitor, - OnObjectFragment(_, Location{1, 1}, _, - MoqtObjectStatus::kNormal, "object 5", true)) + OnObjectFragment(_, + MetadataLocationAndStatus( + Location{1, 1}, MoqtObjectStatus::kNormal), + "object 5", true)) .WillOnce([&] { ++received; }); queue->AddObject(MemSliceFromString("object 5"), /*key=*/false); @@ -356,22 +369,31 @@ EXPECT_TRUE(success); EXPECT_CALL(client_visitor, - OnObjectFragment(_, Location{1, 2}, _, - MoqtObjectStatus::kNormal, "object 6", true)) + OnObjectFragment(_, + MetadataLocationAndStatus( + Location{1, 2}, MoqtObjectStatus::kNormal), + "object 6", true)) .WillOnce([&] { ++received; }); queue->AddObject(MemSliceFromString("object 6"), /*key=*/false); - EXPECT_CALL(client_visitor, - OnObjectFragment(_, Location{1, 3}, _, - MoqtObjectStatus::kEndOfGroup, "", true)) + EXPECT_CALL( + client_visitor, + OnObjectFragment(_, + MetadataLocationAndStatus( + Location{1, 3}, MoqtObjectStatus::kEndOfGroup), + "", true)) .WillOnce([&] { ++received; }); EXPECT_CALL(client_visitor, - OnObjectFragment(_, Location{2, 0}, _, - MoqtObjectStatus::kNormal, "object 7", true)) + OnObjectFragment(_, + MetadataLocationAndStatus( + Location{2, 0}, MoqtObjectStatus::kNormal), + "object 7", true)) .WillOnce([&] { ++received; }); queue->AddObject(MemSliceFromString("object 7"), /*key=*/true); EXPECT_CALL(client_visitor, - OnObjectFragment(_, Location{2, 1}, _, - MoqtObjectStatus::kNormal, "object 8", true)) + OnObjectFragment(_, + MetadataLocationAndStatus( + Location{2, 1}, MoqtObjectStatus::kNormal), + "object 8", true)) .WillOnce([&] { ++received; }); queue->AddObject(MemSliceFromString("object 8"), /*key=*/false); @@ -379,13 +401,19 @@ [&]() { return received >= 7; }); EXPECT_TRUE(success); - EXPECT_CALL(client_visitor, - OnObjectFragment(_, Location{2, 2}, _, - MoqtObjectStatus::kEndOfGroup, "", true)) + EXPECT_CALL( + client_visitor, + OnObjectFragment(_, + MetadataLocationAndStatus( + Location{2, 2}, MoqtObjectStatus::kEndOfGroup), + "", true)) .WillOnce([&] { ++received; }); - EXPECT_CALL(client_visitor, - OnObjectFragment(_, Location{3, 0}, _, - MoqtObjectStatus::kEndOfTrack, "", true)) + EXPECT_CALL( + client_visitor, + OnObjectFragment(_, + MetadataLocationAndStatus( + Location{3, 0}, MoqtObjectStatus::kEndOfTrack), + "", true)) .WillOnce([&] { ++received; }); queue->Close(); success = test_harness_.RunUntilWithDefaultTimeout( @@ -428,13 +456,13 @@ break; } EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kSuccess); - EXPECT_EQ(object.sequence, expected); - if (object.sequence.object == 1) { - EXPECT_EQ(object.status, MoqtObjectStatus::kEndOfGroup); + EXPECT_EQ(object.metadata.location, expected); + if (object.metadata.location.object == 1) { + EXPECT_EQ(object.metadata.status, MoqtObjectStatus::kEndOfGroup); expected.object = 0; ++expected.group; } else { - EXPECT_EQ(object.status, MoqtObjectStatus::kNormal); + EXPECT_EQ(object.metadata.status, MoqtObjectStatus::kNormal); EXPECT_EQ(object.payload.AsStringView(), "object"); ++expected.object; } @@ -573,9 +601,9 @@ SubscribeLatestObject(full_track_name, &client_visitor); // Deliver 3 objects on 2 streams. - queue->AddObject(Location(0, 0), "object,0,0", false); - queue->AddObject(Location(0, 1), "object,0,1", true); - queue->AddObject(Location(1, 0), "object,1,0", true); + queue->AddObject(Location(0, 0), 0, "object,0,0", false); + queue->AddObject(Location(0, 1), 0, "object,0,1", true); + queue->AddObject(Location(1, 0), 0, "object,1,0", true); int received = 0; EXPECT_CALL(client_visitor, OnObjectFragment).WillRepeatedly([&]() { ++received; @@ -686,18 +714,17 @@ size_t bytes_received = 0; EXPECT_CALL(client_visitor, OnObjectFragment) .WillRepeatedly( - [&](const FullTrackName&, Location sequence, - MoqtPriority /*publisher_priority*/, MoqtObjectStatus status, + [&](const FullTrackName&, const PublishedObjectMetadata& metadata, absl::string_view object, bool end_of_message) { bytes_received += object.size(); }); - queue->AddObject(Location{0, 0, 0}, data, false); - queue->AddObject(Location{0, 0, 1}, data, false); - queue->AddObject(Location{0, 0, 2}, data, false); - queue->AddObject(Location{0, 0, 3}, data, true); + queue->AddObject(Location{0, 0}, 0, data, false); + queue->AddObject(Location{0, 1}, 0, data, false); + queue->AddObject(Location{0, 2}, 0, data, false); + queue->AddObject(Location{0, 3}, 0, data, true); success = test_harness_.RunUntilWithDefaultTimeout([&]() { return MoqtSessionPeer::SubgroupHasBeenReset( MoqtSessionPeer::GetSubscription(server_->session(), 0), - Location{0, 0, 0}); + DataStreamIndex{0, 0}); }); EXPECT_TRUE(success); // Stream was reset before all the bytes arrived. @@ -736,16 +763,15 @@ size_t bytes_received = 0; EXPECT_CALL(client_visitor, OnObjectFragment) .WillRepeatedly( - [&](const FullTrackName&, Location sequence, - MoqtPriority /*publisher_priority*/, MoqtObjectStatus status, + [&](const FullTrackName&, const PublishedObjectMetadata& metadata, absl::string_view object, bool end_of_message) { bytes_received += object.size(); }); - queue->AddObject(Location{0, 0, 0}, data, false); - queue->AddObject(Location{1, 0, 0}, data, false); + queue->AddObject(Location{0, 0}, 0, data, false); + queue->AddObject(Location{1, 0}, 0, data, false); success = test_harness_.RunUntilWithDefaultTimeout([&]() { return MoqtSessionPeer::SubgroupHasBeenReset( MoqtSessionPeer::GetSubscription(server_->session(), 0), - Location{0, 0, 0}); + DataStreamIndex{0, 0}); }); EXPECT_TRUE(success); EXPECT_EQ(bytes_received, 2000);
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.cc b/quiche/quic/moqt/moqt_live_relay_queue.cc index e6b6f9b..929c531 100644 --- a/quiche/quic/moqt/moqt_live_relay_queue.cc +++ b/quiche/quic/moqt/moqt_live_relay_queue.cc
@@ -4,6 +4,7 @@ #include "quiche/quic/moqt/moqt_live_relay_queue.h" +#include <cstdint> #include <memory> #include <optional> #include <vector> @@ -18,13 +19,14 @@ #include "quiche/quic/moqt/moqt_subscribe_windows.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/quiche_buffer_allocator.h" +#include "quiche/common/quiche_callbacks.h" #include "quiche/common/quiche_mem_slice.h" #include "quiche/common/simple_buffer_allocator.h" #include "quiche/web_transport/web_transport.h" namespace moqt { -bool MoqtLiveRelayQueue::AddFin(Location sequence) { +bool MoqtLiveRelayQueue::AddFin(Location sequence, uint64_t subgroup) { switch (forwarding_preference_) { case MoqtForwardingPreference::kDatagram: return false; @@ -37,8 +39,8 @@ return false; } Group& group = group_it->second; - auto subgroup_it = group.subgroups.find( - SubgroupPriority{publisher_priority_, sequence.subgroup}); + auto subgroup_it = + group.subgroups.find(SubgroupPriority{publisher_priority_, subgroup}); if (subgroup_it == group.subgroups.end()) { // Subgroup does not exist. return false; @@ -53,13 +55,14 @@ } subgroup_it->second.rbegin()->second.fin_after_this = true; for (MoqtObjectListener* listener : listeners_) { - listener->OnNewFinAvailable(sequence); + listener->OnNewFinAvailable(sequence, subgroup); } return true; } bool MoqtLiveRelayQueue::OnStreamReset( - Location sequence, webtransport::StreamErrorCode error_code) { + Location sequence, uint64_t subgroup_id, + webtransport::StreamErrorCode error_code) { switch (forwarding_preference_) { case MoqtForwardingPreference::kDatagram: return false; @@ -72,23 +75,21 @@ return false; } Group& group = group_it->second; - auto subgroup_it = group.subgroups.find( - SubgroupPriority{publisher_priority_, sequence.subgroup}); + auto subgroup_it = + group.subgroups.find(SubgroupPriority{publisher_priority_, subgroup_id}); if (subgroup_it == group.subgroups.end()) { // Subgroup does not exist. return false; } for (MoqtObjectListener* listener : listeners_) { - listener->OnSubgroupAbandoned(sequence, error_code); + listener->OnSubgroupAbandoned(sequence.group, subgroup_id, error_code); } return true; } -// TODO(martinduke): Unless Track Forwarding preference goes away, support it. -bool MoqtLiveRelayQueue::AddRawObject(Location sequence, - MoqtObjectStatus status, - MoqtPriority priority, - absl::string_view payload, bool fin) { +bool MoqtLiveRelayQueue::AddObject(const PublishedObjectMetadata& metadata, + absl::string_view payload, bool fin) { + const Location& sequence = metadata.location; bool last_object_in_stream = fin; if (queue_.size() == kMaxQueuedGroups) { if (queue_.begin()->first > sequence.group) { @@ -110,7 +111,7 @@ << "track"; return false; } - switch (status) { + switch (metadata.status) { case MoqtObjectStatus::kEndOfTrackAndGroup: if (sequence < next_sequence_) { QUICHE_DLOG(INFO) << "EndOfTrackAndGroup is too early."; @@ -145,16 +146,17 @@ << "group"; return false; } - if ((status == MoqtObjectStatus::kEndOfGroup || - status == MoqtObjectStatus::kEndOfTrackAndGroup) && + if ((metadata.status == MoqtObjectStatus::kEndOfGroup || + metadata.status == MoqtObjectStatus::kEndOfTrackAndGroup) && sequence.object < group.next_object) { QUICHE_DLOG(INFO) << "Skipping EndOfGroup because it is not the last " << "object in the group."; return false; } } + // TODO: use `metadata.publisher_priority` instead. auto subgroup_it = group.subgroups.try_emplace( - SubgroupPriority{priority, sequence.subgroup}); + SubgroupPriority{publisher_priority_, metadata.subgroup.value_or(0)}); auto& subgroup = subgroup_it.first->second; if (!subgroup.empty()) { // Check if the new object is valid CachedObject& last_object = subgroup.rbegin()->second; @@ -165,10 +167,11 @@ } // If last_object has stream-ending status, it should have been caught by // the fin_after_this check above. - QUICHE_DCHECK(last_object.status != MoqtObjectStatus::kEndOfTrackAndGroup && - last_object.status != MoqtObjectStatus::kEndOfGroup && - last_object.status != MoqtObjectStatus::kEndOfTrack); - if (last_object.sequence.object >= sequence.object) { + QUICHE_DCHECK( + last_object.metadata.status != MoqtObjectStatus::kEndOfTrackAndGroup && + last_object.metadata.status != MoqtObjectStatus::kEndOfGroup && + last_object.metadata.status != MoqtObjectStatus::kEndOfTrack); + if (last_object.metadata.location.object >= sequence.object) { QUICHE_DLOG(INFO) << "Skipping object because it does not increase the " << "object ID monotonically in the subgroup."; return false; @@ -182,7 +185,7 @@ group.next_object = sequence.object + 1; } // Anticipate stream FIN with most non-normal objects. - switch (status) { + switch (metadata.status) { case MoqtObjectStatus::kEndOfTrack: case MoqtObjectStatus::kEndOfTrackAndGroup: end_of_track_ = sequence; @@ -201,26 +204,24 @@ ? nullptr : std::make_shared<quiche::QuicheMemSlice>(quiche::QuicheBuffer::Copy( quiche::SimpleBufferAllocator::Get(), payload)); - subgroup.emplace( - sequence.object, - CachedObject{sequence, status, priority, slice, clock_->ApproximateNow(), - last_object_in_stream}); + subgroup.emplace(sequence.object, + CachedObject{metadata, slice, last_object_in_stream}); for (MoqtObjectListener* listener : listeners_) { - listener->OnNewObjectAvailable(sequence); + listener->OnNewObjectAvailable(sequence, metadata.subgroup.value_or(0)); } return true; } std::optional<PublishedObject> MoqtLiveRelayQueue::GetCachedObject( - Location sequence) const { - auto group_it = queue_.find(sequence.group); + uint64_t group_id, uint64_t subgroup_id, uint64_t object_id) const { + auto group_it = queue_.find(group_id); if (group_it == queue_.end()) { // Group does not exist. return std::nullopt; } const Group& group = group_it->second; - auto subgroup_it = group.subgroups.find( - SubgroupPriority{publisher_priority_, sequence.subgroup}); + auto subgroup_it = + group.subgroups.find(SubgroupPriority{publisher_priority_, subgroup_id}); if (subgroup_it == group.subgroups.end()) { // Subgroup does not exist. return std::nullopt; @@ -230,7 +231,7 @@ return std::nullopt; // There are no objects. } // Find an object with ID of at least sequence.object. - auto object_it = subgroup.lower_bound(sequence.object); + auto object_it = subgroup.lower_bound(object_id); if (object_it == subgroup.end()) { // No object after the last one received. return std::nullopt; @@ -238,30 +239,15 @@ return CachedObjectToPublishedObject(object_it->second); } -std::vector<Location> MoqtLiveRelayQueue::GetCachedObjectsInRange( - Location start, Location end) const { - std::vector<Location> sequences; - SubscribeWindow window(start, end.group, end.object); +void MoqtLiveRelayQueue::ForAllObjects( + quiche::UnretainedCallback<void(const CachedObject&)> callback) { for (auto& group_it : queue_) { - if (group_it.first < start.group) { - continue; - } - if (group_it.first > end.group) { - return sequences; - } for (auto& subgroup_it : group_it.second.subgroups) { for (auto& object_it : subgroup_it.second) { - if (window.InWindow(object_it.second.sequence)) { - sequences.push_back(object_it.second.sequence); - } - if (group_it.first == end.group && - object_it.second.sequence.object >= end.object) { - break; - } + callback(object_it.second); } } } - return sequences; } absl::StatusOr<MoqtTrackStatusCode> MoqtLiveRelayQueue::GetTrackStatus() const {
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.h b/quiche/quic/moqt/moqt_live_relay_queue.h index 3ac41dd..60ef828 100644 --- a/quiche/quic/moqt/moqt_live_relay_queue.h +++ b/quiche/quic/moqt/moqt_live_relay_queue.h
@@ -10,7 +10,6 @@ #include <memory> #include <optional> #include <utility> -#include <vector> #include "absl/container/btree_map.h" #include "absl/container/flat_hash_set.h" @@ -24,6 +23,7 @@ #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" +#include "quiche/common/quiche_callbacks.h" #include "quiche/web_transport/web_transport.h" namespace moqt { @@ -58,30 +58,48 @@ // occur. A false return value might result in a session error on the // inbound session, but this queue is the only place that retains enough state // to check. - bool AddObject(Location sequence, MoqtObjectStatus status, bool fin = false) { - return AddRawObject(sequence, status, publisher_priority_, "", fin); - } - bool AddObject(Location sequence, absl::string_view object, + bool AddObject(const PublishedObjectMetadata& metadata, + absl::string_view payload, bool fin); + + // Convenience methods primarily for use in tests. Prefer the + // `PublishedObjectMetadata` version in real forwarding code to ensure all + // metadata is copied correctly. + bool AddObject(Location location, uint64_t subgroup, MoqtObjectStatus status, bool fin = false) { - return AddRawObject(sequence, MoqtObjectStatus::kNormal, - publisher_priority_, object, fin); + PublishedObjectMetadata metadata; + metadata.location = location; + metadata.subgroup = subgroup; + metadata.status = status; + metadata.publisher_priority = 0; + return AddObject(metadata, "", fin); } + bool AddObject(Location location, uint64_t subgroup, absl::string_view object, + bool fin = false) { + PublishedObjectMetadata metadata; + metadata.location = location; + metadata.subgroup = subgroup; + metadata.status = MoqtObjectStatus::kNormal; + metadata.publisher_priority = 0; + return AddObject(metadata, object, fin); + } + // Record a received FIN that did not come with the last object. // If the forwarding preference is kDatagram or kTrack, |sequence| is ignored. // Otherwise, |sequence| is used to determine which stream is being FINed. If // the object ID does not match the last object ID in the stream, no action // is taken. - bool AddFin(Location sequence); + bool AddFin(Location sequence, uint64_t subgroup_id); // Record a received RESET_STREAM. |sequence| encodes the group and subgroup // of the stream that is being reset. Returns false on datagram tracks, or if // the stream does not exist. - bool OnStreamReset(Location sequence, + bool OnStreamReset(Location sequence, uint64_t subgroup_id, webtransport::StreamErrorCode error_code); // MoqtTrackPublisher implementation. const FullTrackName& GetTrackName() const override { return track_; } std::optional<PublishedObject> GetCachedObject( - Location sequence) const override; + uint64_t group_id, uint64_t subgroup_id, + uint64_t min_object) const override; void AddObjectListener(MoqtObjectListener* listener) override { listeners_.insert(listener); listener->OnSubscribeAccepted(); @@ -119,8 +137,8 @@ } } - std::vector<Location> GetCachedObjectsInRange(Location start, - Location end) const; + void ForAllObjects( + quiche::UnretainedCallback<void(const CachedObject&)> callback); private: // The number of recent groups to keep around for newly joined subscribers. @@ -135,9 +153,6 @@ absl::btree_map<SubgroupPriority, Subgroup> subgroups; }; - bool AddRawObject(Location sequence, MoqtObjectStatus status, - MoqtPriority priority, absl::string_view payload, bool fin); - const quic::QuicClock* clock_; FullTrackName track_; MoqtForwardingPreference forwarding_preference_;
diff --git a/quiche/quic/moqt/moqt_live_relay_queue_test.cc b/quiche/quic/moqt/moqt_live_relay_queue_test.cc index 6115acb..f40fe10 100644 --- a/quiche/quic/moqt/moqt_live_relay_queue_test.cc +++ b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
@@ -6,9 +6,9 @@ #include <cstdint> #include <optional> -#include <vector> #include "absl/strings/string_view.h" +#include "quiche/quic/moqt/moqt_cached_object.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_subscribe_windows.h" @@ -29,54 +29,58 @@ AddObjectListener(this); } - void OnNewObjectAvailable(Location sequence) { - std::optional<PublishedObject> object = GetCachedObject(sequence); + void OnNewObjectAvailable(Location sequence, uint64_t subgroup_id) { + std::optional<PublishedObject> object = + GetCachedObject(sequence.group, subgroup_id, sequence.object); QUICHE_CHECK(object.has_value()); if (!object.has_value()) { return; } - switch (object->status) { + switch (object->metadata.status) { case MoqtObjectStatus::kNormal: - PublishObject(object->sequence.group, object->sequence.object, + PublishObject(object->metadata.location.group, + object->metadata.location.object, object->payload.AsStringView()); break; case MoqtObjectStatus::kObjectDoesNotExist: - SkipObject(object->sequence.group, object->sequence.object); + SkipObject(object->metadata.location.group, + object->metadata.location.object); break; case MoqtObjectStatus::kGroupDoesNotExist: - SkipGroup(object->sequence.group); + SkipGroup(object->metadata.location.group); break; case MoqtObjectStatus::kEndOfGroup: - CloseStreamForGroup(object->sequence.group); + CloseStreamForGroup(object->metadata.location.group); break; case MoqtObjectStatus::kEndOfTrack: CloseTrack(); break; case MoqtObjectStatus::kEndOfTrackAndGroup: - CloseStreamForGroup(object->sequence.group); + CloseStreamForGroup(object->metadata.location.group); CloseTrack(); break; default: EXPECT_TRUE(false); } if (object->fin_after_this) { - CloseStreamForSubgroup(object->sequence.group, object->sequence.subgroup); + CloseStreamForSubgroup(object->metadata.location.group, + object->metadata.subgroup.value_or(0)); } } void GetObjectsFromPast(const SubscribeWindow& window) { - std::vector<Location> objects = - GetCachedObjectsInRange(Location(0, 0), GetLargestLocation()); - for (Location object : objects) { - if (window.InWindow(object)) { - OnNewObjectAvailable(object); + ForAllObjects([&](const CachedObject& object) { + if (window.InWindow(object.metadata.location)) { + OnNewObjectAvailable(object.metadata.location, + object.metadata.subgroup.value_or(0)); } - } + }); } - MOCK_METHOD(void, OnNewFinAvailable, (Location sequence)); + MOCK_METHOD(void, OnNewFinAvailable, (Location sequence, uint64_t subgroup)); MOCK_METHOD(void, OnSubgroupAbandoned, - (Location sequence, webtransport::StreamErrorCode error_code)); + (uint64_t group, uint64_t subgroup, + webtransport::StreamErrorCode error_code)); MOCK_METHOD(void, OnGroupAbandoned, (uint64_t group_id)); MOCK_METHOD(void, CloseStreamForGroup, (uint64_t group_id), ()); MOCK_METHOD(void, CloseStreamForSubgroup, @@ -106,10 +110,11 @@ EXPECT_CALL(queue, PublishObject(0, 2, "c")); EXPECT_CALL(queue, CloseStreamForGroup(0)); } - EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a")); - EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b")); - EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c")); - EXPECT_TRUE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a")); + EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b")); + EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c")); + EXPECT_TRUE( + queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfGroup)); } TEST(MoqtLiveRelayQueue, SingleGroupPastSubscribeFromZero) { @@ -124,9 +129,9 @@ EXPECT_CALL(queue, PublishObject(0, 1, "b")); EXPECT_CALL(queue, PublishObject(0, 2, "c")); } - EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a")); - EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b")); - EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c")); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a")); + EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b")); + EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c")); queue.GetObjectsFromPast(SubscribeWindow()); } @@ -141,9 +146,9 @@ EXPECT_CALL(queue, PublishObject(0, 1, "b")); EXPECT_CALL(queue, PublishObject(0, 2, "c")); } - EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a")); - EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b")); - EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c")); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a")); + EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b")); + EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c")); queue.GetObjectsFromPast(SubscribeWindow(Location(0, 1))); } @@ -159,13 +164,14 @@ EXPECT_CALL(queue, PublishObject(1, 1, "e")); EXPECT_CALL(queue, PublishObject(1, 2, "f")); } - EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a")); - EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b")); - EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c")); - EXPECT_TRUE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfGroup)); - EXPECT_TRUE(queue.AddObject(Location{1, 0}, "d")); - EXPECT_TRUE(queue.AddObject(Location{1, 1}, "e")); - EXPECT_TRUE(queue.AddObject(Location{1, 2}, "f")); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a")); + EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b")); + EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c")); + EXPECT_TRUE( + queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(Location{1, 0}, 0, "d")); + EXPECT_TRUE(queue.AddObject(Location{1, 1}, 0, "e")); + EXPECT_TRUE(queue.AddObject(Location{1, 2}, 0, "f")); } TEST(MoqtLiveRelayQueue, TwoGroupsPastSubscribe) { @@ -187,13 +193,14 @@ EXPECT_CALL(queue, PublishObject(1, 1, "e")); EXPECT_CALL(queue, PublishObject(1, 2, "f")); } - EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a")); - EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b")); - EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c")); - EXPECT_TRUE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfGroup)); - EXPECT_TRUE(queue.AddObject(Location{1, 0}, "d")); - EXPECT_TRUE(queue.AddObject(Location{1, 1}, "e")); - EXPECT_TRUE(queue.AddObject(Location{1, 2}, "f")); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a")); + EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b")); + EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c")); + EXPECT_TRUE( + queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(Location{1, 0}, 0, "d")); + EXPECT_TRUE(queue.AddObject(Location{1, 1}, 0, "e")); + EXPECT_TRUE(queue.AddObject(Location{1, 2}, 0, "f")); queue.GetObjectsFromPast(SubscribeWindow(Location(0, 1))); } @@ -219,20 +226,24 @@ EXPECT_CALL(queue, PublishObject(4, 0, "i")); EXPECT_CALL(queue, PublishObject(4, 1, "j")); } - EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a")); - EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b")); - EXPECT_TRUE(queue.AddObject(Location{0, 2}, MoqtObjectStatus::kEndOfGroup)); - EXPECT_TRUE(queue.AddObject(Location{1, 0}, "c")); - EXPECT_TRUE(queue.AddObject(Location{1, 1}, "d")); - EXPECT_TRUE(queue.AddObject(Location{1, 2}, MoqtObjectStatus::kEndOfGroup)); - EXPECT_TRUE(queue.AddObject(Location{2, 0}, "e")); - EXPECT_TRUE(queue.AddObject(Location{2, 1}, "f")); - EXPECT_TRUE(queue.AddObject(Location{2, 2}, MoqtObjectStatus::kEndOfGroup)); - EXPECT_TRUE(queue.AddObject(Location{3, 0}, "g")); - EXPECT_TRUE(queue.AddObject(Location{3, 1}, "h")); - EXPECT_TRUE(queue.AddObject(Location{3, 2}, MoqtObjectStatus::kEndOfGroup)); - EXPECT_TRUE(queue.AddObject(Location{4, 0}, "i")); - EXPECT_TRUE(queue.AddObject(Location{4, 1}, "j")); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a")); + EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b")); + EXPECT_TRUE( + queue.AddObject(Location{0, 2}, 0, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(Location{1, 0}, 0, "c")); + EXPECT_TRUE(queue.AddObject(Location{1, 1}, 0, "d")); + EXPECT_TRUE( + queue.AddObject(Location{1, 2}, 0, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(Location{2, 0}, 0, "e")); + EXPECT_TRUE(queue.AddObject(Location{2, 1}, 0, "f")); + EXPECT_TRUE( + queue.AddObject(Location{2, 2}, 0, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(Location{3, 0}, 0, "g")); + EXPECT_TRUE(queue.AddObject(Location{3, 1}, 0, "h")); + EXPECT_TRUE( + queue.AddObject(Location{3, 2}, 0, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(Location{4, 0}, 0, "i")); + EXPECT_TRUE(queue.AddObject(Location{4, 1}, 0, "j")); } TEST(MoqtLiveRelayQueue, FiveGroupsPastSubscribe) { @@ -266,20 +277,24 @@ EXPECT_CALL(queue, PublishObject(4, 0, "i")); EXPECT_CALL(queue, PublishObject(4, 1, "j")); } - EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a")); - EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b")); - EXPECT_TRUE(queue.AddObject(Location{0, 2}, MoqtObjectStatus::kEndOfGroup)); - EXPECT_TRUE(queue.AddObject(Location{1, 0}, "c")); - EXPECT_TRUE(queue.AddObject(Location{1, 1}, "d")); - EXPECT_TRUE(queue.AddObject(Location{1, 2}, MoqtObjectStatus::kEndOfGroup)); - EXPECT_TRUE(queue.AddObject(Location{2, 0}, "e")); - EXPECT_TRUE(queue.AddObject(Location{2, 1}, "f")); - EXPECT_TRUE(queue.AddObject(Location{2, 2}, MoqtObjectStatus::kEndOfGroup)); - EXPECT_TRUE(queue.AddObject(Location{3, 0}, "g")); - EXPECT_TRUE(queue.AddObject(Location{3, 1}, "h")); - EXPECT_TRUE(queue.AddObject(Location{3, 2}, MoqtObjectStatus::kEndOfGroup)); - EXPECT_TRUE(queue.AddObject(Location{4, 0}, "i")); - EXPECT_TRUE(queue.AddObject(Location{4, 1}, "j")); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a")); + EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b")); + EXPECT_TRUE( + queue.AddObject(Location{0, 2}, 0, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(Location{1, 0}, 0, "c")); + EXPECT_TRUE(queue.AddObject(Location{1, 1}, 0, "d")); + EXPECT_TRUE( + queue.AddObject(Location{1, 2}, 0, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(Location{2, 0}, 0, "e")); + EXPECT_TRUE(queue.AddObject(Location{2, 1}, 0, "f")); + EXPECT_TRUE( + queue.AddObject(Location{2, 2}, 0, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(Location{3, 0}, 0, "g")); + EXPECT_TRUE(queue.AddObject(Location{3, 1}, 0, "h")); + EXPECT_TRUE( + queue.AddObject(Location{3, 2}, 0, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(Location{4, 0}, 0, "i")); + EXPECT_TRUE(queue.AddObject(Location{4, 1}, 0, "j")); queue.GetObjectsFromPast(SubscribeWindow()); } @@ -303,21 +318,25 @@ EXPECT_CALL(queue, PublishObject(4, 0, "i")); EXPECT_CALL(queue, PublishObject(4, 1, "j")); } - EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a")); - EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b")); - EXPECT_TRUE(queue.AddObject(Location{1, 0}, "c")); - EXPECT_TRUE(queue.AddObject(Location{1, 1}, "d")); - EXPECT_TRUE(queue.AddObject(Location{1, 2}, MoqtObjectStatus::kEndOfGroup)); - EXPECT_TRUE(queue.AddObject(Location{2, 0}, "e")); - EXPECT_TRUE(queue.AddObject(Location{2, 1}, "f")); - EXPECT_TRUE(queue.AddObject(Location{2, 2}, MoqtObjectStatus::kEndOfGroup)); - EXPECT_TRUE(queue.AddObject(Location{3, 0}, "g")); - EXPECT_TRUE(queue.AddObject(Location{3, 1}, "h")); - EXPECT_TRUE(queue.AddObject(Location{3, 2}, MoqtObjectStatus::kEndOfGroup)); - EXPECT_TRUE(queue.AddObject(Location{4, 0}, "i")); - EXPECT_TRUE(queue.AddObject(Location{4, 1}, "j")); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a")); + EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b")); + EXPECT_TRUE(queue.AddObject(Location{1, 0}, 0, "c")); + EXPECT_TRUE(queue.AddObject(Location{1, 1}, 0, "d")); + EXPECT_TRUE( + queue.AddObject(Location{1, 2}, 0, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(Location{2, 0}, 0, "e")); + EXPECT_TRUE(queue.AddObject(Location{2, 1}, 0, "f")); + EXPECT_TRUE( + queue.AddObject(Location{2, 2}, 0, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(Location{3, 0}, 0, "g")); + EXPECT_TRUE(queue.AddObject(Location{3, 1}, 0, "h")); + EXPECT_TRUE( + queue.AddObject(Location{3, 2}, 0, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE(queue.AddObject(Location{4, 0}, 0, "i")); + EXPECT_TRUE(queue.AddObject(Location{4, 1}, 0, "j")); // This object will be ignored, but this is not an error. - EXPECT_TRUE(queue.AddObject(Location{0, 2}, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE( + queue.AddObject(Location{0, 2}, 0, MoqtObjectStatus::kEndOfGroup)); } TEST(MoqtLiveRelayQueue, EndOfTrackAndGroup) { @@ -328,12 +347,12 @@ EXPECT_CALL(queue, PublishObject(0, 2, "c")); EXPECT_CALL(queue, CloseTrack()); } - EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a")); - EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c")); - EXPECT_FALSE( - queue.AddObject(Location{0, 1}, MoqtObjectStatus::kEndOfTrackAndGroup)); - EXPECT_TRUE( - queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfTrackAndGroup)); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a")); + EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c")); + EXPECT_FALSE(queue.AddObject(Location{0, 1}, 0, + MoqtObjectStatus::kEndOfTrackAndGroup)); + EXPECT_TRUE(queue.AddObject(Location{0, 3}, 0, + MoqtObjectStatus::kEndOfTrackAndGroup)); } TEST(MoqtLiveRelayQueue, EndOfTrack) { @@ -344,10 +363,12 @@ EXPECT_CALL(queue, PublishObject(0, 2, "c")); EXPECT_CALL(queue, CloseTrack()); } - EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a")); - EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c")); - EXPECT_FALSE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfTrack)); - EXPECT_TRUE(queue.AddObject(Location{1, 0}, MoqtObjectStatus::kEndOfTrack)); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a")); + EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c")); + EXPECT_FALSE( + queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfTrack)); + EXPECT_TRUE( + queue.AddObject(Location{1, 0}, 0, MoqtObjectStatus::kEndOfTrack)); } TEST(MoqtLiveRelayQueue, EndOfGroup) { @@ -358,11 +379,13 @@ EXPECT_CALL(queue, PublishObject(0, 2, "c")); EXPECT_CALL(queue, CloseStreamForGroup(0)); } - EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a")); - EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c")); - EXPECT_FALSE(queue.AddObject(Location{0, 1}, MoqtObjectStatus::kEndOfGroup)); - EXPECT_TRUE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfGroup)); - EXPECT_FALSE(queue.AddObject(Location{0, 4}, "e")); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a")); + EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c")); + EXPECT_FALSE( + queue.AddObject(Location{0, 1}, 0, MoqtObjectStatus::kEndOfGroup)); + EXPECT_TRUE( + queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfGroup)); + EXPECT_FALSE(queue.AddObject(Location{0, 4}, 0, "e")); } TEST(MoqtLiveRelayQueue, GroupDoesNotExist) { @@ -372,9 +395,9 @@ EXPECT_CALL(queue, SkipGroup(0)); } EXPECT_FALSE( - queue.AddObject(Location{0, 1}, MoqtObjectStatus::kGroupDoesNotExist)); + queue.AddObject(Location{0, 1}, 0, MoqtObjectStatus::kGroupDoesNotExist)); EXPECT_TRUE( - queue.AddObject(Location{0, 0}, MoqtObjectStatus::kGroupDoesNotExist)); + queue.AddObject(Location{0, 0}, 0, MoqtObjectStatus::kGroupDoesNotExist)); } TEST(MoqtLiveRelayQueue, OverwriteObject) { @@ -385,11 +408,12 @@ EXPECT_CALL(queue, PublishObject(0, 1, "b")); EXPECT_CALL(queue, PublishObject(0, 2, "c")); } - EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a")); - EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b")); - EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c")); - EXPECT_TRUE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfGroup)); - EXPECT_FALSE(queue.AddObject(Location{0, 1}, "invalid")); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a")); + EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b")); + EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c")); + EXPECT_TRUE( + queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfGroup)); + EXPECT_FALSE(queue.AddObject(Location{0, 1}, 0, "invalid")); } TEST(MoqtLiveRelayQueue, DifferentSubgroups) { @@ -400,11 +424,11 @@ EXPECT_CALL(queue, PublishObject(0, 1, "b")); EXPECT_CALL(queue, PublishObject(0, 3, "d")); EXPECT_CALL(queue, PublishObject(0, 2, "c")); - EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 0, 3})); + EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 3}, 0)); EXPECT_CALL(queue, PublishObject(0, 5, "e")); EXPECT_CALL(queue, PublishObject(0, 7, "f")); - EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 1, 5})); - EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 2, 7})); + EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 5}, 1)); + EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 7}, 2)); // Serve them back in strict subgroup order. EXPECT_CALL(queue, PublishObject(0, 0, "a")); @@ -417,15 +441,15 @@ EXPECT_CALL(queue, PublishObject(0, 7, "f")); EXPECT_CALL(queue, CloseStreamForSubgroup(0, 2)); } - EXPECT_TRUE(queue.AddObject(Location{0, 0, 0}, "a")); - EXPECT_TRUE(queue.AddObject(Location{0, 1, 1}, "b")); - EXPECT_TRUE(queue.AddObject(Location{0, 0, 3}, "d")); - EXPECT_TRUE(queue.AddObject(Location{0, 2, 2}, "c")); - EXPECT_TRUE(queue.AddFin(Location{0, 0, 3})); - EXPECT_TRUE(queue.AddObject(Location{0, 1, 5}, "e")); - EXPECT_TRUE(queue.AddObject(Location{0, 2, 7}, "f")); - EXPECT_TRUE(queue.AddFin(Location{0, 1, 5})); - EXPECT_TRUE(queue.AddFin(Location{0, 2, 7})); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a")); + EXPECT_TRUE(queue.AddObject(Location{0, 1}, 1, "b")); + EXPECT_TRUE(queue.AddObject(Location{0, 3}, 0, "d")); + EXPECT_TRUE(queue.AddObject(Location{0, 2}, 2, "c")); + EXPECT_TRUE(queue.AddFin(Location{0, 3}, 0)); + EXPECT_TRUE(queue.AddObject(Location{0, 5}, 1, "e")); + EXPECT_TRUE(queue.AddObject(Location{0, 7}, 2, "f")); + EXPECT_TRUE(queue.AddFin(Location{0, 5}, 1)); + EXPECT_TRUE(queue.AddFin(Location{0, 7}, 2)); queue.GetObjectsFromPast(SubscribeWindow()); } @@ -434,12 +458,12 @@ { testing::InSequence seq; EXPECT_CALL(queue, PublishObject(0, 0, "a")); - EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 0, 0})); + EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 0}, 0)); EXPECT_CALL(queue, PublishObject(0, 2, "b")).Times(0); } - EXPECT_TRUE(queue.AddObject(Location{0, 0, 0}, "a")); - EXPECT_TRUE(queue.AddFin(Location{0, 0, 0})); - EXPECT_FALSE(queue.AddObject(Location{0, 0, 2}, "b")); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a")); + EXPECT_TRUE(queue.AddFin(Location{0, 0}, 0)); + EXPECT_FALSE(queue.AddObject(Location{0, 2}, 0, "b")); } TEST(MoqtLiveRelayQueue, AddObjectWithFin) { @@ -448,10 +472,10 @@ testing::InSequence seq; EXPECT_CALL(queue, PublishObject(0, 0, "a")); } - EXPECT_TRUE(queue.AddObject(Location{0, 0, 0}, "a", true)); - std::optional<PublishedObject> object = queue.GetCachedObject(Location{0, 0}); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a", true)); + std::optional<PublishedObject> object = queue.GetCachedObject(0, 0, 0); ASSERT_TRUE(object.has_value()); - EXPECT_EQ(object->status, MoqtObjectStatus::kNormal); + EXPECT_EQ(object->metadata.status, MoqtObjectStatus::kNormal); EXPECT_TRUE(object->fin_after_this); } @@ -461,12 +485,12 @@ testing::InSequence seq; EXPECT_CALL(queue, PublishObject(0, 0, "a")); } - EXPECT_TRUE(queue.AddObject(Location{0, 0, 0}, "a", false)); - EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 0})); - EXPECT_TRUE(queue.AddFin(Location{0, 0})); - std::optional<PublishedObject> object = queue.GetCachedObject(Location{0, 0}); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a", false)); + EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 0}, 0)); + EXPECT_TRUE(queue.AddFin(Location{0, 0}, 0)); + std::optional<PublishedObject> object = queue.GetCachedObject(0, 0, 0); ASSERT_TRUE(object.has_value()); - EXPECT_EQ(object->status, MoqtObjectStatus::kNormal); + EXPECT_EQ(object->metadata.status, MoqtObjectStatus::kNormal); EXPECT_TRUE(object->fin_after_this); } @@ -475,10 +499,10 @@ { testing::InSequence seq; EXPECT_CALL(queue, PublishObject(0, 0, "a")); - EXPECT_CALL(queue, OnSubgroupAbandoned(Location{0, 0}, 0x1)); + EXPECT_CALL(queue, OnSubgroupAbandoned(0, 0, 0x1)); } - EXPECT_TRUE(queue.AddObject(Location{0, 0, 0}, "a")); - EXPECT_TRUE(queue.OnStreamReset(Location{0, 0}, 0x1)); + EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a")); + EXPECT_TRUE(queue.OnStreamReset(Location{0, 0}, 0, 0x1)); } } // namespace
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index d51237a..f5ec8c2 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -12,6 +12,7 @@ #include <initializer_list> #include <optional> #include <string> +#include <tuple> #include <utility> #include <vector> @@ -366,21 +367,21 @@ std::string name_ = ""; }; -// These are absolute sequence numbers. +// Location as defined in +// https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html#location-structure struct Location { - uint64_t group; - uint64_t subgroup; - uint64_t object; - Location() : Location(0, 0) {} - // There is a lot of code from before subgroups. Assume there's one subgroup - // with ID 0 per group. - Location(uint64_t group, uint64_t object) : Location(group, 0, object) {} - Location(uint64_t group, uint64_t subgroup, uint64_t object) - : group(group), subgroup(subgroup), object(object) {} + uint64_t group = 0; + uint64_t object = 0; + + Location() = default; + Location(uint64_t group, uint64_t object) : group(group), object(object) {} + bool operator==(const Location& other) const { return group == other.group && object == other.object; } - // These are temporal ordering comparisons, so subgroup ID doesn't matter. + + // Location order as described in + // https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html#location-structure bool operator<(const Location& other) const { return group < other.group || (group == other.group && object < other.object); @@ -390,13 +391,10 @@ (group == other.group && object <= other.object)); } bool operator>(const Location& other) const { return !(*this <= other); } - Location& operator=(Location other) { - group = other.group; - subgroup = other.subgroup; - object = other.object; - return *this; - } - Location next() const { return Location{group, subgroup, object + 1}; } + bool operator>=(const Location& other) const { return !(*this < other); } + + Location next() const { return Location(group, object + 1); } + template <typename H> friend H AbslHashValue(H h, const Location& m); @@ -730,7 +728,7 @@ // and ranges. The session will populate them instead. std::optional<JoiningFetch> joining_fetch; FullTrackName full_track_name; - Location start_object; // subgroup is ignored + Location start_object; uint64_t end_group; std::optional<uint64_t> end_object; VersionSpecificParameters parameters; @@ -743,7 +741,7 @@ struct QUICHE_EXPORT MoqtFetchOk { uint64_t subscribe_id; MoqtDeliveryOrder group_order; - Location largest_id; // subgroup is ignored + Location largest_id; VersionSpecificParameters parameters; };
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc index 0428f7e..35221dd 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -65,33 +65,37 @@ Location sequence{current_group_id_, queue_.back().size()}; bool fin = forwarding_preference_ == MoqtForwardingPreference::kSubgroup && status == MoqtObjectStatus::kEndOfGroup; - queue_.back().push_back( - CachedObject{sequence, status, publisher_priority_, - std::make_shared<quiche::QuicheMemSlice>(std::move(payload)), - clock_->ApproximateNow(), fin}); + queue_.back().push_back(CachedObject{ + PublishedObjectMetadata{sequence, 0, status, publisher_priority_, + clock_->ApproximateNow()}, + std::make_shared<quiche::QuicheMemSlice>(std::move(payload)), fin}); for (MoqtObjectListener* listener : listeners_) { - listener->OnNewObjectAvailable(sequence); + listener->OnNewObjectAvailable(sequence, /*subgroup=*/0); } } std::optional<PublishedObject> MoqtOutgoingQueue::GetCachedObject( - Location sequence) const { - if (sequence.group < first_group_in_queue()) { - return PublishedObject{Location{sequence.group, sequence.object}, - MoqtObjectStatus::kGroupDoesNotExist, - publisher_priority_, quiche::QuicheMemSlice(), - clock_->ApproximateNow()}; + uint64_t group, uint64_t subgroup, uint64_t object) const { + QUICHE_DCHECK_EQ(subgroup, 0u); + if (group < first_group_in_queue()) { + return PublishedObject{ + PublishedObjectMetadata{Location(group, object), + /*subgroup=*/0, + MoqtObjectStatus::kGroupDoesNotExist, + publisher_priority_, clock_->ApproximateNow()}, + quiche::QuicheMemSlice{}}; } - if (sequence.group > current_group_id_) { + if (group > current_group_id_) { return std::nullopt; } - const std::vector<CachedObject>& group = - queue_[sequence.group - first_group_in_queue()]; - if (sequence.object >= group.size()) { + const std::vector<CachedObject>& group_objects = + queue_[group - first_group_in_queue()]; + if (object >= group_objects.size()) { return std::nullopt; } - QUICHE_DCHECK(sequence == group[sequence.object].sequence); - return CachedObjectToPublishedObject(group[sequence.object]); + QUICHE_DCHECK(Location(group, object) == + group_objects[object].metadata.location); + return CachedObjectToPublishedObject(group_objects[object]); } std::vector<Location> MoqtOutgoingQueue::GetCachedObjectsInRange( @@ -100,8 +104,8 @@ SubscribeWindow window(start, end.group, end.object); for (const Group& group : queue_) { for (const CachedObject& object : group) { - if (window.InWindow(object.sequence)) { - sequences.push_back(object.sequence); + if (window.InWindow(object.metadata.location)) { + sequences.push_back(object.metadata.location); } } } @@ -175,8 +179,8 @@ MoqtFetchTask::GetNextObjectResult result = GetNextObjectInner(object); bool missing_object = result == kSuccess && - (object.status == MoqtObjectStatus::kObjectDoesNotExist || - object.status == MoqtObjectStatus::kGroupDoesNotExist); + (object.metadata.status == MoqtObjectStatus::kObjectDoesNotExist || + object.metadata.status == MoqtObjectStatus::kGroupDoesNotExist); if (!missing_object) { return result; } @@ -192,8 +196,8 @@ return kEof; } - std::optional<PublishedObject> result = - queue_->GetCachedObject(objects_.front()); + std::optional<PublishedObject> result = queue_->GetCachedObject( + objects_.front().group, 0, objects_.front().object); if (!result.has_value()) { status_ = absl::InternalError("Previously known object became unknown."); return kError;
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h index bcb7f5b..f9a1361 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.h +++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -55,7 +55,7 @@ // MoqtTrackPublisher implementation. const FullTrackName& GetTrackName() const override { return track_; } std::optional<PublishedObject> GetCachedObject( - Location sequence) const override; + uint64_t group, uint64_t subgroup, uint64_t min_object) const override; void AddObjectListener(MoqtObjectListener* listener) override { listeners_.insert(listener); listener->OnSubscribeAccepted();
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc index a648a42..46e0d19 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -46,16 +46,18 @@ AddObjectListener(this); } - void OnNewObjectAvailable(Location sequence) override { - std::optional<PublishedObject> object = GetCachedObject(sequence); + void OnNewObjectAvailable(Location sequence, uint64_t subgroup) override { + std::optional<PublishedObject> object = + GetCachedObject(sequence.group, subgroup, sequence.object); QUICHE_CHECK(object.has_value()); - ASSERT_THAT(object->status, AnyOf(MoqtObjectStatus::kNormal, - MoqtObjectStatus::kEndOfGroup)); - if (object->status == MoqtObjectStatus::kNormal) { - PublishObject(object->sequence.group, object->sequence.object, + ASSERT_THAT(object->metadata.status, AnyOf(MoqtObjectStatus::kNormal, + MoqtObjectStatus::kEndOfGroup)); + if (object->metadata.status == MoqtObjectStatus::kNormal) { + PublishObject(object->metadata.location.group, + object->metadata.location.object, object->payload.AsStringView()); } else { - CloseStreamForGroup(object->sequence.group); + CloseStreamForGroup(object->metadata.location.group); } } @@ -64,14 +66,15 @@ GetCachedObjectsInRange(Location(0, 0), GetLargestLocation()); for (Location object : objects) { if (window.InWindow(object)) { - OnNewObjectAvailable(object); + OnNewObjectAvailable(object, 0); } } } - MOCK_METHOD(void, OnNewFinAvailable, (Location sequence)); + MOCK_METHOD(void, OnNewFinAvailable, (Location sequence, uint64_t subgroup)); MOCK_METHOD(void, OnSubgroupAbandoned, - (Location sequence, webtransport::StreamErrorCode error_code)); + (uint64_t group, uint64_t subgroup, + webtransport::StreamErrorCode error_code)); MOCK_METHOD(void, OnGroupAbandoned, (uint64_t group_id)); MOCK_METHOD(void, CloseStreamForGroup, (uint64_t group_id), ()); MOCK_METHOD(void, PublishObject, @@ -94,10 +97,10 @@ MoqtFetchTask::GetNextObjectResult result = fetch->GetNextObject(object); switch (result) { case MoqtFetchTask::kSuccess: - if (object.status == MoqtObjectStatus::kNormal) { + if (object.metadata.status == MoqtObjectStatus::kNormal) { objects.emplace_back(object.payload.AsStringView()); } else { - EXPECT_EQ(object.status, MoqtObjectStatus::kEndOfGroup); + EXPECT_EQ(object.metadata.status, MoqtObjectStatus::kEndOfGroup); } continue; case MoqtFetchTask::kPending: @@ -364,9 +367,9 @@ quic::QuicTime test_start = clock->ApproximateNow(); TestMoqtOutgoingQueue queue; queue.AddObject(MemSliceFromString("a"), true); - std::optional<PublishedObject> object = queue.GetCachedObject(Location{0, 0}); + std::optional<PublishedObject> object = queue.GetCachedObject(0, 0, 0); ASSERT_TRUE(object.has_value()); - EXPECT_GE(object->arrival_time, test_start); + EXPECT_GE(object->metadata.arrival_time, test_start); } } // namespace
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h index 260beaa..21a9f77 100644 --- a/quiche/quic/moqt/moqt_publisher.h +++ b/quiche/quic/moqt/moqt_publisher.h
@@ -9,7 +9,6 @@ #include <memory> #include <optional> #include <variant> -#include <vector> #include "absl/status/status.h" #include "absl/status/statusor.h" @@ -22,14 +21,19 @@ namespace moqt { +struct PublishedObjectMetadata { + Location location; + std::optional<uint64_t> subgroup; // nullopt for datagrams + MoqtObjectStatus status; + MoqtPriority publisher_priority; + quic::QuicTime arrival_time = quic::QuicTime::Zero(); +}; + // PublishedObject is a description of an object that is sufficient to publish // it on a given track. struct PublishedObject { - Location sequence; - MoqtObjectStatus status; - MoqtPriority publisher_priority; + PublishedObjectMetadata metadata; quiche::QuicheMemSlice payload; - quic::QuicTime arrival_time = quic::QuicTime::Zero(); bool fin_after_this = false; }; @@ -49,18 +53,20 @@ MoqtSubscribeErrorReason reason, std::optional<uint64_t> track_alias = std::nullopt) = 0; - // Notifies that an object with the given sequence number has become - // available. The object payload itself may be retrieved via GetCachedObject - // method of the associated track publisher. - virtual void OnNewObjectAvailable(Location sequence) = 0; + // Notifies that a new object is available on the track. The object payload + // itself may be retrieved via GetCachedObject method of the associated track + // publisher. + virtual void OnNewObjectAvailable(Location sequence, uint64_t subgroup) = 0; // Notifies that a pure FIN has arrived following |sequence|. Should not be // called unless all objects have already been delivered. If not delivered, // instead set the fin_after_this flag in the PublishedObject. - virtual void OnNewFinAvailable(Location sequence) = 0; + virtual void OnNewFinAvailable(Location final_object_in_subgroup, + uint64_t subgroup_id) = 0; // Notifies that the a stream is being abandoned (via RESET_STREAM) before // all objects are delivered. virtual void OnSubgroupAbandoned( - Location sequence, webtransport::StreamErrorCode error_code) = 0; + uint64_t group, uint64_t subgroup, + webtransport::StreamErrorCode error_code) = 0; // No further object will be published for the given group, usually due to a // timeout. The owner of the Listener may want to reset the relevant streams. @@ -130,7 +136,7 @@ // GetCachedObject lets the MoQT stack access the objects that are available // in the track's built-in local cache. Retrieves the first object ID >= - // sequence.object that matches (sequence.group, sequence.subgroup). + // min_object that matches (sequence.group, sequence.subgroup). // // This implementation of MoQT does not store any objects within the MoQT // stack itself, at least until the object is fully serialized and passed to @@ -145,13 +151,7 @@ // otherwise, the corresponding QUIC streams will be stuck waiting for objects // that will never arrive. virtual std::optional<PublishedObject> GetCachedObject( - Location sequence) const = 0; - - // TODO: add an API to fetch past objects that are out of cache and might - // require an upstream request to fill the relevant cache again. This is - // currently done since the specification does not clearly describe how this - // is supposed to be done, especially with respect to such things as - // backpressure. + uint64_t group, uint64_t subgroup, uint64_t min_object) const = 0; // Registers a listener with the track. The listener will be notified of all // newly arriving objects. The pointer to the listener must be valid until
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 715413b..2167463 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -230,10 +230,14 @@ SubscribeRemoteTrack::Visitor* visitor = track->visitor(); if (visitor != nullptr) { // TODO(martinduke): Handle extension headers. - visitor->OnObjectFragment(track->full_track_name(), - Location{message.group_id, 0, message.object_id}, - message.publisher_priority, message.object_status, - *payload, true); + PublishedObjectMetadata metadata; + metadata.location = Location(message.group_id, message.object_id); + metadata.subgroup = std::nullopt; + metadata.status = message.object_status; + metadata.publisher_priority = message.publisher_priority; + metadata.arrival_time = callbacks_.clock->Now(); + visitor->OnObjectFragment(track->full_track_name(), metadata, *payload, + true); } } @@ -592,7 +596,7 @@ switch (result) { case MoqtFetchTask::GetNextObjectResult::kSuccess: // Skip ObjectDoesNotExist in FETCH. - if (object.status == MoqtObjectStatus::kObjectDoesNotExist) { + if (object.metadata.status == MoqtObjectStatus::kObjectDoesNotExist) { QUIC_BUG(quic_bug_got_doesnotexist_in_fetch) << "Got ObjectDoesNotExist in FETCH"; continue; @@ -734,7 +738,7 @@ } webtransport::Stream* MoqtSession::OpenOrQueueDataStream( - uint64_t subscription_id, Location first_object) { + uint64_t subscription_id, const NewStreamParameters& parameters) { auto it = published_subscriptions_.find(subscription_id); if (it == published_subscriptions_.end()) { // It is possible that the subscription has been discarded while the stream @@ -743,17 +747,18 @@ } PublishedSubscription& subscription = *it->second; if (!session_->CanOpenNextOutgoingUnidirectionalStream()) { - subscription.AddQueuedOutgoingDataStream(first_object); + subscription.AddQueuedOutgoingDataStream(parameters); // The subscription will notify the session about how to update the // session's queue. // TODO: limit the number of streams in the queue. return nullptr; } - return OpenDataStream(subscription, first_object); + return OpenDataStream(subscription, parameters); } webtransport::Stream* MoqtSession::OpenDataStream( - PublishedSubscription& subscription, Location first_object) { + PublishedSubscription& subscription, + const NewStreamParameters& parameters) { webtransport::Stream* new_stream = session_->OpenOutgoingUnidirectionalStream(); if (new_stream == nullptr) { @@ -762,8 +767,8 @@ return nullptr; } new_stream->SetVisitor(std::make_unique<OutgoingDataStream>( - this, new_stream, subscription, first_object)); - subscription.OnDataStreamCreated(new_stream->GetStreamId(), first_object); + this, new_stream, subscription, parameters)); + subscription.OnDataStreamCreated(new_stream->GetStreamId(), parameters.index); return new_stream; } @@ -833,10 +838,11 @@ } // Pop the item from the subscription's queue, which might update // subscribes_with_queued_outgoing_data_streams_. - Location next_queued_stream = + NewStreamParameters next_queued_stream = subscription->second->NextQueuedOutgoingDataStream(); // Check if Group is too old. - if (next_queued_stream.group < subscription->second->first_active_group()) { + if (next_queued_stream.index.group < + subscription->second->first_active_group()) { // The stream is too old to be sent. continue; } @@ -1381,8 +1387,7 @@ start_object = Location(0, 0); } else { start_object = Location( - fetch_end.group - message.joining_fetch->preceding_group_offset, 0, - 0); + fetch_end.group - message.joining_fetch->preceding_group_offset, 0); } end_group = fetch_end.group; end_object = fetch_end.object - 1; @@ -1611,12 +1616,14 @@ subscribe->OnObject(/*is_datagram=*/false); if (subscribe->visitor() != nullptr) { // TODO(martinduke): Send extension headers. - subscribe->visitor()->OnObjectFragment( - track->full_track_name(), - Location{message.group_id, message.subgroup_id.value_or(0), - message.object_id}, - message.publisher_priority, message.object_status, payload, - end_of_message); + PublishedObjectMetadata metadata; + metadata.location = Location(message.group_id, message.object_id); + metadata.subgroup = message.subgroup_id; + metadata.status = message.object_status; + metadata.publisher_priority = message.publisher_priority; + metadata.arrival_time = session_->callbacks_.clock->Now(); + subscribe->visitor()->OnObjectFragment(track->full_track_name(), metadata, + payload, end_of_message); } } else { // FETCH track->OnObjectOrOk(); @@ -1792,8 +1799,7 @@ if (!lazily_initialized_stream_map_.has_value()) { QUICHE_DCHECK( DoesTrackStatusImplyHavingData(*track_publisher_->GetTrackStatus())); - lazily_initialized_stream_map_.emplace( - track_publisher_->GetForwardingPreference()); + lazily_initialized_stream_map_.emplace(); } return *lazily_initialized_stream_map_; } @@ -1878,12 +1884,12 @@ } void MoqtSession::PublishedSubscription::OnNewObjectAvailable( - Location sequence) { + Location sequence, uint64_t subgroup) { if (!InWindow(sequence)) { return; } - if (reset_subgroups_.contains( - Location{sequence.group, sequence.subgroup, 0})) { + DataStreamIndex index(sequence.group, subgroup); + if (reset_subgroups_.contains(index)) { // This subgroup has already been reset, ignore. return; } @@ -1917,12 +1923,14 @@ } std::optional<webtransport::StreamId> stream_id = - stream_map().GetStreamForSequence(sequence); + stream_map().GetStreamFor(index); webtransport::Stream* raw_stream = nullptr; if (stream_id.has_value()) { raw_stream = session_->session_->GetStreamById(*stream_id); } else { - raw_stream = session_->OpenOrQueueDataStream(request_id_, sequence); + raw_stream = session_->OpenOrQueueDataStream( + request_id_, + NewStreamParameters(sequence.group, subgroup, sequence.object)); } if (raw_stream == nullptr) { return; @@ -1938,18 +1946,20 @@ "Publisher is gone"); } -void MoqtSession::PublishedSubscription::OnNewFinAvailable(Location sequence) { - if (!InWindow(sequence)) { +// TODO(martinduke): Revise to check if the last object has been delivered. +void MoqtSession::PublishedSubscription::OnNewFinAvailable(Location location, + uint64_t subgroup) { + if (!GroupInWindow(location.group)) { return; } - if (reset_subgroups_.contains( - Location{sequence.group, sequence.subgroup, 0})) { + DataStreamIndex index(location.group, subgroup); + if (reset_subgroups_.contains(index)) { // This subgroup has already been reset, ignore. return; } - QUICHE_DCHECK_GE(sequence.group, first_active_group_); + QUICHE_DCHECK_GE(location.group, first_active_group_); std::optional<webtransport::StreamId> stream_id = - stream_map().GetStreamForSequence(sequence); + stream_map().GetStreamFor(index); if (!stream_id.has_value()) { return; } @@ -1960,22 +1970,23 @@ } OutgoingDataStream* stream = static_cast<OutgoingDataStream*>(raw_stream->visitor()); - stream->Fin(sequence); + stream->Fin(location); } void MoqtSession::PublishedSubscription::OnSubgroupAbandoned( - Location sequence, webtransport::StreamErrorCode error_code) { - if (!InWindow(sequence)) { + uint64_t group, uint64_t subgroup, + webtransport::StreamErrorCode error_code) { + if (!GroupInWindow(group)) { return; } - if (reset_subgroups_.contains( - Location{sequence.group, sequence.subgroup, 0})) { + DataStreamIndex index(group, subgroup); + if (reset_subgroups_.contains(index)) { // This subgroup has already been reset, ignore. return; } - QUICHE_DCHECK_GE(sequence.group, first_active_group_); + QUICHE_DCHECK_GE(group, first_active_group_); std::optional<webtransport::StreamId> stream_id = - stream_map().GetStreamForSequence(sequence); + stream_map().GetStreamFor(index); if (!stream_id.has_value()) { return; } @@ -2004,8 +2015,8 @@ raw_stream->ResetWithUserCode(kResetCodeTimedOut); } first_active_group_ = std::max(first_active_group_, group_id + 1); - absl::erase_if(reset_subgroups_, [&](const Location& sequence) { - return sequence.group < first_active_group_; + absl::erase_if(reset_subgroups_, [&](const DataStreamIndex& index) { + return index.group < first_active_group_; }); } @@ -2018,7 +2029,7 @@ } webtransport::SendOrder MoqtSession::PublishedSubscription::GetSendOrder( - Location sequence) const { + Location sequence, uint64_t subgroup) const { MoqtForwardingPreference forwarding_preference = track_publisher_->GetForwardingPreference(); @@ -2031,21 +2042,23 @@ delivery_order); } return SendOrderForStream(subscriber_priority_, publisher_priority, - sequence.group, sequence.subgroup, delivery_order); + sequence.group, subgroup, delivery_order); } // Returns the highest send order in the subscription. void MoqtSession::PublishedSubscription::AddQueuedOutgoingDataStream( - Location first_object) { + const NewStreamParameters& parameters) { std::optional<webtransport::SendOrder> start_send_order = queued_outgoing_data_streams_.empty() ? std::optional<webtransport::SendOrder>() : queued_outgoing_data_streams_.rbegin()->first; - webtransport::SendOrder send_order = GetSendOrder(first_object); + webtransport::SendOrder send_order = + GetSendOrder(Location(parameters.index.group, parameters.first_object), + parameters.index.subgroup); // Zero out the subscriber priority bits, since these will be added when // updating the session. queued_outgoing_data_streams_.emplace( - UpdateSendOrderForSubscriberPriority(send_order, 0), first_object); + UpdateSendOrderForSubscriberPriority(send_order, 0), parameters); if (!start_send_order.has_value()) { session_->UpdateQueuedSendOrder(request_id_, std::nullopt, send_order); } else if (*start_send_order < send_order) { @@ -2054,14 +2067,18 @@ } } -Location MoqtSession::PublishedSubscription::NextQueuedOutgoingDataStream() { +MoqtSession::NewStreamParameters +MoqtSession::PublishedSubscription::NextQueuedOutgoingDataStream() { QUICHE_DCHECK(!queued_outgoing_data_streams_.empty()); if (queued_outgoing_data_streams_.empty()) { - return Location(); + QUICHE_BUG(NextQueuedOutgoingDataStream_no_stream) + << "NextQueuedOutgoingDataStream called when there are no streams " + "pending."; + return NewStreamParameters(0, 0, 0); } auto it = queued_outgoing_data_streams_.rbegin(); webtransport::SendOrder old_send_order = FinalizeSendOrder(it->first); - Location first_object = it->second; + NewStreamParameters first_stream = it->second; // converting a reverse iterator to an iterator involves incrementing it and // then taking base(). queued_outgoing_data_streams_.erase((++it).base()); @@ -2075,16 +2092,16 @@ new_send_order); } } - return first_object; + return first_stream; } void MoqtSession::PublishedSubscription::OnDataStreamCreated( - webtransport::StreamId id, Location start_sequence) { + webtransport::StreamId id, DataStreamIndex start_sequence) { ++streams_opened_; stream_map().AddStream(start_sequence, id); } void MoqtSession::PublishedSubscription::OnDataStreamDestroyed( - webtransport::StreamId id, Location end_sequence) { + webtransport::StreamId id, DataStreamIndex end_sequence) { stream_map().RemoveStream(end_sequence, id); } @@ -2099,11 +2116,12 @@ MoqtSession::OutgoingDataStream::OutgoingDataStream( MoqtSession* session, webtransport::Stream* stream, - PublishedSubscription& subscription, Location first_object) + PublishedSubscription& subscription, const NewStreamParameters& parameters) : session_(session), stream_(stream), subscription_id_(subscription.request_id()), - next_object_(first_object), + index_(parameters.index), + next_object_(parameters.first_object), session_liveness_(session->liveness_token_) { UpdateSendOrder(subscription); } @@ -2124,7 +2142,7 @@ } auto it = session_->published_subscriptions_.find(subscription_id_); if (it != session_->published_subscriptions_.end()) { - it->second->OnDataStreamDestroyed(stream_->GetStreamId(), next_object_); + it->second->OnDataStreamDestroyed(stream_->GetStreamId(), index_); } } @@ -2140,7 +2158,7 @@ auto it = stream_->session_->published_subscriptions_.find( stream_->subscription_id_); if (it != stream_->session_->published_subscriptions_.end()) { - it->second->OnStreamTimeout(stream_->next_object_); + it->second->OnStreamTimeout(stream_->index()); } stream_->stream_->ResetWithUserCode(kResetCodeTimedOut); } @@ -2175,11 +2193,17 @@ PublishedSubscription& subscription) { while (stream_->CanWrite()) { std::optional<PublishedObject> object = - subscription.publisher().GetCachedObject(next_object_); + subscription.publisher().GetCachedObject(index_.group, index_.subgroup, + next_object_); if (!object.has_value()) { break; } - if (!subscription.InWindow(next_object_)) { + + QUICHE_DCHECK_EQ(object->metadata.location.group, index_.group); + QUICHE_DCHECK(object->metadata.subgroup == index_.subgroup); + QUICHE_DCHECK(subscription.publisher().GetForwardingPreference() == + MoqtForwardingPreference::kSubgroup); + if (!subscription.InWindow(object->metadata.location)) { // It is possible that the next object became irrelevant due to a // SUBSCRIBE_UPDATE. Close the stream if so. bool success = stream_->SendFin(); @@ -2187,42 +2211,39 @@ << "Writing FIN failed despite CanWrite() being true."; return; } + quic::QuicTimeDelta delivery_timeout = subscription.delivery_timeout(); if (!session_->alternate_delivery_timeout_ && - session_->callbacks_.clock->ApproximateNow() - object->arrival_time > + session_->callbacks_.clock->ApproximateNow() - + object->metadata.arrival_time > delivery_timeout) { - subscription.OnStreamTimeout(next_object_); + subscription.OnStreamTimeout(index_); stream_->ResetWithUserCode(kResetCodeTimedOut); return; } - QUICHE_DCHECK(next_object_ <= object->sequence); - MoqtTrackPublisher& publisher = subscription.publisher(); - QUICHE_DCHECK(DoesTrackStatusImplyHavingData(*publisher.GetTrackStatus())); - MoqtForwardingPreference forwarding_preference = - publisher.GetForwardingPreference(); - UpdateSendOrder(subscription); - if (forwarding_preference == MoqtForwardingPreference::kDatagram) { - QUICHE_BUG(quic_bug_SendObjects_for_Datagram) - << "Datagram Track requesting SendObjects"; - return; - } - next_object_ = object->sequence.next(); - if (session_->WriteObjectToStream( + + if (!session_->WriteObjectToStream( stream_, subscription.track_alias(), *object, MoqtDataStreamType::kStreamHeaderSubgroup, !stream_header_written_, object->fin_after_this)) { - stream_header_written_ = true; - subscription.OnObjectSent(object->sequence); + // WriteObjectToStream() closes the connection on error, meaning that + // there is no need to process the stream any further. + return; } + ++next_object_; + stream_header_written_ = true; + subscription.OnObjectSent(object->metadata.location); + if (object->fin_after_this && !delivery_timeout.IsInfinite() && !session_->alternate_delivery_timeout_) { - CreateAndSetAlarm(object->arrival_time + delivery_timeout); + CreateAndSetAlarm(object->metadata.arrival_time + delivery_timeout); } } } void MoqtSession::OutgoingDataStream::Fin(Location last_object) { - if (next_object_ <= last_object) { + QUICHE_DCHECK_EQ(last_object.group, index_.group); + if (next_object_ <= last_object.object) { // There is still data to send, do nothing. return; } @@ -2248,11 +2269,11 @@ QUICHE_DCHECK(stream->CanWrite()); MoqtObject header; header.track_alias = id; - header.group_id = object.sequence.group; - header.subgroup_id = object.sequence.subgroup; - header.object_id = object.sequence.object; - header.publisher_priority = object.publisher_priority; - header.object_status = object.status; + header.group_id = object.metadata.location.group; + header.subgroup_id = object.metadata.subgroup; + header.object_id = object.metadata.location.object; + header.publisher_priority = object.metadata.publisher_priority; + header.object_status = object.metadata.status; header.payload_length = object.payload.length(); quiche::QuicheBuffer serialized_header = @@ -2274,7 +2295,7 @@ } QUIC_DVLOG(1) << "Stream " << stream->GetStreamId() << " successfully wrote " - << object.sequence << ", fin = " << fin; + << object.metadata.location << ", fin = " << fin; return true; } @@ -2299,7 +2320,7 @@ void MoqtSession::PublishedSubscription::SendDatagram(Location sequence) { std::optional<PublishedObject> object = - track_publisher_->GetCachedObject(sequence); + track_publisher_->GetCachedObject(sequence.group, 0, sequence.object); if (!object.has_value()) { QUICHE_BUG(PublishedSubscription_SendDatagram_object_not_in_cache) << "Got notification about an object that is not in the cache"; @@ -2308,23 +2329,24 @@ MoqtObject header; header.track_alias = track_alias(); - header.group_id = object->sequence.group; - header.object_id = object->sequence.object; - header.publisher_priority = object->publisher_priority; - header.object_status = object->status; + header.group_id = object->metadata.location.group; + header.object_id = object->metadata.location.object; + header.publisher_priority = object->metadata.publisher_priority; + header.object_status = object->metadata.status; header.subgroup_id = std::nullopt; header.payload_length = object->payload.length(); quiche::QuicheBuffer datagram = session_->framer_.SerializeObjectDatagram( header, object->payload.AsStringView()); session_->session_->SendOrQueueDatagram(datagram.AsStringView()); - OnObjectSent(object->sequence); + OnObjectSent(object->metadata.location); } void MoqtSession::OutgoingDataStream::UpdateSendOrder( PublishedSubscription& subscription) { - stream_->SetPriority( - webtransport::StreamPriority{/*send_group_id=*/kMoqtSendGroupId, - subscription.GetSendOrder(next_object_)}); + stream_->SetPriority(webtransport::StreamPriority{ + /*send_group_id=*/kMoqtSendGroupId, + subscription.GetSendOrder(Location(index_.group, next_object_), + index_.subgroup)}); } void MoqtSession::OutgoingDataStream::CreateAndSetAlarm(
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 8448d14..67336e2 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -5,6 +5,7 @@ #ifndef QUICHE_QUIC_MOQT_MOQT_SESSION_H_ #define QUICHE_QUIC_MOQT_MOQT_SESSION_H_ +#include <algorithm> #include <cstdint> #include <memory> #include <optional> @@ -213,6 +214,15 @@ struct Empty {}; + struct NewStreamParameters { + DataStreamIndex index; + uint64_t first_object; + + NewStreamParameters(uint64_t group, uint64_t subgroup, + uint64_t first_object) + : index(group, subgroup), first_object(first_object) {} + }; + class QUICHE_EXPORT ControlStream : public webtransport::StreamVisitor, public MoqtControlParserVisitor { public: @@ -364,10 +374,10 @@ MoqtSubscribeErrorReason reason, std::optional<uint64_t> track_alias = std::nullopt) override; // This is only called for objects that have just arrived. - void OnNewObjectAvailable(Location sequence) override; + void OnNewObjectAvailable(Location location, uint64_t subgroup) override; void OnTrackPublisherGone() override; - void OnNewFinAvailable(Location sequence) override; - void OnSubgroupAbandoned(Location sequence, + void OnNewFinAvailable(Location location, uint64_t subgroup) override; + void OnSubgroupAbandoned(uint64_t group, uint64_t subgroup, webtransport::StreamErrorCode error_code) override; void OnGroupAbandoned(uint64_t group_id) override; void ProcessObjectAck(const MoqtObjectAck& message) { @@ -386,6 +396,9 @@ bool InWindow(Location sequence) { return forward_ && window_.has_value() && window_->InWindow(sequence); } + bool GroupInWindow(uint64_t group) { + return forward_ && window_.has_value() && window_->GroupInWindow(group); + } Location GetWindowStart() const { QUICHE_CHECK(window_.has_value()); return window_->start(); @@ -393,38 +406,38 @@ MoqtFilterType filter_type() const { return filter_type_; }; void OnDataStreamCreated(webtransport::StreamId id, - Location start_sequence); + DataStreamIndex start_sequence); void OnDataStreamDestroyed(webtransport::StreamId id, - Location end_sequence); + DataStreamIndex end_sequence); void OnObjectSent(Location sequence); std::vector<webtransport::StreamId> GetAllStreams() const; - webtransport::SendOrder GetSendOrder(Location sequence) const; + webtransport::SendOrder GetSendOrder(Location sequence, + uint64_t subgroup) const; - void AddQueuedOutgoingDataStream(Location first_object); + void AddQueuedOutgoingDataStream(const NewStreamParameters& parameters); // Pops the pending outgoing data stream, with the highest send order. // The session keeps track of which subscribes have pending streams. This // function will trigger a QUICHE_DCHECK if called when there are no pending // streams. - Location NextQueuedOutgoingDataStream(); + NewStreamParameters NextQueuedOutgoingDataStream(); quic::QuicTimeDelta delivery_timeout() const { return delivery_timeout_; } void set_delivery_timeout(quic::QuicTimeDelta timeout) { delivery_timeout_ = timeout; } - void OnStreamTimeout(Location sequence) { - sequence.object = 0; - reset_subgroups_.insert(sequence); + void OnStreamTimeout(DataStreamIndex index) { + reset_subgroups_.insert(index); if (session_->alternate_delivery_timeout_) { - first_active_group_ = std::max(first_active_group_, sequence.group + 1); + first_active_group_ = std::max(first_active_group_, index.group + 1); } } uint64_t first_active_group() const { return first_active_group_; } - absl::flat_hash_set<Location>& reset_subgroups() { + absl::flat_hash_set<DataStreamIndex>& reset_subgroups() { return reset_subgroups_; } @@ -461,7 +474,7 @@ uint64_t first_active_group_ = 0; // If a stream has been reset due to delivery timeout, do not open a new // stream if more object arrive for it. - absl::flat_hash_set<Location> reset_subgroups_; + absl::flat_hash_set<DataStreamIndex> reset_subgroups_; // The min of DELIVERY_TIMEOUT from SUBSCRIBE and SUBSCRIBE_OK. quic::QuicTimeDelta delivery_timeout_ = quic::QuicTimeDelta::Infinite(); @@ -474,14 +487,14 @@ // Store the send order of queued outgoing data streams. Use a // subscriber_priority_ of zero to avoid having to update it, and call // FinalizeSendOrder() whenever delivering it to the MoqtSession. - absl::btree_multimap<webtransport::SendOrder, Location> + absl::btree_multimap<webtransport::SendOrder, NewStreamParameters> queued_outgoing_data_streams_; }; class QUICHE_EXPORT OutgoingDataStream : public webtransport::StreamVisitor { public: OutgoingDataStream(MoqtSession* session, webtransport::Stream* stream, PublishedSubscription& subscription, - Location first_object); + const NewStreamParameters& parameters); ~OutgoingDataStream(); // webtransport::StreamVisitor implementation. @@ -519,6 +532,8 @@ // alarm is already created. void CreateAndSetAlarm(quic::QuicTime deadline); + DataStreamIndex index() const { return index_; } + private: friend class test::MoqtSessionPeer; friend class DeliveryTimeoutDelegate; @@ -530,10 +545,11 @@ MoqtSession* session_; webtransport::Stream* stream_; uint64_t subscription_id_; - // A Location with the minimum object ID that should go out next. The - // session doesn't know what the next object ID in the stream is because - // the next object could be in a different subgroup or simply be skipped. - Location next_object_; + DataStreamIndex index_; + // Minimum object ID that should go out next. The session doesn't know the + // exact ID of the next object in the stream because the next object could + // be in a different subgroup or simply be skipped. + uint64_t next_object_; bool stream_header_written_ = false; // If this data stream is for SUBSCRIBE, reset it if an object has been // excessively delayed per Section 7.1.1.2. @@ -632,12 +648,12 @@ // Opens a new data stream, or queues it if the session is flow control // blocked. - webtransport::Stream* OpenOrQueueDataStream(uint64_t subscription_id, - Location first_object); + webtransport::Stream* OpenOrQueueDataStream( + uint64_t subscription_id, const NewStreamParameters& parameters); // Same as above, except the session is required to be not flow control // blocked. webtransport::Stream* OpenDataStream(PublishedSubscription& subscription, - Location first_object); + const NewStreamParameters& parameters); // Returns false if creation failed. [[nodiscard]] bool OpenDataStream(std::shared_ptr<PublishedFetch> fetch, webtransport::SendOrder send_order);
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index ed23ccd..9b25531 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -519,7 +519,7 @@ // forward=false, so incoming objects are ignored. EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) .Times(0); - listener->OnNewObjectAvailable(Location(0, 0)); + listener->OnNewObjectAvailable(Location(0, 0), 0); } TEST_F(MoqtSessionTest, SubscribeAbsoluteStartNoDataYet) { @@ -535,7 +535,7 @@ // Window was not set to (0, 0) by SUBSCRIBE acceptance. EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) .Times(0); - listener->OnNewObjectAvailable(Location(0, 0)); + listener->OnNewObjectAvailable(Location(0, 0), 0); } TEST_F(MoqtSessionTest, SubscribeNextGroup) { @@ -550,11 +550,11 @@ // Later objects in group 10 ignored. EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) .Times(0); - listener->OnNewObjectAvailable(Location(10, 21)); + listener->OnNewObjectAvailable(Location(10, 21), 0); // Group 11 is sent. EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) .WillOnce(Return(false)); - listener->OnNewObjectAvailable(Location(11, 0)); + listener->OnNewObjectAvailable(Location(11, 0), 0); } TEST_F(MoqtSessionTest, TwoSubscribesForTrack) { @@ -1047,7 +1047,7 @@ MoqtSessionPeer::CreateIncomingDataStream( &session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup); - EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(1); + EXPECT_CALL(visitor_, OnObjectFragment).Times(1); EXPECT_CALL(mock_stream_, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); object_stream->OnObjectMessage(object, payload, true); @@ -1072,7 +1072,7 @@ MoqtSessionPeer::CreateIncomingDataStream( &session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup); - EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(1); + EXPECT_CALL(visitor_, OnObjectFragment).Times(1); EXPECT_CALL(mock_stream_, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); object_stream->OnObjectMessage(object, payload, false); @@ -1103,7 +1103,7 @@ MoqtSessionPeer::CreateIncomingDataStream( &session, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup); - EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(2); + EXPECT_CALL(visitor_, OnObjectFragment).Times(2); EXPECT_CALL(mock_stream_, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); object_stream->OnObjectMessage(object, payload, false); @@ -1129,13 +1129,13 @@ MoqtSessionPeer::CreateIncomingDataStream( &session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup); - EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)) - .WillOnce([&](const FullTrackName& full_track_name, Location sequence, - MoqtPriority publisher_priority, MoqtObjectStatus status, + EXPECT_CALL(visitor_, OnObjectFragment) + .WillOnce([&](const FullTrackName& full_track_name, + const PublishedObjectMetadata& metadata, absl::string_view payload, bool end_of_message) { EXPECT_EQ(full_track_name, ftn); - EXPECT_EQ(sequence.group, object.group_id); - EXPECT_EQ(sequence.object, object.object_id); + EXPECT_EQ(metadata.location.group, object.group_id); + EXPECT_EQ(metadata.location.object, object.object_id); }); EXPECT_CALL(mock_stream_, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); @@ -1174,13 +1174,13 @@ MoqtSessionPeer::CreateIncomingDataStream( &session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup); - EXPECT_CALL(visitor, OnObjectFragment(_, _, _, _, _, _)) - .WillOnce([&](const FullTrackName& full_track_name, Location sequence, - MoqtPriority publisher_priority, MoqtObjectStatus status, + EXPECT_CALL(visitor, OnObjectFragment) + .WillOnce([&](const FullTrackName& full_track_name, + const PublishedObjectMetadata& metadata, absl::string_view payload, bool end_of_message) { EXPECT_EQ(full_track_name, ftn); - EXPECT_EQ(sequence.group, object.group_id); - EXPECT_EQ(sequence.object, object.object_id); + EXPECT_EQ(metadata.location.group, object.group_id); + EXPECT_EQ(metadata.location.object, object.object_id); }); EXPECT_CALL(mock_stream_, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); @@ -1281,18 +1281,16 @@ fin |= options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] { - return PublishedObject{Location(5, 0), - MoqtObjectStatus::kNormal, - 127, - MemSliceFromString("deadbeef"), - MoqtSessionPeer::Now(&session_), - false}; + EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 0), 0, MoqtObjectStatus::kNormal, + 127, MoqtSessionPeer::Now(&session_)}, + MemSliceFromString("deadbeef"), false}; }); - EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); - subscription->OnNewObjectAvailable(Location(5, 0)); + subscription->OnNewObjectAvailable(Location(5, 0), 0); EXPECT_TRUE(correct_message); EXPECT_FALSE(fin); EXPECT_EQ(MoqtSessionPeer::LargestSentForSubscription(&session_, 0), @@ -1335,18 +1333,16 @@ fin = options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] { - return PublishedObject{Location(5, 0), - MoqtObjectStatus::kNormal, - 127, - MemSliceFromString("deadbeef"), - MoqtSessionPeer::Now(&session_), - true}; + EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 0), 0, MoqtObjectStatus::kNormal, + 127, MoqtSessionPeer::Now(&session_)}, + MemSliceFromString("deadbeef"), true}; }); - EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); - subscription->OnNewObjectAvailable(Location(5, 0)); + subscription->OnNewObjectAvailable(Location(5, 0), 0); EXPECT_TRUE(correct_message); EXPECT_TRUE(fin); } @@ -1387,18 +1383,16 @@ fin |= options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] { - return PublishedObject{Location(5, 0), - MoqtObjectStatus::kNormal, - 127, - MemSliceFromString("deadbeef"), - MoqtSessionPeer::Now(&session_), - true}; + EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 0), 0, MoqtObjectStatus::kNormal, + 127, MoqtSessionPeer::Now(&session_)}, + MemSliceFromString("deadbeef"), true}; }); - EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); - subscription->OnNewObjectAvailable(Location(5, 0)); + subscription->OnNewObjectAvailable(Location(5, 0), 0); EXPECT_TRUE(correct_message); EXPECT_TRUE(fin); @@ -1435,36 +1429,34 @@ // Verify first six message fields are sent correctly bool correct_message = false; const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x00, 0x7f}; - EXPECT_CALL(mock_stream_, Writev(_, _)) + EXPECT_CALL(mock_stream_, Writev) .WillOnce([&](absl::Span<const absl::string_view> data, const quiche::StreamWriteOptions& options) { correct_message = absl::StartsWith(data[0], kExpectedMessage); fin = options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] { - return PublishedObject{Location(5, 0), - MoqtObjectStatus::kNormal, - 127, - MemSliceFromString("deadbeef"), - MoqtSessionPeer::Now(&session_), - false}; + EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 0), 0, MoqtObjectStatus::kNormal, + 127, MoqtSessionPeer::Now(&session_)}, + MemSliceFromString("deadbeef"), false}; }); - EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); - subscription->OnNewObjectAvailable(Location(5, 0)); + subscription->OnNewObjectAvailable(Location(5, 0), 0); EXPECT_TRUE(correct_message); EXPECT_FALSE(fin); fin = false; - EXPECT_CALL(mock_stream_, Writev(_, _)) + EXPECT_CALL(mock_stream_, Writev) .WillOnce([&](absl::Span<const absl::string_view> data, const quiche::StreamWriteOptions& options) { EXPECT_TRUE(data.empty()); fin = options.send_fin(); return absl::OkStatus(); }); - subscription->OnNewFinAvailable(Location(5, 0)); + subscription->OnNewFinAvailable(Location(5, 0), 0); } TEST_F(MoqtSessionTest, SeparateFinForFutureObject) { @@ -1503,39 +1495,38 @@ fin = options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] { - return PublishedObject{Location(5, 0), - MoqtObjectStatus::kNormal, - 127, - MemSliceFromString("deadbeef"), - MoqtSessionPeer::Now(&session_), - false}; + EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 0), 0, MoqtObjectStatus::kNormal, + 127, MoqtSessionPeer::Now(&session_)}, + MemSliceFromString("deadbeef"), false}; }); - EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); - subscription->OnNewObjectAvailable(Location(5, 0)); + subscription->OnNewObjectAvailable(Location(5, 0), 0); EXPECT_FALSE(fin); // Try to deliver (5,1), but fail. EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return false; }); - EXPECT_CALL(*track, GetCachedObject(_)).Times(0); - EXPECT_CALL(mock_stream_, Writev(_, _)).Times(0); - subscription->OnNewObjectAvailable(Location(5, 1)); + EXPECT_CALL(*track, GetCachedObject).Times(0); + EXPECT_CALL(mock_stream_, Writev).Times(0); + subscription->OnNewObjectAvailable(Location(5, 1), 0); // Notify that FIN arrived, but do nothing with it because (5, 1) isn't sent. - EXPECT_CALL(mock_stream_, Writev(_, _)).Times(0); - subscription->OnNewFinAvailable(Location(5, 1)); + EXPECT_CALL(mock_stream_, Writev).Times(0); + subscription->OnNewFinAvailable(Location(5, 1), 0); // Reopen the window. correct_message = false; // object id, extensions, payload length, status. const std::string kExpectedMessage2 = {0x01, 0x00, 0x00, 0x03}; EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return true; }); - EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([&] { - return PublishedObject{ - Location(5, 1), MoqtObjectStatus::kEndOfGroup, 127, - MemSliceFromString(""), MoqtSessionPeer::Now(&session_), true}; + EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([&] { + return PublishedObject{PublishedObjectMetadata{ + Location(5, 1), 0, MoqtObjectStatus::kEndOfGroup, + 127, MoqtSessionPeer::Now(&session_)}, + MemSliceFromString(""), true}; }); - EXPECT_CALL(*track, GetCachedObject(Location(5, 2))).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, 0, 2)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); EXPECT_CALL(mock_stream_, Writev(_, _)) @@ -1586,22 +1577,20 @@ fin = options.send_fin(); return absl::OkStatus(); }); - EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] { - return PublishedObject{Location(5, 0), - MoqtObjectStatus::kNormal, - 127, - MemSliceFromString("deadbeef"), - MoqtSessionPeer::Now(&session_), - false}; + EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 0), 0, MoqtObjectStatus::kNormal, + 127, MoqtSessionPeer::Now(&session_)}, + MemSliceFromString("deadbeef"), false}; }); - EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); - subscription->OnNewObjectAvailable(Location(5, 0)); + subscription->OnNewObjectAvailable(Location(5, 0), 0); // Abandon the subgroup. EXPECT_CALL(mock_stream_, ResetWithUserCode(0x1)).Times(1); - subscription->OnSubgroupAbandoned(Location(5, 0), 0x1); + subscription->OnSubgroupAbandoned(5, 0, 0x1); } // TODO: Test operation with multiple streams. @@ -1616,7 +1605,7 @@ // Queue the outgoing stream. EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) .WillOnce(Return(false)); - subscription->OnNewObjectAvailable(Location(5, 0)); + subscription->OnNewObjectAvailable(Location(5, 0), 0); // Unblock the session, and cause the queued stream to be sent. EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) @@ -1638,11 +1627,13 @@ EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId)) .WillRepeatedly(Return(&mock_stream_)); EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus())); - EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([] { - return PublishedObject{Location(5, 0), MoqtObjectStatus::kNormal, 128, - MemSliceFromString("deadbeef")}; + EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 0), 0, MoqtObjectStatus::kNormal, + 128}, + MemSliceFromString("deadbeef")}; }); - EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); session_.OnCanCreateNewOutgoingUnidirectionalStream(); @@ -1658,8 +1649,8 @@ // Queue the outgoing stream. EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) .WillRepeatedly(Return(false)); - subscription->OnNewObjectAvailable(Location(5, 0, 0)); - subscription->OnNewObjectAvailable(Location(6, 0, 0)); + subscription->OnNewObjectAvailable(Location(5, 0), 0); + subscription->OnNewObjectAvailable(Location(6, 0), 0); subscription->OnGroupAbandoned(5); // Unblock the session, and cause the queued stream to be sent. There should @@ -1684,11 +1675,13 @@ EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId)) .WillRepeatedly(Return(&mock_stream_)); EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus())); - EXPECT_CALL(*track, GetCachedObject(Location(6, 0))).WillRepeatedly([] { - return PublishedObject{Location(6, 0), MoqtObjectStatus::kNormal, 128, - MemSliceFromString("deadbeef")}; + EXPECT_CALL(*track, GetCachedObject(6, 0, 0)).WillRepeatedly([] { + return PublishedObject{ + PublishedObjectMetadata{Location(6, 0), 0, MoqtObjectStatus::kNormal, + 128}, + MemSliceFromString("deadbeef")}; }); - EXPECT_CALL(*track, GetCachedObject(Location(6, 1))).WillRepeatedly([] { + EXPECT_CALL(*track, GetCachedObject(6, 0, 1)).WillRepeatedly([] { return std::optional<PublishedObject>(); }); session_.OnCanCreateNewOutgoingUnidirectionalStream(); @@ -1721,21 +1714,23 @@ .WillRepeatedly(Return(&mock_stream_)); EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus())); - EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([] { - return PublishedObject{Location(5, 0), MoqtObjectStatus::kNormal, 128, - MemSliceFromString("deadbeef")}; + EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([] { + return PublishedObject{ + PublishedObjectMetadata{Location(5, 0), 0, MoqtObjectStatus::kNormal, + 128}, + MemSliceFromString("deadbeef")}; }); - EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillOnce([] { + EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillOnce([] { return std::optional<PublishedObject>(); }); - subscription->OnNewObjectAvailable(Location(5, 0)); + subscription->OnNewObjectAvailable(Location(5, 0), 0); // Now that the stream exists and is recorded within subscription, make it // disappear by returning nullptr. EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId)) .WillRepeatedly(Return(nullptr)); - EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).Times(0); - subscription->OnNewObjectAvailable(Location(5, 1)); + EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).Times(0); + subscription->OnNewObjectAvailable(Location(5, 1), 0); } TEST_F(MoqtSessionTest, OneBidirectionalStreamClient) { @@ -1840,12 +1835,13 @@ return webtransport::DatagramStatus( webtransport::DatagramStatusCode::kSuccess, ""); }); - EXPECT_CALL(*track_publisher, GetCachedObject(Location{5, 0})) - .WillRepeatedly([] { - return PublishedObject{Location{5, 0}, MoqtObjectStatus::kNormal, 128, - MemSliceFromString("deadbeef")}; - }); - listener->OnNewObjectAvailable(Location(5, 0)); + EXPECT_CALL(*track_publisher, GetCachedObject(5, 0, 0)).WillRepeatedly([] { + return PublishedObject{ + PublishedObjectMetadata{Location{5, 0}, 0, MoqtObjectStatus::kNormal, + 128}, + MemSliceFromString("deadbeef")}; + }); + listener->OnNewObjectAvailable(Location(5, 0), 0); EXPECT_TRUE(correct_message); } @@ -1866,11 +1862,18 @@ }; char datagram[] = {0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x08, 0x64, 0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66}; - EXPECT_CALL(visitor_, - OnObjectFragment(ftn, Location{object.group_id, object.object_id}, - object.publisher_priority, object.object_status, - payload, true)) - .Times(1); + EXPECT_CALL(visitor_, OnObjectFragment) + .WillOnce([&](const FullTrackName& track_name, + const PublishedObjectMetadata& metadata, + absl::string_view received_payload, bool fin) { + EXPECT_EQ(track_name, ftn); + EXPECT_EQ(metadata.location, + Location(object.group_id, object.object_id)); + EXPECT_EQ(metadata.publisher_priority, object.publisher_priority); + EXPECT_EQ(metadata.status, object.object_status); + EXPECT_EQ(payload, received_payload); + EXPECT_TRUE(fin); + }); session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram))); } @@ -1892,7 +1895,7 @@ MoqtSessionPeer::CreateIncomingDataStream( &session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup); - EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(1); + EXPECT_CALL(visitor_, OnObjectFragment).Times(1); EXPECT_CALL(mock_stream_, GetStreamId()) .WillRepeatedly(Return(kIncomingUniStreamId)); object_stream->OnObjectMessage(object, payload, true); @@ -1924,7 +1927,7 @@ std::unique_ptr<MoqtDataParserVisitor> object_stream = MoqtSessionPeer::CreateIncomingDataStream( &session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup); - EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(0); + EXPECT_CALL(visitor_, OnObjectFragment).Times(0); object_stream->OnObjectMessage(object, payload, true); } @@ -1936,7 +1939,7 @@ MoqtSessionPeer::CreateRemoteTrack(&session_, subscribe, &visitor_); char datagram[] = {0x01, 0x02, 0x00, 0x00, 0x80, 0x00, 0x08, 0x64, 0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66}; - EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(0); + EXPECT_CALL(visitor_, OnObjectFragment).Times(0); session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram))); } @@ -1954,9 +1957,9 @@ .WillOnce(Return(false)); EXPECT_CALL(*track, GetTrackStatus()) .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress)); - subscription->OnNewObjectAvailable(Location(1, 0)); - subscription->OnNewObjectAvailable(Location(0, 0)); - subscription->OnNewObjectAvailable(Location(2, 0)); + subscription->OnNewObjectAvailable(Location(1, 0), 0); + subscription->OnNewObjectAvailable(Location(0, 0), 0); + subscription->OnNewObjectAvailable(Location(2, 0), 0); // These should be opened in the sequence (0, 0), (1, 0), (2, 0). EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) .WillRepeatedly(Return(true)); @@ -1990,24 +1993,24 @@ EXPECT_CALL(mock_stream2, visitor()).WillOnce([&]() { return stream_visitor[2].get(); }); - EXPECT_CALL(*track, GetCachedObject(Location(0, 0))) - .WillOnce( - Return(PublishedObject{Location(0, 0), MoqtObjectStatus::kNormal, 127, - MemSliceFromString("deadbeef")})); - EXPECT_CALL(*track, GetCachedObject(Location(0, 1))) - .WillOnce(Return(std::nullopt)); - EXPECT_CALL(*track, GetCachedObject(Location(1, 0))) - .WillOnce( - Return(PublishedObject{Location(1, 0), MoqtObjectStatus::kNormal, 127, - MemSliceFromString("deadbeef")})); - EXPECT_CALL(*track, GetCachedObject(Location(1, 1))) - .WillOnce(Return(std::nullopt)); - EXPECT_CALL(*track, GetCachedObject(Location(2, 0))) - .WillOnce( - Return(PublishedObject{Location(2, 0), MoqtObjectStatus::kNormal, 127, - MemSliceFromString("deadbeef")})); - EXPECT_CALL(*track, GetCachedObject(Location(2, 1))) - .WillOnce(Return(std::nullopt)); + EXPECT_CALL(*track, GetCachedObject(0, 0, 0)) + .WillOnce(Return(PublishedObject{ + PublishedObjectMetadata{Location(0, 0), 0, MoqtObjectStatus::kNormal, + 127}, + MemSliceFromString("deadbeef")})); + EXPECT_CALL(*track, GetCachedObject(0, 0, 1)).WillOnce(Return(std::nullopt)); + EXPECT_CALL(*track, GetCachedObject(1, 0, 0)) + .WillOnce(Return(PublishedObject{ + PublishedObjectMetadata{Location(1, 0), 0, MoqtObjectStatus::kNormal, + 127}, + MemSliceFromString("deadbeef")})); + EXPECT_CALL(*track, GetCachedObject(1, 0, 1)).WillOnce(Return(std::nullopt)); + EXPECT_CALL(*track, GetCachedObject(2, 0, 0)) + .WillOnce(Return(PublishedObject{ + PublishedObjectMetadata{Location(2, 0), 0, MoqtObjectStatus::kNormal, + 127}, + MemSliceFromString("deadbeef")})); + EXPECT_CALL(*track, GetCachedObject(2, 0, 1)).WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream2, CanWrite()).WillRepeatedly(Return(true)); @@ -2047,7 +2050,7 @@ .WillOnce(Return(false)); EXPECT_CALL(*track, GetTrackStatus()) .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress)); - subscription->OnNewObjectAvailable(Location(0, 0)); + subscription->OnNewObjectAvailable(Location(0, 0), 0); // Delete the subscription, then grant stream credit. MoqtSessionPeer::DeleteSubscription(&session_, 0); @@ -2085,10 +2088,10 @@ .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress)); EXPECT_CALL(*track2, GetTrackStatus()) .WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress)); - subscription0->OnNewObjectAvailable(Location(0, 0)); - subscription1->OnNewObjectAvailable(Location(0, 0)); - subscription0->OnNewObjectAvailable(Location(1, 0)); - subscription1->OnNewObjectAvailable(Location(1, 0)); + subscription0->OnNewObjectAvailable(Location(0, 0), 0); + subscription1->OnNewObjectAvailable(Location(0, 0), 0); + subscription0->OnNewObjectAvailable(Location(1, 0), 0); + subscription1->OnNewObjectAvailable(Location(1, 0), 0); // Allow one stream to be opened. It will be group 0, subscription 0. EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) @@ -2106,14 +2109,14 @@ EXPECT_CALL(mock_stream0, visitor()).WillOnce([&]() { return stream_visitor0.get(); }); - EXPECT_CALL(*track1, GetCachedObject(Location(0, 0))) - .WillOnce( - Return(PublishedObject{Location(0, 0), MoqtObjectStatus::kNormal, 127, - MemSliceFromString("foobar")})); - EXPECT_CALL(*track1, GetCachedObject(Location(0, 1))) - .WillOnce(Return(std::nullopt)); + EXPECT_CALL(*track1, GetCachedObject(0, 0, 0)) + .WillOnce(Return(PublishedObject{ + PublishedObjectMetadata{Location(0, 0), 0, MoqtObjectStatus::kNormal, + 127}, + MemSliceFromString("foobar")})); + EXPECT_CALL(*track1, GetCachedObject(0, 0, 1)).WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true)); - EXPECT_CALL(mock_stream0, Writev(_, _)) + EXPECT_CALL(mock_stream0, Writev) .WillOnce([&](absl::Span<const absl::string_view> data, const quiche::StreamWriteOptions& options) { // Check track alias is 14. @@ -2142,12 +2145,12 @@ EXPECT_CALL(mock_stream1, visitor()).WillOnce([&]() { return stream_visitor1.get(); }); - EXPECT_CALL(*track2, GetCachedObject(Location(0, 0))) - .WillOnce( - Return(PublishedObject{Location(0, 0), MoqtObjectStatus::kNormal, 127, - MemSliceFromString("deadbeef")})); - EXPECT_CALL(*track2, GetCachedObject(Location(0, 1))) - .WillOnce(Return(std::nullopt)); + EXPECT_CALL(*track2, GetCachedObject(0, 0, 0)) + .WillOnce(Return(PublishedObject{ + PublishedObjectMetadata{Location(0, 0), 0, MoqtObjectStatus::kNormal, + 127}, + MemSliceFromString("deadbeef")})); + EXPECT_CALL(*track2, GetCachedObject(0, 0, 1)).WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true)); EXPECT_CALL(mock_stream1, Writev(_, _)) .WillOnce([&](absl::Span<const absl::string_view> data, @@ -2195,9 +2198,10 @@ EXPECT_CALL(data_stream, CanWrite).WillRepeatedly(Return(true)); EXPECT_CALL(*fetch_task, GetNextObject) .WillOnce(Invoke([=](PublishedObject& output) { - output.sequence = location; - output.status = status; - output.publisher_priority = 128; + output.metadata.location = location; + output.metadata.subgroup = 0; + output.metadata.status = status; + output.metadata.publisher_priority = 128; output.payload = quiche::QuicheMemSlice::Copy(payload); output.fin_after_this = true; // should be ignored. return MoqtFetchTask::GetNextObjectResult::kSuccess; @@ -2424,16 +2428,16 @@ std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); MockTrackPublisher* track = CreateTrackPublisher(); - SetLargestId(track, Location(4, 0, 10)); + SetLargestId(track, Location(4, 10)); ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get()); MoqtObjectListener* subscription = MoqtSessionPeer::GetSubscription(&session_, subscribe.request_id); ASSERT_NE(subscription, nullptr); EXPECT_TRUE( - MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 0, 11))); + MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 11))); EXPECT_FALSE( - MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 0, 10))); + MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 10))); // Joining FETCH arrives. The resulting Fetch should begin at (2, 0). MoqtFetch fetch = DefaultFetch(); @@ -2464,7 +2468,7 @@ std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); MockTrackPublisher* track = CreateTrackPublisher(); - SetLargestId(track, Location(2, 0, 10)); + SetLargestId(track, Location(2, 10)); ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get()); MoqtFetch fetch = DefaultFetch(); @@ -2672,7 +2676,7 @@ do { result = fetch_task->GetNextObject(object); if (result == MoqtFetchTask::GetNextObjectResult::kSuccess) { - EXPECT_EQ(object.sequence.object, expected_object_id); + EXPECT_EQ(object.metadata.location.object, expected_object_id); ++expected_object_id; } } while (result != MoqtFetchTask::GetNextObjectResult::kPending); @@ -2801,7 +2805,7 @@ PublishedObject new_object; result = fetch_task->GetNextObject(new_object); if (result == MoqtFetchTask::GetNextObjectResult::kSuccess) { - EXPECT_EQ(new_object.sequence.object, expected_object_id); + EXPECT_EQ(new_object.metadata.location.object, expected_object_id); ++expected_object_id; } } while (result != MoqtFetchTask::GetNextObjectResult::kPending); @@ -2822,7 +2826,7 @@ PublishedObject new_object; result = fetch_task->GetNextObject(new_object); if (result == MoqtFetchTask::GetNextObjectResult::kSuccess) { - EXPECT_EQ(new_object.sequence.object, expected_object_id); + EXPECT_EQ(new_object.metadata.location.object, expected_object_id); ++expected_object_id; } } while (result != MoqtFetchTask::GetNextObjectResult::kPending); @@ -2915,30 +2919,35 @@ EXPECT_CALL(data_mock, visitor()).WillRepeatedly(Invoke([&]() { return stream_visitor.get(); })); - EXPECT_CALL(*track_publisher, GetCachedObject(_)) - .WillOnce(Return(PublishedObject{ - Location(0, 0), MoqtObjectStatus::kObjectDoesNotExist, 0, - quiche::QuicheMemSlice(), - MoqtSessionPeer::Now(&session_) - quic::QuicTimeDelta::FromSeconds(1), - false})); + EXPECT_CALL(*track_publisher, GetCachedObject) + .WillOnce( + Return(PublishedObject{PublishedObjectMetadata{ + Location(0, 0), + 0, + MoqtObjectStatus::kObjectDoesNotExist, + 0, + MoqtSessionPeer::Now(&session_) - + quic::QuicTimeDelta::FromSeconds(1), + }, + quiche::QuicheMemSlice(), false})); EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeTimedOut)) .WillOnce(Invoke([&](webtransport::StreamErrorCode /*error*/) { stream_visitor.reset(); })); // Arrival time is very old; reset immediately. - subscription->OnNewObjectAvailable(Location(0, 0, 0)); + subscription->OnNewObjectAvailable(Location(0, 0), 0); // Subsequent objects for that subgroup are ignored. - EXPECT_CALL(*track_publisher, GetCachedObject(_)).Times(0); + EXPECT_CALL(*track_publisher, GetCachedObject).Times(0); EXPECT_CALL(mock_session_, GetStreamById(_)).Times(0); EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) .Times(0); - subscription->OnNewObjectAvailable(Location(0, 0, 1)); + subscription->OnNewObjectAvailable(Location(0, 1), 0); // Check that reset_subgroups_ is pruned. - EXPECT_TRUE( - MoqtSessionPeer::SubgroupHasBeenReset(subscription, Location(0, 0, 1))); + EXPECT_TRUE(MoqtSessionPeer::SubgroupHasBeenReset(subscription, + DataStreamIndex(0, 0))); subscription->OnGroupAbandoned(0); - EXPECT_FALSE( - MoqtSessionPeer::SubgroupHasBeenReset(subscription, Location(0, 0, 1))); + EXPECT_FALSE(MoqtSessionPeer::SubgroupHasBeenReset(subscription, + DataStreamIndex(0, 0))); } TEST_F(MoqtSessionTest, DeliveryTimeoutAfterIntegratedFin) { @@ -2971,14 +2980,16 @@ EXPECT_CALL(data_mock, visitor()).WillRepeatedly(Invoke([&]() { return stream_visitor.get(); })); - EXPECT_CALL(*track_publisher, GetCachedObject(_)) + EXPECT_CALL(*track_publisher, GetCachedObject) .WillOnce(Return(PublishedObject{ - Location(0, 0), MoqtObjectStatus::kObjectDoesNotExist, 0, - quiche::QuicheMemSlice(), MoqtSessionPeer::Now(&session_), true})) + PublishedObjectMetadata{Location(0, 0), 0, + MoqtObjectStatus::kObjectDoesNotExist, 0, + MoqtSessionPeer::Now(&session_)}, + quiche::QuicheMemSlice(), true})) .WillOnce(Return(std::nullopt)); EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus())); EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeTimedOut)).Times(0); - subscription->OnNewObjectAvailable(Location(0, 0, 0)); + subscription->OnNewObjectAvailable(Location(0, 0), 0); auto* delivery_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>( MoqtSessionPeer::GetAlarm(stream_visitor.get())); EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeTimedOut)) @@ -3020,16 +3031,18 @@ EXPECT_CALL(data_mock, visitor()).WillRepeatedly(Invoke([&]() { return stream_visitor.get(); })); - EXPECT_CALL(*track_publisher, GetCachedObject(_)) + EXPECT_CALL(*track_publisher, GetCachedObject) .WillOnce(Return(PublishedObject{ - Location(0, 0), MoqtObjectStatus::kObjectDoesNotExist, 0, - quiche::QuicheMemSlice(), MoqtSessionPeer::Now(&session_), false})) + PublishedObjectMetadata{Location(0, 0), 0, + MoqtObjectStatus::kObjectDoesNotExist, 0, + MoqtSessionPeer::Now(&session_)}, + quiche::QuicheMemSlice(), false})) .WillOnce(Return(std::nullopt)); EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus())); - subscription->OnNewObjectAvailable(Location(0, 0, 0)); + subscription->OnNewObjectAvailable(Location(0, 0), 0); EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus())); - subscription->OnNewFinAvailable(Location(0, 0, 0)); + subscription->OnNewFinAvailable(Location(0, 0), 0); auto* delivery_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>( MoqtSessionPeer::GetAlarm(stream_visitor.get())); EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeTimedOut)) @@ -3072,13 +3085,15 @@ EXPECT_CALL(data_mock1, visitor()).WillRepeatedly(Invoke([&]() { return stream_visitor1.get(); })); - EXPECT_CALL(*track_publisher, GetCachedObject(_)) + EXPECT_CALL(*track_publisher, GetCachedObject) .WillOnce(Return(PublishedObject{ - Location(0, 0), MoqtObjectStatus::kObjectDoesNotExist, 0, - quiche::QuicheMemSlice(), MoqtSessionPeer::Now(&session_), false})) + PublishedObjectMetadata{Location(0, 0), 0, + MoqtObjectStatus::kObjectDoesNotExist, 0, + MoqtSessionPeer::Now(&session_)}, + quiche::QuicheMemSlice(), false})) .WillOnce(Return(std::nullopt)); EXPECT_CALL(data_mock1, Writev(_, _)).WillOnce(Return(absl::OkStatus())); - subscription->OnNewObjectAvailable(Location(0, 0, 0)); + subscription->OnNewObjectAvailable(Location(0, 0), 0); webtransport::test::MockStream data_mock2; EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream()) @@ -3097,13 +3112,15 @@ EXPECT_CALL(data_mock2, visitor()).WillRepeatedly(Invoke([&]() { return stream_visitor2.get(); })); - EXPECT_CALL(*track_publisher, GetCachedObject(_)) + EXPECT_CALL(*track_publisher, GetCachedObject) .WillOnce(Return(PublishedObject{ - Location(1, 0), MoqtObjectStatus::kObjectDoesNotExist, 0, - quiche::QuicheMemSlice(), MoqtSessionPeer::Now(&session_), false})) + PublishedObjectMetadata{Location(1, 0), 0, + MoqtObjectStatus::kObjectDoesNotExist, 0, + MoqtSessionPeer::Now(&session_)}, + quiche::QuicheMemSlice(), false})) .WillOnce(Return(std::nullopt)); EXPECT_CALL(data_mock2, Writev(_, _)).WillOnce(Return(absl::OkStatus())); - subscription->OnNewObjectAvailable(Location(1, 0, 0)); + subscription->OnNewObjectAvailable(Location(1, 0), 0); // Group 1 should start the timer on the Group 0 stream. auto* delivery_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>(
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.cc b/quiche/quic/moqt/moqt_subscribe_windows.cc index 2fb7225..1e0ad50 100644 --- a/quiche/quic/moqt/moqt_subscribe_windows.cc +++ b/quiche/quic/moqt/moqt_subscribe_windows.cc
@@ -5,9 +5,11 @@ #include "quiche/quic/moqt/moqt_subscribe_windows.h" #include <cstdint> +#include <limits> #include <optional> #include <vector> +#include "quiche/quic/core/quic_interval.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/platform/api/quic_bug_tracker.h" #include "quiche/common/platform/api/quiche_logging.h" @@ -15,59 +17,24 @@ namespace moqt { -ReducedSequenceIndex::ReducedSequenceIndex( - Location sequence, MoqtForwardingPreference preference) { - switch (preference) { - case MoqtForwardingPreference::kSubgroup: - sequence_ = Location(sequence.group, sequence.subgroup, 0); - break; - case MoqtForwardingPreference::kDatagram: - sequence_ = Location(sequence.group, 0, sequence.object); - return; - } -} - -std::optional<webtransport::StreamId> SendStreamMap::GetStreamForSequence( - Location sequence) const { - QUICHE_DCHECK(forwarding_preference_ == MoqtForwardingPreference::kSubgroup); - Location index = - ReducedSequenceIndex(sequence, forwarding_preference_).sequence(); - auto group_it = send_streams_.find(index.group); - if (group_it == send_streams_.end()) { +std::optional<webtransport::StreamId> SendStreamMap::GetStreamFor( + DataStreamIndex index) const { + auto it = send_streams_.find(index); + if (it == send_streams_.end()) { return std::nullopt; } - auto subgroup_it = group_it->second.find(index.subgroup); - if (subgroup_it == group_it->second.end()) { - return std::nullopt; - } - return subgroup_it->second; + return it->second; } -void SendStreamMap::AddStream(Location sequence, +void SendStreamMap::AddStream(DataStreamIndex index, webtransport::StreamId stream_id) { - Location index = - ReducedSequenceIndex(sequence, forwarding_preference_).sequence(); - auto [it, result] = send_streams_.insert({index.group, Group()}); - auto [sg, success] = it->second.try_emplace(index.subgroup, stream_id); + auto [it, success] = send_streams_.emplace(index, stream_id); QUIC_BUG_IF(quic_bug_moqt_draft_03_02, !success) << "Stream already added"; } -void SendStreamMap::RemoveStream(Location sequence, +void SendStreamMap::RemoveStream(DataStreamIndex index, webtransport::StreamId stream_id) { - Location index = - ReducedSequenceIndex(sequence, forwarding_preference_).sequence(); - auto group_it = send_streams_.find(index.group); - if (group_it == send_streams_.end()) { - QUICHE_NOTREACHED(); - return; - } - auto subgroup_it = group_it->second.find(index.subgroup); - if (subgroup_it == group_it->second.end() || - subgroup_it->second != stream_id) { - QUICHE_NOTREACHED(); - return; - } - group_it->second.erase(subgroup_it); + send_streams_.erase(index); } bool SubscribeWindow::TruncateStart(Location start) { @@ -96,25 +63,30 @@ std::vector<webtransport::StreamId> SendStreamMap::GetAllStreams() const { std::vector<webtransport::StreamId> ids; - for (const auto& [group, subgroup_map] : send_streams_) { - for (const auto& [subgroup, stream_id] : subgroup_map) { - ids.push_back(stream_id); - } + for (const auto& [index, stream_id] : send_streams_) { + ids.push_back(stream_id); } return ids; } std::vector<webtransport::StreamId> SendStreamMap::GetStreamsForGroup( uint64_t group_id) const { + const auto start_it = send_streams_.lower_bound(DataStreamIndex(group_id, 0)); + const auto end_it = send_streams_.upper_bound( + DataStreamIndex(group_id, std::numeric_limits<uint64_t>::max())); std::vector<webtransport::StreamId> ids; - auto it = send_streams_.find(group_id); - if (it == send_streams_.end()) { - return ids; - } - for (const auto& [subgroup, stream_id] : it->second) { - ids.push_back(stream_id); + for (auto it = start_it; it != end_it; ++it) { + ids.push_back(it->second); } return ids; } +bool SubscribeWindow::GroupInWindow(uint64_t group) const { + const quic::QuicInterval<Location> group_window( + Location(group, 0), + Location(group, std::numeric_limits<uint64_t>::max())); + const quic::QuicInterval<Location> subscription_window(start_, end_); + return group_window.Intersects(subscription_window); +} + } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h index f1eea5d..17d5351 100644 --- a/quiche/quic/moqt/moqt_subscribe_windows.h +++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -7,10 +7,13 @@ #include <cstdint> #include <optional> +#include <tuple> #include <vector> #include "absl/container/btree_map.h" +#include "quiche/quic/core/quic_interval.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/common/platform/api/quiche_export.h" #include "quiche/web_transport/web_transport.h" @@ -37,6 +40,7 @@ bool InWindow(const Location& seq) const { return start_ <= seq && seq <= end_; } + bool GroupInWindow(uint64_t group) const; Location start() const { return start_; } Location end() const { return end_; } @@ -55,47 +59,57 @@ Location end_ = Location(UINT64_MAX, UINT64_MAX); }; -// ReducedSequenceIndex represents an index object such that if two sequence -// numbers are mapped to the same stream, they will be mapped to the same index. -class ReducedSequenceIndex { - public: - ReducedSequenceIndex(Location sequence, MoqtForwardingPreference preference); +// A tuple uniquely identifying a WebTransport data stream associated with a +// subscription. By convention, if a DataStreamIndex is necessary for a datagram +// track, `subgroup` is set to zero. +struct DataStreamIndex { + uint64_t group = 0; + uint64_t subgroup = 0; - bool operator==(const ReducedSequenceIndex& other) const { - return sequence_ == other.sequence_; + DataStreamIndex() = default; + DataStreamIndex(uint64_t group, uint64_t subgroup) + : group(group), subgroup(subgroup) {} + explicit DataStreamIndex(const PublishedObject& object) + : group(object.metadata.location.group), + subgroup(object.metadata.subgroup.value_or(0)) {} + + bool operator==(const DataStreamIndex& other) const { + return group == other.group && subgroup == other.subgroup; } - bool operator!=(const ReducedSequenceIndex& other) const { - return sequence_ != other.sequence_; + + bool operator<(const DataStreamIndex& other) const { + return std::make_tuple(group, subgroup) < + std::make_tuple(other.group, other.subgroup); } - Location sequence() { return sequence_; } + bool operator<=(const DataStreamIndex& other) const { + return std::make_tuple(group, subgroup) <= + std::make_tuple(other.group, other.subgroup); + } + bool operator>(const DataStreamIndex& other) const { + return !(*this <= other); + } template <typename H> - friend H AbslHashValue(H h, const ReducedSequenceIndex& m) { - return H::combine(std::move(h), m.sequence_); + friend H AbslHashValue(H h, const DataStreamIndex& index) { + return H::combine(std::move(h), index.group, index.subgroup); } - - private: - Location sequence_; }; // A map of outgoing data streams indexed by object sequence numbers. class QUICHE_EXPORT SendStreamMap { public: - explicit SendStreamMap(MoqtForwardingPreference forwarding_preference) - : forwarding_preference_(forwarding_preference) {} + SendStreamMap() = default; - std::optional<webtransport::StreamId> GetStreamForSequence( - Location sequence) const; - void AddStream(Location sequence, webtransport::StreamId stream_id); - void RemoveStream(Location sequence, webtransport::StreamId stream_id); + std::optional<webtransport::StreamId> GetStreamFor( + DataStreamIndex index) const; + void AddStream(DataStreamIndex index, webtransport::StreamId stream_id); + void RemoveStream(DataStreamIndex index, webtransport::StreamId stream_id); std::vector<webtransport::StreamId> GetAllStreams() const; std::vector<webtransport::StreamId> GetStreamsForGroup( uint64_t group_id) const; private: - using Group = absl::btree_map<uint64_t, webtransport::StreamId>; - absl::btree_map<uint64_t, Group> send_streams_; - MoqtForwardingPreference forwarding_preference_; + absl::btree_map<DataStreamIndex, webtransport::StreamId> send_streams_; }; } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc index 2f2ed7a..cfbeb8f 100644 --- a/quiche/quic/moqt/moqt_subscribe_windows_test.cc +++ b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
@@ -34,16 +34,16 @@ } TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdSubgroup) { - SendStreamMap stream_map(MoqtForwardingPreference::kSubgroup); - stream_map.AddStream(Location{4, 0}, 2); - EXPECT_EQ(stream_map.GetStreamForSequence(Location(5, 0)), std::nullopt); - stream_map.AddStream(Location{5, 2}, 6); - EXPECT_QUIC_BUG(stream_map.AddStream(Location{5, 3}, 6), + SendStreamMap stream_map; + stream_map.AddStream(DataStreamIndex{4, 0}, 2); + EXPECT_EQ(stream_map.GetStreamFor(DataStreamIndex(5, 0)), std::nullopt); + stream_map.AddStream(DataStreamIndex{5, 0}, 6); + stream_map.AddStream(DataStreamIndex{5, 1}, 7); + EXPECT_QUIC_BUG(stream_map.AddStream(DataStreamIndex{5, 0}, 6), "Stream already added"); - EXPECT_EQ(stream_map.GetStreamForSequence(Location(4, 1)), 2); - EXPECT_EQ(stream_map.GetStreamForSequence(Location(5, 0)), 6); - stream_map.RemoveStream(Location{5, 1}, 6); - EXPECT_EQ(stream_map.GetStreamForSequence(Location(5, 2)), std::nullopt); + EXPECT_EQ(stream_map.GetStreamFor(DataStreamIndex(4, 0)), 2); + stream_map.RemoveStream(DataStreamIndex{5, 1}, 7); + EXPECT_EQ(stream_map.GetStreamFor(DataStreamIndex(5, 1)), std::nullopt); } TEST_F(SubscribeWindowTest, UpdateStartEnd) {
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc index b62f5b4..6e08c2c 100644 --- a/quiche/quic/moqt/moqt_track.cc +++ b/quiche/quic/moqt/moqt_track.cc
@@ -98,8 +98,7 @@ PublishedObject object; switch (fetch_task_->GetNextObject(object)) { case MoqtFetchTask::GetNextObjectResult::kSuccess: - visitor_->OnObjectFragment(full_track_name(), object.sequence, - object.publisher_priority, object.status, + visitor_->OnObjectFragment(full_track_name(), object.metadata, object.payload.AsStringView(), true); break; case MoqtFetchTask::GetNextObjectResult::kError: @@ -164,13 +163,14 @@ quiche::QuicheMemSlice message_slice(std::move(payload_)); output.payload = std::move(message_slice); } - output.sequence = - Location(next_object_->group_id, next_object_->subgroup_id.value_or(0), - next_object_->object_id); - output.status = next_object_->object_status; - output.publisher_priority = next_object_->publisher_priority; + output.metadata.location = + Location(next_object_->group_id, next_object_->object_id); + output.metadata.subgroup = next_object_->subgroup_id.value_or(0); + output.metadata.status = next_object_->object_status; + output.metadata.publisher_priority = next_object_->publisher_priority; output.fin_after_this = false; - if (output.sequence == largest_location_) { // This is the last object. + if (output.metadata.location == + largest_location_) { // This is the last object. eof_ = true; } next_object_.reset();
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h index d9abd82..74761f0 100644 --- a/quiche/quic/moqt/moqt_track.h +++ b/quiche/quic/moqt/moqt_track.h
@@ -117,9 +117,7 @@ virtual void OnCanAckObjects(MoqtObjectAckFunction ack_function) = 0; // Called when an object fragment (or an entire object) is received. virtual void OnObjectFragment(const FullTrackName& full_track_name, - Location sequence, - MoqtPriority publisher_priority, - MoqtObjectStatus object_status, + const PublishedObjectMetadata& metadata, absl::string_view object, bool end_of_message) = 0; virtual void OnSubscribeDone(FullTrackName full_track_name) = 0;
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc index 58f1282..5090f3d 100644 --- a/quiche/quic/moqt/moqt_track_test.cc +++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -179,7 +179,8 @@ got_object = true; EXPECT_EQ(fetch_task_->GetNextObject(object), MoqtFetchTask::GetNextObjectResult::kSuccess); - EXPECT_EQ(object.sequence, Location(3, 0, 0)); + EXPECT_EQ(object.metadata.location, Location(3, 0)); + EXPECT_EQ(object.metadata.subgroup, 0); EXPECT_EQ(object.payload.AsStringView(), "foobar"); }); int got_read_callback = 0;
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.cc b/quiche/quic/moqt/test_tools/mock_moqt_session.cc deleted file mode 100644 index d9c2e1b..0000000 --- a/quiche/quic/moqt/test_tools/mock_moqt_session.cc +++ /dev/null
@@ -1,225 +0,0 @@ -// Copyright 2025 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "quiche/quic/moqt/test_tools/mock_moqt_session.h" - -#include <cstdint> -#include <memory> -#include <optional> -#include <utility> - -#include "absl/status/status.h" -#include "absl/status/statusor.h" -#include "absl/strings/string_view.h" -#include "quiche/quic/moqt/moqt_failed_fetch.h" -#include "quiche/quic/moqt/moqt_messages.h" -#include "quiche/quic/moqt/moqt_priority.h" -#include "quiche/quic/moqt/moqt_publisher.h" -#include "quiche/quic/moqt/moqt_subscribe_windows.h" -#include "quiche/quic/moqt/moqt_track.h" -#include "quiche/common/platform/api/quiche_logging.h" -#include "quiche/common/platform/api/quiche_test.h" -#include "quiche/web_transport/web_transport.h" - -namespace moqt::test { - -namespace { -using ::testing::_; -} - -// Object listener that forwards all of the objects to the -// SubcribeRemoteTrack::Visitor provided. -class MockMoqtSession::LoopbackObjectListener : public MoqtObjectListener { - public: - LoopbackObjectListener(FullTrackName name, - SubscribeRemoteTrack::Visitor* visitor, - std::shared_ptr<MoqtTrackPublisher> publisher, - SubscribeWindow window) - : name_(name), - visitor_(visitor), - publisher_(std::move(publisher)), - window_(std::move(window)) { - publisher_->AddObjectListener(this); - } - ~LoopbackObjectListener() { publisher_->RemoveObjectListener(this); } - - LoopbackObjectListener(const LoopbackObjectListener&) = delete; - LoopbackObjectListener(LoopbackObjectListener&&) = delete; - LoopbackObjectListener& operator=(const LoopbackObjectListener&) = delete; - LoopbackObjectListener& operator=(LoopbackObjectListener&&) = delete; - - void OnSubscribeAccepted() override { - visitor_->OnReply(name_, - HasObjects() - ? std::make_optional(publisher_->GetLargestLocation()) - : std::nullopt, - std::nullopt); - } - - void OnSubscribeRejected(MoqtSubscribeErrorReason reason, - std::optional<uint64_t> track_alias) { - visitor_->OnReply(name_, std::nullopt, reason.reason_phrase); - } - - void OnNewObjectAvailable(Location sequence) { - std::optional<PublishedObject> object = - publisher_->GetCachedObject(sequence); - if (!object.has_value()) { - QUICHE_LOG(FATAL) - << "GetCachedObject() returned nullopt for a sequence passed into " - "OnNewObjectAvailable()"; - return; - } - if (!window_.InWindow(object->sequence)) { - return; - } - visitor_->OnObjectFragment(name_, sequence, object->publisher_priority, - object->status, object->payload.AsStringView(), - /*end_of_message=*/true); - } - - void OnNewFinAvailable(Location sequence) override {} - void OnSubgroupAbandoned(Location sequence, - webtransport::StreamErrorCode error_code) override {} - void OnGroupAbandoned(uint64_t group_id) override {} - void OnTrackPublisherGone() override { visitor_->OnSubscribeDone(name_); } - - private: - bool HasObjects() { - absl::StatusOr<MoqtTrackStatusCode> status = publisher_->GetTrackStatus(); - if (!status.ok()) { - return false; - } - return *status == MoqtTrackStatusCode::kInProgress || - *status == MoqtTrackStatusCode::kFinished; - } - - FullTrackName name_; - SubscribeRemoteTrack::Visitor* visitor_; - std::shared_ptr<MoqtTrackPublisher> publisher_; - SubscribeWindow window_; -}; - -bool MockMoqtSession::Subscribe(const FullTrackName& name, - SubscribeRemoteTrack::Visitor* visitor, - SubscribeWindow window) { - auto track_publisher = publisher_->GetTrack(name); - if (!track_publisher.ok()) { - visitor->OnReply(name, std::nullopt, track_publisher.status().ToString()); - return false; - } - auto [it, inserted] = receiving_subscriptions_.insert( - {name, - std::make_unique<LoopbackObjectListener>( - name, visitor, *std::move(track_publisher), std::move(window))}); - return inserted; -} - -MockMoqtSession::MockMoqtSession(MoqtPublisher* publisher) - : publisher_(publisher) { - ON_CALL(*this, Error) - .WillByDefault([](MoqtError code, absl::string_view error) { - ADD_FAILURE() << "Unhandled MoQT fatal error, with code " - << static_cast<int>(code) << " and message: " << error; - }); - if (publisher_ != nullptr) { - ON_CALL(*this, SubscribeCurrentObject) - .WillByDefault([this](const FullTrackName& name, - SubscribeRemoteTrack::Visitor* visitor, - VersionSpecificParameters) { - return Subscribe(name, visitor, SubscribeWindow()); - }); - ON_CALL(*this, SubscribeAbsolute(_, _, _, _, _)) - .WillByDefault([this](const FullTrackName& name, uint64_t start_group, - uint64_t start_object, - SubscribeRemoteTrack::Visitor* visitor, - VersionSpecificParameters) { - return Subscribe( - name, visitor, - SubscribeWindow(Location(start_group, start_object))); - }); - ON_CALL(*this, SubscribeAbsolute(_, _, _, _, _, _)) - .WillByDefault([this](const FullTrackName& name, uint64_t start_group, - uint64_t start_object, uint64_t end_group, - SubscribeRemoteTrack::Visitor* visitor, - VersionSpecificParameters) { - return Subscribe( - name, visitor, - SubscribeWindow(Location(start_group, start_object), end_group)); - }); - ON_CALL(*this, Unsubscribe) - .WillByDefault([this](const FullTrackName& name) { - receiving_subscriptions_.erase(name); - }); - ON_CALL(*this, Fetch) - .WillByDefault( - [this](const FullTrackName& name, FetchResponseCallback callback, - Location start, uint64_t end_group, - std::optional<uint64_t> end_object, MoqtPriority priority, - std::optional<MoqtDeliveryOrder> delivery_order, - VersionSpecificParameters parameters) { - auto track_publisher = publisher_->GetTrack(name); - if (!track_publisher.ok()) { - std::move(callback)(std::make_unique<MoqtFailedFetch>( - track_publisher.status())); - return true; - } - std::move(callback)(track_publisher->get()->Fetch( - start, end_group, end_object, - delivery_order.value_or(MoqtDeliveryOrder::kAscending))); - return true; - }); - ON_CALL(*this, JoiningFetch(_, _, _, _)) - .WillByDefault([this](const FullTrackName& name, - SubscribeRemoteTrack::Visitor* visitor, - uint64_t num_previous_groups, - VersionSpecificParameters parameters) { - return JoiningFetch( - name, visitor, - [name, visitor](std::unique_ptr<MoqtFetchTask> fetch) { - PublishedObject object; - while (fetch->GetNextObject(object) == - MoqtFetchTask::kSuccess) { - visitor->OnObjectFragment( - name, object.sequence, object.publisher_priority, - object.status, object.payload.AsStringView(), true); - } - }, - num_previous_groups, 0x80, std::nullopt, parameters); - }); - ON_CALL(*this, JoiningFetch(_, _, _, _, _, _, _)) - .WillByDefault([this](const FullTrackName& name, - SubscribeRemoteTrack::Visitor* visitor, - FetchResponseCallback callback, - uint64_t num_previous_groups, - MoqtPriority priority, - std::optional<MoqtDeliveryOrder> delivery_order, - VersionSpecificParameters parameters) { - SubscribeCurrentObject(name, visitor, parameters); - auto track_publisher = publisher_->GetTrack(name); - if (!track_publisher.ok()) { - std::move(callback)( - std::make_unique<MoqtFailedFetch>(track_publisher.status())); - return true; - } - if (track_publisher->get()->GetTrackStatus().value_or( - MoqtTrackStatusCode::kStatusNotAvailable) == - MoqtTrackStatusCode::kNotYetBegun) { - return Fetch(name, std::move(callback), Location(0, 0), 0, 0, - priority, delivery_order, std::move(parameters)); - } - Location largest = track_publisher->get()->GetLargestLocation(); - uint64_t start_group = largest.group >= num_previous_groups - ? largest.group - num_previous_groups + 1 - : 0; - return Fetch(name, std::move(callback), Location(start_group, 0), - largest.group, largest.object, priority, delivery_order, - std::move(parameters)); - }); - } -} - -MockMoqtSession::~MockMoqtSession() = default; - -} // namespace moqt::test
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.h b/quiche/quic/moqt/test_tools/mock_moqt_session.h deleted file mode 100644 index b80ed7b..0000000 --- a/quiche/quic/moqt/test_tools/mock_moqt_session.h +++ /dev/null
@@ -1,108 +0,0 @@ -// Copyright 2025 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#ifndef QUICHE_QUIC_MOQT_TEST_TOOLS_MOCK_MOQT_SESSION_H_ -#define QUICHE_QUIC_MOQT_TEST_TOOLS_MOCK_MOQT_SESSION_H_ - -#include <cstdint> -#include <memory> -#include <optional> - -#include "absl/container/flat_hash_map.h" -#include "absl/strings/string_view.h" -#include "quiche/quic/moqt/moqt_messages.h" -#include "quiche/quic/moqt/moqt_priority.h" -#include "quiche/quic/moqt/moqt_publisher.h" -#include "quiche/quic/moqt/moqt_session_callbacks.h" -#include "quiche/quic/moqt/moqt_session_interface.h" -#include "quiche/quic/moqt/moqt_subscribe_windows.h" -#include "quiche/quic/moqt/moqt_track.h" -#include "quiche/common/platform/api/quiche_logging.h" -#include "quiche/common/platform/api/quiche_test.h" - -namespace moqt::test { - -// Mock version of MoqtSession. If `publisher` is provided via constructor, all -// of the SUBSCRIBE and FETCH requests are routed towards it. -class MockMoqtSession : public MoqtSessionInterface { - public: - explicit MockMoqtSession(MoqtPublisher* publisher); - ~MockMoqtSession() override; - - MockMoqtSession(const MockMoqtSession&) = delete; - MockMoqtSession(MockMoqtSession&&) = delete; - MockMoqtSession& operator=(const MockMoqtSession&) = delete; - MockMoqtSession& operator=(MockMoqtSession&&) = delete; - - MoqtSessionCallbacks& callbacks() override { return callbacks_; } - - MOCK_METHOD(void, Error, (MoqtError code, absl::string_view error), - (override)); - MOCK_METHOD(bool, SubscribeAbsolute, - (const FullTrackName& name, uint64_t start_group, - uint64_t start_object, SubscribeRemoteTrack::Visitor* visitor, - VersionSpecificParameters parameters), - (override)); - MOCK_METHOD(bool, SubscribeAbsolute, - (const FullTrackName& name, uint64_t start_group, - uint64_t start_object, uint64_t end_group, - SubscribeRemoteTrack::Visitor* visitor, - VersionSpecificParameters parameters), - (override)); - MOCK_METHOD(bool, SubscribeCurrentObject, - (const FullTrackName& name, - SubscribeRemoteTrack::Visitor* visitor, - VersionSpecificParameters parameters), - (override)); - MOCK_METHOD(bool, SubscribeNextGroup, - (const FullTrackName& name, - SubscribeRemoteTrack::Visitor* visitor, - VersionSpecificParameters parameters), - (override)); - MOCK_METHOD(bool, SubscribeUpdate, - (const FullTrackName& name, std::optional<Location> start, - std::optional<uint64_t> end_group, - std::optional<MoqtPriority> subscriber_priority, - std::optional<bool> forward, - VersionSpecificParameters parameters), - (override)); - MOCK_METHOD(void, Unsubscribe, (const FullTrackName& name), (override)); - MOCK_METHOD(bool, Fetch, - (const FullTrackName& name, FetchResponseCallback callback, - Location start, uint64_t end_group, - std::optional<uint64_t> end_object, MoqtPriority priority, - std::optional<MoqtDeliveryOrder> delivery_order, - VersionSpecificParameters parameters), - (override)); - MOCK_METHOD(bool, JoiningFetch, - (const FullTrackName& name, - SubscribeRemoteTrack::Visitor* visitor, - uint64_t num_previous_groups, - VersionSpecificParameters parameters), - (override)); - MOCK_METHOD(bool, JoiningFetch, - (const FullTrackName& name, - SubscribeRemoteTrack::Visitor* visitor, - FetchResponseCallback callback, uint64_t num_previous_groups, - MoqtPriority priority, - std::optional<MoqtDeliveryOrder> delivery_order, - VersionSpecificParameters parameters), - (override)); - - private: - class LoopbackObjectListener; - - bool Subscribe(const FullTrackName& name, - SubscribeRemoteTrack::Visitor* visitor, - SubscribeWindow window); - - MoqtPublisher* const publisher_ = nullptr; - MoqtSessionCallbacks callbacks_; - absl::flat_hash_map<FullTrackName, std::unique_ptr<LoopbackObjectListener>> - receiving_subscriptions_; -}; - -} // namespace moqt::test - -#endif // QUICHE_QUIC_MOQT_TEST_TOOLS_MOCK_MOQT_SESSION_H_
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc b/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc deleted file mode 100644 index 1df0121..0000000 --- a/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc +++ /dev/null
@@ -1,135 +0,0 @@ -// Copyright 2025 The Chromium Authors. All rights reserved. -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -#include "quiche/quic/moqt/test_tools/mock_moqt_session.h" - -#include <memory> -#include <optional> -#include <utility> - -#include "quiche/quic/moqt/moqt_known_track_publisher.h" -#include "quiche/quic/moqt/moqt_messages.h" -#include "quiche/quic/moqt/moqt_outgoing_queue.h" -#include "quiche/quic/moqt/moqt_publisher.h" -#include "quiche/quic/moqt/tools/moqt_mock_visitor.h" -#include "quiche/quic/test_tools/quic_test_utils.h" -#include "quiche/common/platform/api/quiche_test.h" - -namespace moqt::test { -namespace { - -using ::testing::_; -using ::testing::Eq; -using ::testing::HasSubstr; -using ::testing::Optional; - -FullTrackName TrackName() { return FullTrackName("foo", "bar"); } - -class MockMoqtSessionTest : public quiche::test::QuicheTest { - protected: - MockMoqtSessionTest() : session_(&publisher_) { - track_ = std::make_shared<MoqtOutgoingQueue>( - TrackName(), MoqtForwardingPreference::kSubgroup); - publisher_.Add(track_); - } - - MoqtKnownTrackPublisher publisher_; - std::shared_ptr<MoqtOutgoingQueue> track_; - MockMoqtSession session_; -}; - -TEST_F(MockMoqtSessionTest, MissingTrack) { - testing::StrictMock<MockSubscribeRemoteTrackVisitor> visitor; - EXPECT_CALL(visitor, - OnReply(FullTrackName("doesn't", "exist"), Eq(std::nullopt), - Optional(HasSubstr("not found")))); - session_.SubscribeCurrentObject(FullTrackName("doesn't", "exist"), &visitor, - VersionSpecificParameters()); -} - -TEST_F(MockMoqtSessionTest, SubscribeCurrentObject) { - testing::StrictMock<MockSubscribeRemoteTrackVisitor> visitor; - EXPECT_CALL(visitor, - OnReply(TrackName(), Eq(std::nullopt), Eq(std::nullopt))); - session_.SubscribeCurrentObject(TrackName(), &visitor, - VersionSpecificParameters()); - EXPECT_CALL(visitor, - OnObjectFragment(TrackName(), Location(0, 0), _, _, "test", _)); - track_->AddObject(quic::test::MemSliceFromString("test"), /*key=*/true); - - session_.Unsubscribe(TrackName()); - track_->AddObject(quic::test::MemSliceFromString("test2"), /*key=*/true); - // No visitor call here. -} - -TEST_F(MockMoqtSessionTest, SubscribeAbsolute) { - testing::StrictMock<MockSubscribeRemoteTrackVisitor> visitor; - EXPECT_CALL(visitor, - OnReply(TrackName(), Eq(std::nullopt), Eq(std::nullopt))); - session_.SubscribeAbsolute(TrackName(), 1, 0, 1, &visitor, - VersionSpecificParameters()); - EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(1, 0), _, - MoqtObjectStatus::kNormal, "b", _)); - EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(1, 1), _, - MoqtObjectStatus::kEndOfGroup, "", _)); - track_->AddObject(quic::test::MemSliceFromString("a"), /*key=*/true); - track_->AddObject(quic::test::MemSliceFromString("b"), /*key=*/true); - track_->AddObject(quic::test::MemSliceFromString("c"), /*key=*/true); -} - -TEST_F(MockMoqtSessionTest, Fetch) { - track_->AddObject(quic::test::MemSliceFromString("a"), /*key=*/true); - track_->AddObject(quic::test::MemSliceFromString("b"), /*key=*/false); - track_->AddObject(quic::test::MemSliceFromString("c"), /*key=*/false); - track_->AddObject(quic::test::MemSliceFromString("d"), /*key=*/true); - std::unique_ptr<MoqtFetchTask> fetch; - session_.Fetch( - TrackName(), - [&](std::unique_ptr<MoqtFetchTask> new_fetch) { - fetch = std::move(new_fetch); - }, - Location(0, 1), 0, 2, 0x80, std::nullopt, VersionSpecificParameters()); - PublishedObject object; - ASSERT_EQ(fetch->GetNextObject(object), MoqtFetchTask::kSuccess); - EXPECT_EQ(object.payload.AsStringView(), "b"); - ASSERT_EQ(fetch->GetNextObject(object), MoqtFetchTask::kSuccess); - EXPECT_EQ(object.payload.AsStringView(), "c"); - ASSERT_EQ(fetch->GetNextObject(object), MoqtFetchTask::kEof); -} - -TEST_F(MockMoqtSessionTest, JoiningFetch) { - track_->AddObject(quic::test::MemSliceFromString("a"), /*key=*/true); - track_->AddObject(quic::test::MemSliceFromString("b"), /*key=*/true); - track_->AddObject(quic::test::MemSliceFromString("c"), /*key=*/true); - track_->AddObject(quic::test::MemSliceFromString("d"), /*key=*/true); - - testing::StrictMock<MockSubscribeRemoteTrackVisitor> visitor; - EXPECT_CALL(visitor, - OnReply(TrackName(), Eq(Location(3, 0)), Eq(std::nullopt))); - EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(2, 0), _, - MoqtObjectStatus::kNormal, "c", _)); - EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(2, 1), _, - MoqtObjectStatus::kEndOfGroup, "", _)); - EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(3, 0), _, - MoqtObjectStatus::kNormal, "d", _)); - session_.JoiningFetch(TrackName(), &visitor, 2, VersionSpecificParameters()); - EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(3, 1), _, - MoqtObjectStatus::kEndOfGroup, "", _)); - EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(4, 0), _, - MoqtObjectStatus::kNormal, "e", _)); - track_->AddObject(quic::test::MemSliceFromString("e"), /*key=*/true); -} - -TEST_F(MockMoqtSessionTest, JoiningFetchNoObjects) { - testing::StrictMock<MockSubscribeRemoteTrackVisitor> visitor; - EXPECT_CALL(visitor, - OnReply(TrackName(), Eq(std::nullopt), Eq(std::nullopt))); - session_.JoiningFetch(TrackName(), &visitor, 0, VersionSpecificParameters()); - EXPECT_CALL(visitor, - OnObjectFragment(TrackName(), Location(0, 0), _, _, "test", _)); - track_->AddObject(quic::test::MemSliceFromString("test"), /*key=*/true); -} - -} // namespace -} // namespace moqt::test
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h index e3ddb7b..634fe12 100644 --- a/quiche/quic/moqt/test_tools/moqt_session_peer.h +++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -20,6 +20,7 @@ #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_session.h" +#include "quiche/quic/moqt/moqt_subscribe_windows.h" #include "quiche/quic/moqt/moqt_track.h" #include "quiche/quic/moqt/tools/moqt_mock_visitor.h" #include "quiche/web_transport/test_tools/mock_web_transport.h" @@ -247,11 +248,10 @@ } static bool SubgroupHasBeenReset(MoqtObjectListener* subscription, - Location sequence) { - sequence.object = 0; + DataStreamIndex index) { return static_cast<MoqtSession::PublishedSubscription*>(subscription) ->reset_subgroups() - .contains(sequence); + .contains(index); } };
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc index f22ffda..26525da 100644 --- a/quiche/quic/moqt/tools/chat_client.cc +++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -25,6 +25,7 @@ #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_outgoing_queue.h" #include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_session.h" #include "quiche/quic/moqt/tools/moq_chat.h" #include "quiche/quic/moqt/tools/moqt_client.h" @@ -185,9 +186,9 @@ } void ChatClient::RemoteTrackVisitor::OnObjectFragment( - const FullTrackName& full_track_name, Location /*sequence*/, - MoqtPriority /*publisher_priority*/, MoqtObjectStatus /*status*/, - absl::string_view object, bool end_of_message) { + const FullTrackName& full_track_name, + const PublishedObjectMetadata& /*metadata*/, absl::string_view object, + bool end_of_message) { if (!end_of_message) { std::cerr << "Error: received partial message despite requesting " "buffering\n";
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h index 93d4ae5..5bad8e6 100644 --- a/quiche/quic/moqt/tools/chat_client.h +++ b/quiche/quic/moqt/tools/chat_client.h
@@ -17,6 +17,7 @@ #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_outgoing_queue.h" #include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_session.h" #include "quiche/quic/moqt/moqt_track.h" #include "quiche/quic/moqt/tools/moqt_client.h" @@ -98,9 +99,7 @@ void OnCanAckObjects(MoqtObjectAckFunction) override {} void OnObjectFragment(const moqt::FullTrackName& full_track_name, - Location sequence, - moqt::MoqtPriority publisher_priority, - moqt::MoqtObjectStatus status, + const PublishedObjectMetadata& metadata, absl::string_view object, bool end_of_message) override;
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc index c3d9b74..268c912 100644 --- a/quiche/quic/moqt/tools/chat_server.cc +++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -18,6 +18,7 @@ #include "quiche/quic/moqt/moqt_live_relay_queue.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_session.h" #include "quiche/quic/moqt/tools/moq_chat.h" #include "quiche/quic/moqt/tools/moqt_server.h" @@ -149,9 +150,9 @@ } void ChatServer::RemoteTrackVisitor::OnObjectFragment( - const moqt::FullTrackName& full_track_name, moqt::Location sequence, - moqt::MoqtPriority /*publisher_priority*/, moqt::MoqtObjectStatus status, - absl::string_view object, bool end_of_message) { + const moqt::FullTrackName& full_track_name, + const PublishedObjectMetadata& metadata, absl::string_view object, + bool end_of_message) { if (!end_of_message) { std::cerr << "Error: received partial message despite requesting " "buffering\n"; @@ -162,14 +163,14 @@ << full_track_name.ToString() << "\n"; return; } - if (status != MoqtObjectStatus::kNormal) { - it->second->AddObject(sequence, status); + if (metadata.status != MoqtObjectStatus::kNormal) { + it->second->AddObject(metadata, "", /*fin=*/false); return; } if (!server_->WriteToFile(GetUsername(full_track_name), object)) { std::cout << GetUsername(full_track_name) << ": " << object << "\n\n"; } - it->second->AddObject(sequence, object); + it->second->AddObject(metadata, object, /*fin=*/false); } ChatServer::ChatServer(std::unique_ptr<quic::ProofSource> proof_source,
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h index 90e83e9..02fd059 100644 --- a/quiche/quic/moqt/tools/chat_server.h +++ b/quiche/quic/moqt/tools/chat_server.h
@@ -43,9 +43,7 @@ std::optional<absl::string_view> reason_phrase) override; void OnCanAckObjects(MoqtObjectAckFunction) override {} void OnObjectFragment(const moqt::FullTrackName& full_track_name, - Location sequence, - moqt::MoqtPriority /*publisher_priority*/, - moqt::MoqtObjectStatus /*status*/, + const PublishedObjectMetadata& metadata, absl::string_view object, bool end_of_message) override; void OnSubscribeDone(FullTrackName /*full_track_name*/) override {}
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc index acc64a0..7e70c29 100644 --- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc +++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -30,6 +30,7 @@ #include "absl/time/time.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_session.h" #include "quiche/quic/moqt/moqt_track.h" #include "quiche/quic/moqt/tools/moqt_server.h" @@ -180,13 +181,12 @@ void OnCanAckObjects(MoqtObjectAckFunction) override {} void OnObjectFragment(const FullTrackName& full_track_name, - Location sequence, - MoqtPriority /*publisher_priority*/, - MoqtObjectStatus /*status*/, absl::string_view object, + const PublishedObjectMetadata& metadata, + absl::string_view object, bool /*end_of_message*/) override { std::string file_name = - absl::StrCat(sequence.group, "-", sequence.object, ".", - full_track_name.track_namespace().tuple().back()); + absl::StrCat(metadata.location.group, "-", metadata.location.object, + ".", full_track_name.track_namespace().tuple().back()); std::string file_path = quiche::JoinPath(directory_, file_name); std::ofstream output(file_path, std::ios::binary | std::ios::ate); output.write(object.data(), object.size());
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h index a5d4afb..b88fd56 100644 --- a/quiche/quic/moqt/tools/moqt_mock_visitor.h +++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -10,7 +10,6 @@ #include <optional> #include <utility> #include <variant> -#include <vector> #include "absl/status/status.h" #include "absl/status/statusor.h" @@ -65,7 +64,7 @@ const FullTrackName& GetTrackName() const override { return track_name_; } MOCK_METHOD(std::optional<PublishedObject>, GetCachedObject, - (Location sequence), (const, override)); + (uint64_t, uint64_t, uint64_t), (const, override)); MOCK_METHOD(void, AddObjectListener, (MoqtObjectListener * listener), (override)); MOCK_METHOD(void, RemoveObjectListener, (MoqtObjectListener * listener), @@ -95,8 +94,8 @@ MOCK_METHOD(void, OnCanAckObjects, (MoqtObjectAckFunction ack_function), (override)); MOCK_METHOD(void, OnObjectFragment, - (const FullTrackName& full_track_name, Location sequence, - MoqtPriority publisher_priority, MoqtObjectStatus status, + (const FullTrackName& full_track_name, + const PublishedObjectMetadata& metadata, absl::string_view object, bool end_of_message), (override)); MOCK_METHOD(void, OnSubscribeDone, (FullTrackName full_track_name),
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc index 3da0a9d..3bf12ee 100644 --- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc +++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -37,6 +37,7 @@ #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_outgoing_queue.h" #include "quiche/quic/moqt/moqt_priority.h" +#include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_session.h" #include "quiche/quic/moqt/moqt_track.h" #include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h" @@ -299,12 +300,12 @@ object_ack_function_ = std::move(ack_function); } - void OnObjectFragment(const FullTrackName& full_track_name, Location sequence, - MoqtPriority /*publisher_priority*/, - MoqtObjectStatus status, absl::string_view object, + void OnObjectFragment(const FullTrackName& full_track_name, + const PublishedObjectMetadata& metadata, + absl::string_view object, bool end_of_message) override { QUICHE_DCHECK(full_track_name == TrackName()); - if (status != MoqtObjectStatus::kNormal) { + if (metadata.status != MoqtObjectStatus::kNormal) { QUICHE_DCHECK(end_of_message); return; } @@ -312,7 +313,7 @@ QUICHE_LOG(DFATAL) << "Partial receiving of objects wasn't enabled"; return; } - OnFullObject(sequence, object); + OnFullObject(metadata.location, object); } void OnSubscribeDone(FullTrackName /*full_track_name*/) override {}