Multiple MoQT refactors, merged into one CL since detangling them would require extra effort.
* Location no longer has a subgroup ID in it.
* A new type, DataStreamIndex, is introduced as a hashable (group, subgroup) tuple.
* A new type, PublishedObjectMetadata, is introduced to pass around object metadata in a way that's consistent between the send and the receive code.
* A bunch of other smaller changes.
This potentially fixes at least two bugs we currently have in our code:
- If a single subgroup were reset within a group, no other subgroup in the group would be able to send objects.
- Subgroup IDs were not passed to the application on receipt.
PiperOrigin-RevId: 776288849
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 0bbd9b2..43911dd 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1557,7 +1557,6 @@
"quic/moqt/moqt_session_interface.h",
"quic/moqt/moqt_subscribe_windows.h",
"quic/moqt/moqt_track.h",
- "quic/moqt/test_tools/mock_moqt_session.h",
"quic/moqt/test_tools/moqt_framer_utils.h",
"quic/moqt/test_tools/moqt_parser_test_visitor.h",
"quic/moqt/test_tools/moqt_session_peer.h",
@@ -1597,8 +1596,6 @@
"quic/moqt/moqt_subscribe_windows_test.cc",
"quic/moqt/moqt_track.cc",
"quic/moqt/moqt_track_test.cc",
- "quic/moqt/test_tools/mock_moqt_session.cc",
- "quic/moqt/test_tools/mock_moqt_session_test.cc",
"quic/moqt/test_tools/moqt_framer_utils.cc",
"quic/moqt/test_tools/moqt_simulator_harness.cc",
"quic/moqt/tools/chat_client.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index 54bbd26..b2b99f0 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1561,7 +1561,6 @@
"src/quiche/quic/moqt/moqt_session_interface.h",
"src/quiche/quic/moqt/moqt_subscribe_windows.h",
"src/quiche/quic/moqt/moqt_track.h",
- "src/quiche/quic/moqt/test_tools/mock_moqt_session.h",
"src/quiche/quic/moqt/test_tools/moqt_framer_utils.h",
"src/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h",
"src/quiche/quic/moqt/test_tools/moqt_session_peer.h",
@@ -1601,8 +1600,6 @@
"src/quiche/quic/moqt/moqt_subscribe_windows_test.cc",
"src/quiche/quic/moqt/moqt_track.cc",
"src/quiche/quic/moqt/moqt_track_test.cc",
- "src/quiche/quic/moqt/test_tools/mock_moqt_session.cc",
- "src/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc",
"src/quiche/quic/moqt/test_tools/moqt_framer_utils.cc",
"src/quiche/quic/moqt/test_tools/moqt_simulator_harness.cc",
"src/quiche/quic/moqt/tools/chat_client.cc",
diff --git a/build/source_list.json b/build/source_list.json
index 9830675..501ddf7 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1560,7 +1560,6 @@
"quiche/quic/moqt/moqt_session_interface.h",
"quiche/quic/moqt/moqt_subscribe_windows.h",
"quiche/quic/moqt/moqt_track.h",
- "quiche/quic/moqt/test_tools/mock_moqt_session.h",
"quiche/quic/moqt/test_tools/moqt_framer_utils.h",
"quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h",
"quiche/quic/moqt/test_tools/moqt_session_peer.h",
@@ -1600,8 +1599,6 @@
"quiche/quic/moqt/moqt_subscribe_windows_test.cc",
"quiche/quic/moqt/moqt_track.cc",
"quiche/quic/moqt/moqt_track_test.cc",
- "quiche/quic/moqt/test_tools/mock_moqt_session.cc",
- "quiche/quic/moqt/test_tools/mock_moqt_session_test.cc",
"quiche/quic/moqt/test_tools/moqt_framer_utils.cc",
"quiche/quic/moqt/test_tools/moqt_simulator_harness.cc",
"quiche/quic/moqt/tools/chat_client.cc",
diff --git a/quiche/quic/moqt/moqt_cached_object.cc b/quiche/quic/moqt/moqt_cached_object.cc
index 33294bb..df71f1c 100644
--- a/quiche/quic/moqt/moqt_cached_object.cc
+++ b/quiche/quic/moqt/moqt_cached_object.cc
@@ -13,15 +13,12 @@
moqt::PublishedObject CachedObjectToPublishedObject(
const CachedObject& object) {
PublishedObject result;
- result.sequence = object.sequence;
- result.status = object.status;
- result.publisher_priority = object.publisher_priority;
+ result.metadata = object.metadata;
if (object.payload != nullptr && !object.payload->empty()) {
result.payload = quiche::QuicheMemSlice(
object.payload->data(), object.payload->length(),
[retained_pointer = object.payload](absl::string_view) {});
}
- result.arrival_time = object.arrival_time;
result.fin_after_this = object.fin_after_this;
return result;
}
diff --git a/quiche/quic/moqt/moqt_cached_object.h b/quiche/quic/moqt/moqt_cached_object.h
index ac33096..ef5566a 100644
--- a/quiche/quic/moqt/moqt_cached_object.h
+++ b/quiche/quic/moqt/moqt_cached_object.h
@@ -7,9 +7,6 @@
#include <memory>
-#include "quiche/quic/core/quic_time.h"
-#include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/common/quiche_mem_slice.h"
@@ -18,11 +15,8 @@
// CachedObject is a version of PublishedObject with a reference counted
// payload.
struct CachedObject {
- Location sequence;
- MoqtObjectStatus status;
- MoqtPriority publisher_priority;
+ PublishedObjectMetadata metadata;
std::shared_ptr<quiche::QuicheMemSlice> payload;
- quic::QuicTime arrival_time;
bool fin_after_this; // This is the last object before FIN.
};
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index c1099ee..e01317e 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -19,6 +19,7 @@
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/moqt/test_tools/moqt_session_peer.h"
#include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
@@ -93,6 +94,12 @@
std::unique_ptr<MoqtServerEndpoint> server_;
};
+MATCHER_P2(
+ MetadataLocationAndStatus, location, status,
+ "Matches a PublishedObjectMetadata against Location and ObjectStatus") {
+ return arg.location == location && status == arg.status;
+}
+
TEST_F(MoqtIntegrationTest, Handshake) {
CreateDefaultEndpoints();
WireUpEndpoints();
@@ -283,15 +290,14 @@
queue->AddObject(MemSliceFromString("object data"), /*key=*/true);
bool received_object = false;
- EXPECT_CALL(server_visitor, OnObjectFragment(_, _, _, _, _, _))
- .WillOnce([&](const FullTrackName& full_track_name, Location sequence,
- MoqtPriority /*publisher_priority*/,
- MoqtObjectStatus status, absl::string_view object,
- bool end_of_message) {
+ EXPECT_CALL(server_visitor, OnObjectFragment)
+ .WillOnce([&](const FullTrackName& full_track_name,
+ const PublishedObjectMetadata& metadata,
+ absl::string_view object, bool end_of_message) {
EXPECT_EQ(full_track_name, FullTrackName("test", "data"));
- EXPECT_EQ(sequence.group, 0u);
- EXPECT_EQ(sequence.object, 0u);
- EXPECT_EQ(status, MoqtObjectStatus::kNormal);
+ EXPECT_EQ(metadata.location.group, 0u);
+ EXPECT_EQ(metadata.location.object, 0u);
+ EXPECT_EQ(metadata.status, MoqtObjectStatus::kNormal);
EXPECT_EQ(object, "object data");
EXPECT_TRUE(end_of_message);
received_object = true;
@@ -336,18 +342,25 @@
EXPECT_TRUE(success);
int received = 0;
- EXPECT_CALL(client_visitor,
- OnObjectFragment(_, Location{0, 3}, _,
- MoqtObjectStatus::kEndOfGroup, "", true))
+ EXPECT_CALL(
+ client_visitor,
+ OnObjectFragment(_,
+ MetadataLocationAndStatus(
+ Location{0, 3}, MoqtObjectStatus::kEndOfGroup),
+ "", true))
.WillOnce([&] { ++received; });
EXPECT_CALL(client_visitor,
- OnObjectFragment(_, Location{1, 0}, _,
- MoqtObjectStatus::kNormal, "object 4", true))
+ OnObjectFragment(_,
+ MetadataLocationAndStatus(
+ Location{1, 0}, MoqtObjectStatus::kNormal),
+ "object 4", true))
.WillOnce([&] { ++received; });
queue->AddObject(MemSliceFromString("object 4"), /*key=*/true);
EXPECT_CALL(client_visitor,
- OnObjectFragment(_, Location{1, 1}, _,
- MoqtObjectStatus::kNormal, "object 5", true))
+ OnObjectFragment(_,
+ MetadataLocationAndStatus(
+ Location{1, 1}, MoqtObjectStatus::kNormal),
+ "object 5", true))
.WillOnce([&] { ++received; });
queue->AddObject(MemSliceFromString("object 5"), /*key=*/false);
@@ -356,22 +369,31 @@
EXPECT_TRUE(success);
EXPECT_CALL(client_visitor,
- OnObjectFragment(_, Location{1, 2}, _,
- MoqtObjectStatus::kNormal, "object 6", true))
+ OnObjectFragment(_,
+ MetadataLocationAndStatus(
+ Location{1, 2}, MoqtObjectStatus::kNormal),
+ "object 6", true))
.WillOnce([&] { ++received; });
queue->AddObject(MemSliceFromString("object 6"), /*key=*/false);
- EXPECT_CALL(client_visitor,
- OnObjectFragment(_, Location{1, 3}, _,
- MoqtObjectStatus::kEndOfGroup, "", true))
+ EXPECT_CALL(
+ client_visitor,
+ OnObjectFragment(_,
+ MetadataLocationAndStatus(
+ Location{1, 3}, MoqtObjectStatus::kEndOfGroup),
+ "", true))
.WillOnce([&] { ++received; });
EXPECT_CALL(client_visitor,
- OnObjectFragment(_, Location{2, 0}, _,
- MoqtObjectStatus::kNormal, "object 7", true))
+ OnObjectFragment(_,
+ MetadataLocationAndStatus(
+ Location{2, 0}, MoqtObjectStatus::kNormal),
+ "object 7", true))
.WillOnce([&] { ++received; });
queue->AddObject(MemSliceFromString("object 7"), /*key=*/true);
EXPECT_CALL(client_visitor,
- OnObjectFragment(_, Location{2, 1}, _,
- MoqtObjectStatus::kNormal, "object 8", true))
+ OnObjectFragment(_,
+ MetadataLocationAndStatus(
+ Location{2, 1}, MoqtObjectStatus::kNormal),
+ "object 8", true))
.WillOnce([&] { ++received; });
queue->AddObject(MemSliceFromString("object 8"), /*key=*/false);
@@ -379,13 +401,19 @@
[&]() { return received >= 7; });
EXPECT_TRUE(success);
- EXPECT_CALL(client_visitor,
- OnObjectFragment(_, Location{2, 2}, _,
- MoqtObjectStatus::kEndOfGroup, "", true))
+ EXPECT_CALL(
+ client_visitor,
+ OnObjectFragment(_,
+ MetadataLocationAndStatus(
+ Location{2, 2}, MoqtObjectStatus::kEndOfGroup),
+ "", true))
.WillOnce([&] { ++received; });
- EXPECT_CALL(client_visitor,
- OnObjectFragment(_, Location{3, 0}, _,
- MoqtObjectStatus::kEndOfTrack, "", true))
+ EXPECT_CALL(
+ client_visitor,
+ OnObjectFragment(_,
+ MetadataLocationAndStatus(
+ Location{3, 0}, MoqtObjectStatus::kEndOfTrack),
+ "", true))
.WillOnce([&] { ++received; });
queue->Close();
success = test_harness_.RunUntilWithDefaultTimeout(
@@ -428,13 +456,13 @@
break;
}
EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kSuccess);
- EXPECT_EQ(object.sequence, expected);
- if (object.sequence.object == 1) {
- EXPECT_EQ(object.status, MoqtObjectStatus::kEndOfGroup);
+ EXPECT_EQ(object.metadata.location, expected);
+ if (object.metadata.location.object == 1) {
+ EXPECT_EQ(object.metadata.status, MoqtObjectStatus::kEndOfGroup);
expected.object = 0;
++expected.group;
} else {
- EXPECT_EQ(object.status, MoqtObjectStatus::kNormal);
+ EXPECT_EQ(object.metadata.status, MoqtObjectStatus::kNormal);
EXPECT_EQ(object.payload.AsStringView(), "object");
++expected.object;
}
@@ -573,9 +601,9 @@
SubscribeLatestObject(full_track_name, &client_visitor);
// Deliver 3 objects on 2 streams.
- queue->AddObject(Location(0, 0), "object,0,0", false);
- queue->AddObject(Location(0, 1), "object,0,1", true);
- queue->AddObject(Location(1, 0), "object,1,0", true);
+ queue->AddObject(Location(0, 0), 0, "object,0,0", false);
+ queue->AddObject(Location(0, 1), 0, "object,0,1", true);
+ queue->AddObject(Location(1, 0), 0, "object,1,0", true);
int received = 0;
EXPECT_CALL(client_visitor, OnObjectFragment).WillRepeatedly([&]() {
++received;
@@ -686,18 +714,17 @@
size_t bytes_received = 0;
EXPECT_CALL(client_visitor, OnObjectFragment)
.WillRepeatedly(
- [&](const FullTrackName&, Location sequence,
- MoqtPriority /*publisher_priority*/, MoqtObjectStatus status,
+ [&](const FullTrackName&, const PublishedObjectMetadata& metadata,
absl::string_view object,
bool end_of_message) { bytes_received += object.size(); });
- queue->AddObject(Location{0, 0, 0}, data, false);
- queue->AddObject(Location{0, 0, 1}, data, false);
- queue->AddObject(Location{0, 0, 2}, data, false);
- queue->AddObject(Location{0, 0, 3}, data, true);
+ queue->AddObject(Location{0, 0}, 0, data, false);
+ queue->AddObject(Location{0, 1}, 0, data, false);
+ queue->AddObject(Location{0, 2}, 0, data, false);
+ queue->AddObject(Location{0, 3}, 0, data, true);
success = test_harness_.RunUntilWithDefaultTimeout([&]() {
return MoqtSessionPeer::SubgroupHasBeenReset(
MoqtSessionPeer::GetSubscription(server_->session(), 0),
- Location{0, 0, 0});
+ DataStreamIndex{0, 0});
});
EXPECT_TRUE(success);
// Stream was reset before all the bytes arrived.
@@ -736,16 +763,15 @@
size_t bytes_received = 0;
EXPECT_CALL(client_visitor, OnObjectFragment)
.WillRepeatedly(
- [&](const FullTrackName&, Location sequence,
- MoqtPriority /*publisher_priority*/, MoqtObjectStatus status,
+ [&](const FullTrackName&, const PublishedObjectMetadata& metadata,
absl::string_view object,
bool end_of_message) { bytes_received += object.size(); });
- queue->AddObject(Location{0, 0, 0}, data, false);
- queue->AddObject(Location{1, 0, 0}, data, false);
+ queue->AddObject(Location{0, 0}, 0, data, false);
+ queue->AddObject(Location{1, 0}, 0, data, false);
success = test_harness_.RunUntilWithDefaultTimeout([&]() {
return MoqtSessionPeer::SubgroupHasBeenReset(
MoqtSessionPeer::GetSubscription(server_->session(), 0),
- Location{0, 0, 0});
+ DataStreamIndex{0, 0});
});
EXPECT_TRUE(success);
EXPECT_EQ(bytes_received, 2000);
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.cc b/quiche/quic/moqt/moqt_live_relay_queue.cc
index e6b6f9b..929c531 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue.cc
+++ b/quiche/quic/moqt/moqt_live_relay_queue.cc
@@ -4,6 +4,7 @@
#include "quiche/quic/moqt/moqt_live_relay_queue.h"
+#include <cstdint>
#include <memory>
#include <optional>
#include <vector>
@@ -18,13 +19,14 @@
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_buffer_allocator.h"
+#include "quiche/common/quiche_callbacks.h"
#include "quiche/common/quiche_mem_slice.h"
#include "quiche/common/simple_buffer_allocator.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
-bool MoqtLiveRelayQueue::AddFin(Location sequence) {
+bool MoqtLiveRelayQueue::AddFin(Location sequence, uint64_t subgroup) {
switch (forwarding_preference_) {
case MoqtForwardingPreference::kDatagram:
return false;
@@ -37,8 +39,8 @@
return false;
}
Group& group = group_it->second;
- auto subgroup_it = group.subgroups.find(
- SubgroupPriority{publisher_priority_, sequence.subgroup});
+ auto subgroup_it =
+ group.subgroups.find(SubgroupPriority{publisher_priority_, subgroup});
if (subgroup_it == group.subgroups.end()) {
// Subgroup does not exist.
return false;
@@ -53,13 +55,14 @@
}
subgroup_it->second.rbegin()->second.fin_after_this = true;
for (MoqtObjectListener* listener : listeners_) {
- listener->OnNewFinAvailable(sequence);
+ listener->OnNewFinAvailable(sequence, subgroup);
}
return true;
}
bool MoqtLiveRelayQueue::OnStreamReset(
- Location sequence, webtransport::StreamErrorCode error_code) {
+ Location sequence, uint64_t subgroup_id,
+ webtransport::StreamErrorCode error_code) {
switch (forwarding_preference_) {
case MoqtForwardingPreference::kDatagram:
return false;
@@ -72,23 +75,21 @@
return false;
}
Group& group = group_it->second;
- auto subgroup_it = group.subgroups.find(
- SubgroupPriority{publisher_priority_, sequence.subgroup});
+ auto subgroup_it =
+ group.subgroups.find(SubgroupPriority{publisher_priority_, subgroup_id});
if (subgroup_it == group.subgroups.end()) {
// Subgroup does not exist.
return false;
}
for (MoqtObjectListener* listener : listeners_) {
- listener->OnSubgroupAbandoned(sequence, error_code);
+ listener->OnSubgroupAbandoned(sequence.group, subgroup_id, error_code);
}
return true;
}
-// TODO(martinduke): Unless Track Forwarding preference goes away, support it.
-bool MoqtLiveRelayQueue::AddRawObject(Location sequence,
- MoqtObjectStatus status,
- MoqtPriority priority,
- absl::string_view payload, bool fin) {
+bool MoqtLiveRelayQueue::AddObject(const PublishedObjectMetadata& metadata,
+ absl::string_view payload, bool fin) {
+ const Location& sequence = metadata.location;
bool last_object_in_stream = fin;
if (queue_.size() == kMaxQueuedGroups) {
if (queue_.begin()->first > sequence.group) {
@@ -110,7 +111,7 @@
<< "track";
return false;
}
- switch (status) {
+ switch (metadata.status) {
case MoqtObjectStatus::kEndOfTrackAndGroup:
if (sequence < next_sequence_) {
QUICHE_DLOG(INFO) << "EndOfTrackAndGroup is too early.";
@@ -145,16 +146,17 @@
<< "group";
return false;
}
- if ((status == MoqtObjectStatus::kEndOfGroup ||
- status == MoqtObjectStatus::kEndOfTrackAndGroup) &&
+ if ((metadata.status == MoqtObjectStatus::kEndOfGroup ||
+ metadata.status == MoqtObjectStatus::kEndOfTrackAndGroup) &&
sequence.object < group.next_object) {
QUICHE_DLOG(INFO) << "Skipping EndOfGroup because it is not the last "
<< "object in the group.";
return false;
}
}
+ // TODO: use `metadata.publisher_priority` instead.
auto subgroup_it = group.subgroups.try_emplace(
- SubgroupPriority{priority, sequence.subgroup});
+ SubgroupPriority{publisher_priority_, metadata.subgroup.value_or(0)});
auto& subgroup = subgroup_it.first->second;
if (!subgroup.empty()) { // Check if the new object is valid
CachedObject& last_object = subgroup.rbegin()->second;
@@ -165,10 +167,11 @@
}
// If last_object has stream-ending status, it should have been caught by
// the fin_after_this check above.
- QUICHE_DCHECK(last_object.status != MoqtObjectStatus::kEndOfTrackAndGroup &&
- last_object.status != MoqtObjectStatus::kEndOfGroup &&
- last_object.status != MoqtObjectStatus::kEndOfTrack);
- if (last_object.sequence.object >= sequence.object) {
+ QUICHE_DCHECK(
+ last_object.metadata.status != MoqtObjectStatus::kEndOfTrackAndGroup &&
+ last_object.metadata.status != MoqtObjectStatus::kEndOfGroup &&
+ last_object.metadata.status != MoqtObjectStatus::kEndOfTrack);
+ if (last_object.metadata.location.object >= sequence.object) {
QUICHE_DLOG(INFO) << "Skipping object because it does not increase the "
<< "object ID monotonically in the subgroup.";
return false;
@@ -182,7 +185,7 @@
group.next_object = sequence.object + 1;
}
// Anticipate stream FIN with most non-normal objects.
- switch (status) {
+ switch (metadata.status) {
case MoqtObjectStatus::kEndOfTrack:
case MoqtObjectStatus::kEndOfTrackAndGroup:
end_of_track_ = sequence;
@@ -201,26 +204,24 @@
? nullptr
: std::make_shared<quiche::QuicheMemSlice>(quiche::QuicheBuffer::Copy(
quiche::SimpleBufferAllocator::Get(), payload));
- subgroup.emplace(
- sequence.object,
- CachedObject{sequence, status, priority, slice, clock_->ApproximateNow(),
- last_object_in_stream});
+ subgroup.emplace(sequence.object,
+ CachedObject{metadata, slice, last_object_in_stream});
for (MoqtObjectListener* listener : listeners_) {
- listener->OnNewObjectAvailable(sequence);
+ listener->OnNewObjectAvailable(sequence, metadata.subgroup.value_or(0));
}
return true;
}
std::optional<PublishedObject> MoqtLiveRelayQueue::GetCachedObject(
- Location sequence) const {
- auto group_it = queue_.find(sequence.group);
+ uint64_t group_id, uint64_t subgroup_id, uint64_t object_id) const {
+ auto group_it = queue_.find(group_id);
if (group_it == queue_.end()) {
// Group does not exist.
return std::nullopt;
}
const Group& group = group_it->second;
- auto subgroup_it = group.subgroups.find(
- SubgroupPriority{publisher_priority_, sequence.subgroup});
+ auto subgroup_it =
+ group.subgroups.find(SubgroupPriority{publisher_priority_, subgroup_id});
if (subgroup_it == group.subgroups.end()) {
// Subgroup does not exist.
return std::nullopt;
@@ -230,7 +231,7 @@
return std::nullopt; // There are no objects.
}
// Find an object with ID of at least sequence.object.
- auto object_it = subgroup.lower_bound(sequence.object);
+ auto object_it = subgroup.lower_bound(object_id);
if (object_it == subgroup.end()) {
// No object after the last one received.
return std::nullopt;
@@ -238,30 +239,15 @@
return CachedObjectToPublishedObject(object_it->second);
}
-std::vector<Location> MoqtLiveRelayQueue::GetCachedObjectsInRange(
- Location start, Location end) const {
- std::vector<Location> sequences;
- SubscribeWindow window(start, end.group, end.object);
+void MoqtLiveRelayQueue::ForAllObjects(
+ quiche::UnretainedCallback<void(const CachedObject&)> callback) {
for (auto& group_it : queue_) {
- if (group_it.first < start.group) {
- continue;
- }
- if (group_it.first > end.group) {
- return sequences;
- }
for (auto& subgroup_it : group_it.second.subgroups) {
for (auto& object_it : subgroup_it.second) {
- if (window.InWindow(object_it.second.sequence)) {
- sequences.push_back(object_it.second.sequence);
- }
- if (group_it.first == end.group &&
- object_it.second.sequence.object >= end.object) {
- break;
- }
+ callback(object_it.second);
}
}
}
- return sequences;
}
absl::StatusOr<MoqtTrackStatusCode> MoqtLiveRelayQueue::GetTrackStatus() const {
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.h b/quiche/quic/moqt/moqt_live_relay_queue.h
index 3ac41dd..60ef828 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue.h
+++ b/quiche/quic/moqt/moqt_live_relay_queue.h
@@ -10,7 +10,6 @@
#include <memory>
#include <optional>
#include <utility>
-#include <vector>
#include "absl/container/btree_map.h"
#include "absl/container/flat_hash_set.h"
@@ -24,6 +23,7 @@
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/common/quiche_callbacks.h"
#include "quiche/web_transport/web_transport.h"
namespace moqt {
@@ -58,30 +58,48 @@
// occur. A false return value might result in a session error on the
// inbound session, but this queue is the only place that retains enough state
// to check.
- bool AddObject(Location sequence, MoqtObjectStatus status, bool fin = false) {
- return AddRawObject(sequence, status, publisher_priority_, "", fin);
- }
- bool AddObject(Location sequence, absl::string_view object,
+ bool AddObject(const PublishedObjectMetadata& metadata,
+ absl::string_view payload, bool fin);
+
+ // Convenience methods primarily for use in tests. Prefer the
+ // `PublishedObjectMetadata` version in real forwarding code to ensure all
+ // metadata is copied correctly.
+ bool AddObject(Location location, uint64_t subgroup, MoqtObjectStatus status,
bool fin = false) {
- return AddRawObject(sequence, MoqtObjectStatus::kNormal,
- publisher_priority_, object, fin);
+ PublishedObjectMetadata metadata;
+ metadata.location = location;
+ metadata.subgroup = subgroup;
+ metadata.status = status;
+ metadata.publisher_priority = 0;
+ return AddObject(metadata, "", fin);
}
+ bool AddObject(Location location, uint64_t subgroup, absl::string_view object,
+ bool fin = false) {
+ PublishedObjectMetadata metadata;
+ metadata.location = location;
+ metadata.subgroup = subgroup;
+ metadata.status = MoqtObjectStatus::kNormal;
+ metadata.publisher_priority = 0;
+ return AddObject(metadata, object, fin);
+ }
+
// Record a received FIN that did not come with the last object.
// If the forwarding preference is kDatagram or kTrack, |sequence| is ignored.
// Otherwise, |sequence| is used to determine which stream is being FINed. If
// the object ID does not match the last object ID in the stream, no action
// is taken.
- bool AddFin(Location sequence);
+ bool AddFin(Location sequence, uint64_t subgroup_id);
// Record a received RESET_STREAM. |sequence| encodes the group and subgroup
// of the stream that is being reset. Returns false on datagram tracks, or if
// the stream does not exist.
- bool OnStreamReset(Location sequence,
+ bool OnStreamReset(Location sequence, uint64_t subgroup_id,
webtransport::StreamErrorCode error_code);
// MoqtTrackPublisher implementation.
const FullTrackName& GetTrackName() const override { return track_; }
std::optional<PublishedObject> GetCachedObject(
- Location sequence) const override;
+ uint64_t group_id, uint64_t subgroup_id,
+ uint64_t min_object) const override;
void AddObjectListener(MoqtObjectListener* listener) override {
listeners_.insert(listener);
listener->OnSubscribeAccepted();
@@ -119,8 +137,8 @@
}
}
- std::vector<Location> GetCachedObjectsInRange(Location start,
- Location end) const;
+ void ForAllObjects(
+ quiche::UnretainedCallback<void(const CachedObject&)> callback);
private:
// The number of recent groups to keep around for newly joined subscribers.
@@ -135,9 +153,6 @@
absl::btree_map<SubgroupPriority, Subgroup> subgroups;
};
- bool AddRawObject(Location sequence, MoqtObjectStatus status,
- MoqtPriority priority, absl::string_view payload, bool fin);
-
const quic::QuicClock* clock_;
FullTrackName track_;
MoqtForwardingPreference forwarding_preference_;
diff --git a/quiche/quic/moqt/moqt_live_relay_queue_test.cc b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
index 6115acb..f40fe10 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue_test.cc
+++ b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
@@ -6,9 +6,9 @@
#include <cstdint>
#include <optional>
-#include <vector>
#include "absl/strings/string_view.h"
+#include "quiche/quic/moqt/moqt_cached_object.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
@@ -29,54 +29,58 @@
AddObjectListener(this);
}
- void OnNewObjectAvailable(Location sequence) {
- std::optional<PublishedObject> object = GetCachedObject(sequence);
+ void OnNewObjectAvailable(Location sequence, uint64_t subgroup_id) {
+ std::optional<PublishedObject> object =
+ GetCachedObject(sequence.group, subgroup_id, sequence.object);
QUICHE_CHECK(object.has_value());
if (!object.has_value()) {
return;
}
- switch (object->status) {
+ switch (object->metadata.status) {
case MoqtObjectStatus::kNormal:
- PublishObject(object->sequence.group, object->sequence.object,
+ PublishObject(object->metadata.location.group,
+ object->metadata.location.object,
object->payload.AsStringView());
break;
case MoqtObjectStatus::kObjectDoesNotExist:
- SkipObject(object->sequence.group, object->sequence.object);
+ SkipObject(object->metadata.location.group,
+ object->metadata.location.object);
break;
case MoqtObjectStatus::kGroupDoesNotExist:
- SkipGroup(object->sequence.group);
+ SkipGroup(object->metadata.location.group);
break;
case MoqtObjectStatus::kEndOfGroup:
- CloseStreamForGroup(object->sequence.group);
+ CloseStreamForGroup(object->metadata.location.group);
break;
case MoqtObjectStatus::kEndOfTrack:
CloseTrack();
break;
case MoqtObjectStatus::kEndOfTrackAndGroup:
- CloseStreamForGroup(object->sequence.group);
+ CloseStreamForGroup(object->metadata.location.group);
CloseTrack();
break;
default:
EXPECT_TRUE(false);
}
if (object->fin_after_this) {
- CloseStreamForSubgroup(object->sequence.group, object->sequence.subgroup);
+ CloseStreamForSubgroup(object->metadata.location.group,
+ object->metadata.subgroup.value_or(0));
}
}
void GetObjectsFromPast(const SubscribeWindow& window) {
- std::vector<Location> objects =
- GetCachedObjectsInRange(Location(0, 0), GetLargestLocation());
- for (Location object : objects) {
- if (window.InWindow(object)) {
- OnNewObjectAvailable(object);
+ ForAllObjects([&](const CachedObject& object) {
+ if (window.InWindow(object.metadata.location)) {
+ OnNewObjectAvailable(object.metadata.location,
+ object.metadata.subgroup.value_or(0));
}
- }
+ });
}
- MOCK_METHOD(void, OnNewFinAvailable, (Location sequence));
+ MOCK_METHOD(void, OnNewFinAvailable, (Location sequence, uint64_t subgroup));
MOCK_METHOD(void, OnSubgroupAbandoned,
- (Location sequence, webtransport::StreamErrorCode error_code));
+ (uint64_t group, uint64_t subgroup,
+ webtransport::StreamErrorCode error_code));
MOCK_METHOD(void, OnGroupAbandoned, (uint64_t group_id));
MOCK_METHOD(void, CloseStreamForGroup, (uint64_t group_id), ());
MOCK_METHOD(void, CloseStreamForSubgroup,
@@ -106,10 +110,11 @@
EXPECT_CALL(queue, PublishObject(0, 2, "c"));
EXPECT_CALL(queue, CloseStreamForGroup(0));
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
- EXPECT_TRUE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
+ EXPECT_TRUE(
+ queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfGroup));
}
TEST(MoqtLiveRelayQueue, SingleGroupPastSubscribeFromZero) {
@@ -124,9 +129,9 @@
EXPECT_CALL(queue, PublishObject(0, 1, "b"));
EXPECT_CALL(queue, PublishObject(0, 2, "c"));
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
queue.GetObjectsFromPast(SubscribeWindow());
}
@@ -141,9 +146,9 @@
EXPECT_CALL(queue, PublishObject(0, 1, "b"));
EXPECT_CALL(queue, PublishObject(0, 2, "c"));
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
queue.GetObjectsFromPast(SubscribeWindow(Location(0, 1)));
}
@@ -159,13 +164,14 @@
EXPECT_CALL(queue, PublishObject(1, 1, "e"));
EXPECT_CALL(queue, PublishObject(1, 2, "f"));
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
- EXPECT_TRUE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{1, 0}, "d"));
- EXPECT_TRUE(queue.AddObject(Location{1, 1}, "e"));
- EXPECT_TRUE(queue.AddObject(Location{1, 2}, "f"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
+ EXPECT_TRUE(
+ queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{1, 0}, 0, "d"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 1}, 0, "e"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 2}, 0, "f"));
}
TEST(MoqtLiveRelayQueue, TwoGroupsPastSubscribe) {
@@ -187,13 +193,14 @@
EXPECT_CALL(queue, PublishObject(1, 1, "e"));
EXPECT_CALL(queue, PublishObject(1, 2, "f"));
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
- EXPECT_TRUE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{1, 0}, "d"));
- EXPECT_TRUE(queue.AddObject(Location{1, 1}, "e"));
- EXPECT_TRUE(queue.AddObject(Location{1, 2}, "f"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
+ EXPECT_TRUE(
+ queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{1, 0}, 0, "d"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 1}, 0, "e"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 2}, 0, "f"));
queue.GetObjectsFromPast(SubscribeWindow(Location(0, 1)));
}
@@ -219,20 +226,24 @@
EXPECT_CALL(queue, PublishObject(4, 0, "i"));
EXPECT_CALL(queue, PublishObject(4, 1, "j"));
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{1, 0}, "c"));
- EXPECT_TRUE(queue.AddObject(Location{1, 1}, "d"));
- EXPECT_TRUE(queue.AddObject(Location{1, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{2, 0}, "e"));
- EXPECT_TRUE(queue.AddObject(Location{2, 1}, "f"));
- EXPECT_TRUE(queue.AddObject(Location{2, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{3, 0}, "g"));
- EXPECT_TRUE(queue.AddObject(Location{3, 1}, "h"));
- EXPECT_TRUE(queue.AddObject(Location{3, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{4, 0}, "i"));
- EXPECT_TRUE(queue.AddObject(Location{4, 1}, "j"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
+ EXPECT_TRUE(
+ queue.AddObject(Location{0, 2}, 0, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{1, 0}, 0, "c"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 1}, 0, "d"));
+ EXPECT_TRUE(
+ queue.AddObject(Location{1, 2}, 0, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{2, 0}, 0, "e"));
+ EXPECT_TRUE(queue.AddObject(Location{2, 1}, 0, "f"));
+ EXPECT_TRUE(
+ queue.AddObject(Location{2, 2}, 0, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{3, 0}, 0, "g"));
+ EXPECT_TRUE(queue.AddObject(Location{3, 1}, 0, "h"));
+ EXPECT_TRUE(
+ queue.AddObject(Location{3, 2}, 0, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{4, 0}, 0, "i"));
+ EXPECT_TRUE(queue.AddObject(Location{4, 1}, 0, "j"));
}
TEST(MoqtLiveRelayQueue, FiveGroupsPastSubscribe) {
@@ -266,20 +277,24 @@
EXPECT_CALL(queue, PublishObject(4, 0, "i"));
EXPECT_CALL(queue, PublishObject(4, 1, "j"));
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{1, 0}, "c"));
- EXPECT_TRUE(queue.AddObject(Location{1, 1}, "d"));
- EXPECT_TRUE(queue.AddObject(Location{1, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{2, 0}, "e"));
- EXPECT_TRUE(queue.AddObject(Location{2, 1}, "f"));
- EXPECT_TRUE(queue.AddObject(Location{2, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{3, 0}, "g"));
- EXPECT_TRUE(queue.AddObject(Location{3, 1}, "h"));
- EXPECT_TRUE(queue.AddObject(Location{3, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{4, 0}, "i"));
- EXPECT_TRUE(queue.AddObject(Location{4, 1}, "j"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
+ EXPECT_TRUE(
+ queue.AddObject(Location{0, 2}, 0, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{1, 0}, 0, "c"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 1}, 0, "d"));
+ EXPECT_TRUE(
+ queue.AddObject(Location{1, 2}, 0, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{2, 0}, 0, "e"));
+ EXPECT_TRUE(queue.AddObject(Location{2, 1}, 0, "f"));
+ EXPECT_TRUE(
+ queue.AddObject(Location{2, 2}, 0, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{3, 0}, 0, "g"));
+ EXPECT_TRUE(queue.AddObject(Location{3, 1}, 0, "h"));
+ EXPECT_TRUE(
+ queue.AddObject(Location{3, 2}, 0, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{4, 0}, 0, "i"));
+ EXPECT_TRUE(queue.AddObject(Location{4, 1}, 0, "j"));
queue.GetObjectsFromPast(SubscribeWindow());
}
@@ -303,21 +318,25 @@
EXPECT_CALL(queue, PublishObject(4, 0, "i"));
EXPECT_CALL(queue, PublishObject(4, 1, "j"));
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(Location{1, 0}, "c"));
- EXPECT_TRUE(queue.AddObject(Location{1, 1}, "d"));
- EXPECT_TRUE(queue.AddObject(Location{1, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{2, 0}, "e"));
- EXPECT_TRUE(queue.AddObject(Location{2, 1}, "f"));
- EXPECT_TRUE(queue.AddObject(Location{2, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{3, 0}, "g"));
- EXPECT_TRUE(queue.AddObject(Location{3, 1}, "h"));
- EXPECT_TRUE(queue.AddObject(Location{3, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{4, 0}, "i"));
- EXPECT_TRUE(queue.AddObject(Location{4, 1}, "j"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 0}, 0, "c"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 1}, 0, "d"));
+ EXPECT_TRUE(
+ queue.AddObject(Location{1, 2}, 0, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{2, 0}, 0, "e"));
+ EXPECT_TRUE(queue.AddObject(Location{2, 1}, 0, "f"));
+ EXPECT_TRUE(
+ queue.AddObject(Location{2, 2}, 0, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{3, 0}, 0, "g"));
+ EXPECT_TRUE(queue.AddObject(Location{3, 1}, 0, "h"));
+ EXPECT_TRUE(
+ queue.AddObject(Location{3, 2}, 0, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{4, 0}, 0, "i"));
+ EXPECT_TRUE(queue.AddObject(Location{4, 1}, 0, "j"));
// This object will be ignored, but this is not an error.
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(
+ queue.AddObject(Location{0, 2}, 0, MoqtObjectStatus::kEndOfGroup));
}
TEST(MoqtLiveRelayQueue, EndOfTrackAndGroup) {
@@ -328,12 +347,12 @@
EXPECT_CALL(queue, PublishObject(0, 2, "c"));
EXPECT_CALL(queue, CloseTrack());
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
- EXPECT_FALSE(
- queue.AddObject(Location{0, 1}, MoqtObjectStatus::kEndOfTrackAndGroup));
- EXPECT_TRUE(
- queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfTrackAndGroup));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
+ EXPECT_FALSE(queue.AddObject(Location{0, 1}, 0,
+ MoqtObjectStatus::kEndOfTrackAndGroup));
+ EXPECT_TRUE(queue.AddObject(Location{0, 3}, 0,
+ MoqtObjectStatus::kEndOfTrackAndGroup));
}
TEST(MoqtLiveRelayQueue, EndOfTrack) {
@@ -344,10 +363,12 @@
EXPECT_CALL(queue, PublishObject(0, 2, "c"));
EXPECT_CALL(queue, CloseTrack());
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
- EXPECT_FALSE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfTrack));
- EXPECT_TRUE(queue.AddObject(Location{1, 0}, MoqtObjectStatus::kEndOfTrack));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
+ EXPECT_FALSE(
+ queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfTrack));
+ EXPECT_TRUE(
+ queue.AddObject(Location{1, 0}, 0, MoqtObjectStatus::kEndOfTrack));
}
TEST(MoqtLiveRelayQueue, EndOfGroup) {
@@ -358,11 +379,13 @@
EXPECT_CALL(queue, PublishObject(0, 2, "c"));
EXPECT_CALL(queue, CloseStreamForGroup(0));
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
- EXPECT_FALSE(queue.AddObject(Location{0, 1}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_FALSE(queue.AddObject(Location{0, 4}, "e"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
+ EXPECT_FALSE(
+ queue.AddObject(Location{0, 1}, 0, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(
+ queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_FALSE(queue.AddObject(Location{0, 4}, 0, "e"));
}
TEST(MoqtLiveRelayQueue, GroupDoesNotExist) {
@@ -372,9 +395,9 @@
EXPECT_CALL(queue, SkipGroup(0));
}
EXPECT_FALSE(
- queue.AddObject(Location{0, 1}, MoqtObjectStatus::kGroupDoesNotExist));
+ queue.AddObject(Location{0, 1}, 0, MoqtObjectStatus::kGroupDoesNotExist));
EXPECT_TRUE(
- queue.AddObject(Location{0, 0}, MoqtObjectStatus::kGroupDoesNotExist));
+ queue.AddObject(Location{0, 0}, 0, MoqtObjectStatus::kGroupDoesNotExist));
}
TEST(MoqtLiveRelayQueue, OverwriteObject) {
@@ -385,11 +408,12 @@
EXPECT_CALL(queue, PublishObject(0, 1, "b"));
EXPECT_CALL(queue, PublishObject(0, 2, "c"));
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
- EXPECT_TRUE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_FALSE(queue.AddObject(Location{0, 1}, "invalid"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, 0, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, 0, "c"));
+ EXPECT_TRUE(
+ queue.AddObject(Location{0, 3}, 0, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_FALSE(queue.AddObject(Location{0, 1}, 0, "invalid"));
}
TEST(MoqtLiveRelayQueue, DifferentSubgroups) {
@@ -400,11 +424,11 @@
EXPECT_CALL(queue, PublishObject(0, 1, "b"));
EXPECT_CALL(queue, PublishObject(0, 3, "d"));
EXPECT_CALL(queue, PublishObject(0, 2, "c"));
- EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 0, 3}));
+ EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 3}, 0));
EXPECT_CALL(queue, PublishObject(0, 5, "e"));
EXPECT_CALL(queue, PublishObject(0, 7, "f"));
- EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 1, 5}));
- EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 2, 7}));
+ EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 5}, 1));
+ EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 7}, 2));
// Serve them back in strict subgroup order.
EXPECT_CALL(queue, PublishObject(0, 0, "a"));
@@ -417,15 +441,15 @@
EXPECT_CALL(queue, PublishObject(0, 7, "f"));
EXPECT_CALL(queue, CloseStreamForSubgroup(0, 2));
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(Location{0, 1, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(Location{0, 0, 3}, "d"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2, 2}, "c"));
- EXPECT_TRUE(queue.AddFin(Location{0, 0, 3}));
- EXPECT_TRUE(queue.AddObject(Location{0, 1, 5}, "e"));
- EXPECT_TRUE(queue.AddObject(Location{0, 2, 7}, "f"));
- EXPECT_TRUE(queue.AddFin(Location{0, 1, 5}));
- EXPECT_TRUE(queue.AddFin(Location{0, 2, 7}));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, 1, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 3}, 0, "d"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, 2, "c"));
+ EXPECT_TRUE(queue.AddFin(Location{0, 3}, 0));
+ EXPECT_TRUE(queue.AddObject(Location{0, 5}, 1, "e"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 7}, 2, "f"));
+ EXPECT_TRUE(queue.AddFin(Location{0, 5}, 1));
+ EXPECT_TRUE(queue.AddFin(Location{0, 7}, 2));
queue.GetObjectsFromPast(SubscribeWindow());
}
@@ -434,12 +458,12 @@
{
testing::InSequence seq;
EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 0, 0}));
+ EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 0}, 0));
EXPECT_CALL(queue, PublishObject(0, 2, "b")).Times(0);
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0, 0}, "a"));
- EXPECT_TRUE(queue.AddFin(Location{0, 0, 0}));
- EXPECT_FALSE(queue.AddObject(Location{0, 0, 2}, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
+ EXPECT_TRUE(queue.AddFin(Location{0, 0}, 0));
+ EXPECT_FALSE(queue.AddObject(Location{0, 2}, 0, "b"));
}
TEST(MoqtLiveRelayQueue, AddObjectWithFin) {
@@ -448,10 +472,10 @@
testing::InSequence seq;
EXPECT_CALL(queue, PublishObject(0, 0, "a"));
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0, 0}, "a", true));
- std::optional<PublishedObject> object = queue.GetCachedObject(Location{0, 0});
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a", true));
+ std::optional<PublishedObject> object = queue.GetCachedObject(0, 0, 0);
ASSERT_TRUE(object.has_value());
- EXPECT_EQ(object->status, MoqtObjectStatus::kNormal);
+ EXPECT_EQ(object->metadata.status, MoqtObjectStatus::kNormal);
EXPECT_TRUE(object->fin_after_this);
}
@@ -461,12 +485,12 @@
testing::InSequence seq;
EXPECT_CALL(queue, PublishObject(0, 0, "a"));
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0, 0}, "a", false));
- EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 0}));
- EXPECT_TRUE(queue.AddFin(Location{0, 0}));
- std::optional<PublishedObject> object = queue.GetCachedObject(Location{0, 0});
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a", false));
+ EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 0}, 0));
+ EXPECT_TRUE(queue.AddFin(Location{0, 0}, 0));
+ std::optional<PublishedObject> object = queue.GetCachedObject(0, 0, 0);
ASSERT_TRUE(object.has_value());
- EXPECT_EQ(object->status, MoqtObjectStatus::kNormal);
+ EXPECT_EQ(object->metadata.status, MoqtObjectStatus::kNormal);
EXPECT_TRUE(object->fin_after_this);
}
@@ -475,10 +499,10 @@
{
testing::InSequence seq;
EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, OnSubgroupAbandoned(Location{0, 0}, 0x1));
+ EXPECT_CALL(queue, OnSubgroupAbandoned(0, 0, 0x1));
}
- EXPECT_TRUE(queue.AddObject(Location{0, 0, 0}, "a"));
- EXPECT_TRUE(queue.OnStreamReset(Location{0, 0}, 0x1));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, 0, "a"));
+ EXPECT_TRUE(queue.OnStreamReset(Location{0, 0}, 0, 0x1));
}
} // namespace
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index d51237a..f5ec8c2 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -12,6 +12,7 @@
#include <initializer_list>
#include <optional>
#include <string>
+#include <tuple>
#include <utility>
#include <vector>
@@ -366,21 +367,21 @@
std::string name_ = "";
};
-// These are absolute sequence numbers.
+// Location as defined in
+// https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html#location-structure
struct Location {
- uint64_t group;
- uint64_t subgroup;
- uint64_t object;
- Location() : Location(0, 0) {}
- // There is a lot of code from before subgroups. Assume there's one subgroup
- // with ID 0 per group.
- Location(uint64_t group, uint64_t object) : Location(group, 0, object) {}
- Location(uint64_t group, uint64_t subgroup, uint64_t object)
- : group(group), subgroup(subgroup), object(object) {}
+ uint64_t group = 0;
+ uint64_t object = 0;
+
+ Location() = default;
+ Location(uint64_t group, uint64_t object) : group(group), object(object) {}
+
bool operator==(const Location& other) const {
return group == other.group && object == other.object;
}
- // These are temporal ordering comparisons, so subgroup ID doesn't matter.
+
+ // Location order as described in
+ // https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html#location-structure
bool operator<(const Location& other) const {
return group < other.group ||
(group == other.group && object < other.object);
@@ -390,13 +391,10 @@
(group == other.group && object <= other.object));
}
bool operator>(const Location& other) const { return !(*this <= other); }
- Location& operator=(Location other) {
- group = other.group;
- subgroup = other.subgroup;
- object = other.object;
- return *this;
- }
- Location next() const { return Location{group, subgroup, object + 1}; }
+ bool operator>=(const Location& other) const { return !(*this < other); }
+
+ Location next() const { return Location(group, object + 1); }
+
template <typename H>
friend H AbslHashValue(H h, const Location& m);
@@ -730,7 +728,7 @@
// and ranges. The session will populate them instead.
std::optional<JoiningFetch> joining_fetch;
FullTrackName full_track_name;
- Location start_object; // subgroup is ignored
+ Location start_object;
uint64_t end_group;
std::optional<uint64_t> end_object;
VersionSpecificParameters parameters;
@@ -743,7 +741,7 @@
struct QUICHE_EXPORT MoqtFetchOk {
uint64_t subscribe_id;
MoqtDeliveryOrder group_order;
- Location largest_id; // subgroup is ignored
+ Location largest_id;
VersionSpecificParameters parameters;
};
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc
index 0428f7e..35221dd 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -65,33 +65,37 @@
Location sequence{current_group_id_, queue_.back().size()};
bool fin = forwarding_preference_ == MoqtForwardingPreference::kSubgroup &&
status == MoqtObjectStatus::kEndOfGroup;
- queue_.back().push_back(
- CachedObject{sequence, status, publisher_priority_,
- std::make_shared<quiche::QuicheMemSlice>(std::move(payload)),
- clock_->ApproximateNow(), fin});
+ queue_.back().push_back(CachedObject{
+ PublishedObjectMetadata{sequence, 0, status, publisher_priority_,
+ clock_->ApproximateNow()},
+ std::make_shared<quiche::QuicheMemSlice>(std::move(payload)), fin});
for (MoqtObjectListener* listener : listeners_) {
- listener->OnNewObjectAvailable(sequence);
+ listener->OnNewObjectAvailable(sequence, /*subgroup=*/0);
}
}
std::optional<PublishedObject> MoqtOutgoingQueue::GetCachedObject(
- Location sequence) const {
- if (sequence.group < first_group_in_queue()) {
- return PublishedObject{Location{sequence.group, sequence.object},
- MoqtObjectStatus::kGroupDoesNotExist,
- publisher_priority_, quiche::QuicheMemSlice(),
- clock_->ApproximateNow()};
+ uint64_t group, uint64_t subgroup, uint64_t object) const {
+ QUICHE_DCHECK_EQ(subgroup, 0u);
+ if (group < first_group_in_queue()) {
+ return PublishedObject{
+ PublishedObjectMetadata{Location(group, object),
+ /*subgroup=*/0,
+ MoqtObjectStatus::kGroupDoesNotExist,
+ publisher_priority_, clock_->ApproximateNow()},
+ quiche::QuicheMemSlice{}};
}
- if (sequence.group > current_group_id_) {
+ if (group > current_group_id_) {
return std::nullopt;
}
- const std::vector<CachedObject>& group =
- queue_[sequence.group - first_group_in_queue()];
- if (sequence.object >= group.size()) {
+ const std::vector<CachedObject>& group_objects =
+ queue_[group - first_group_in_queue()];
+ if (object >= group_objects.size()) {
return std::nullopt;
}
- QUICHE_DCHECK(sequence == group[sequence.object].sequence);
- return CachedObjectToPublishedObject(group[sequence.object]);
+ QUICHE_DCHECK(Location(group, object) ==
+ group_objects[object].metadata.location);
+ return CachedObjectToPublishedObject(group_objects[object]);
}
std::vector<Location> MoqtOutgoingQueue::GetCachedObjectsInRange(
@@ -100,8 +104,8 @@
SubscribeWindow window(start, end.group, end.object);
for (const Group& group : queue_) {
for (const CachedObject& object : group) {
- if (window.InWindow(object.sequence)) {
- sequences.push_back(object.sequence);
+ if (window.InWindow(object.metadata.location)) {
+ sequences.push_back(object.metadata.location);
}
}
}
@@ -175,8 +179,8 @@
MoqtFetchTask::GetNextObjectResult result = GetNextObjectInner(object);
bool missing_object =
result == kSuccess &&
- (object.status == MoqtObjectStatus::kObjectDoesNotExist ||
- object.status == MoqtObjectStatus::kGroupDoesNotExist);
+ (object.metadata.status == MoqtObjectStatus::kObjectDoesNotExist ||
+ object.metadata.status == MoqtObjectStatus::kGroupDoesNotExist);
if (!missing_object) {
return result;
}
@@ -192,8 +196,8 @@
return kEof;
}
- std::optional<PublishedObject> result =
- queue_->GetCachedObject(objects_.front());
+ std::optional<PublishedObject> result = queue_->GetCachedObject(
+ objects_.front().group, 0, objects_.front().object);
if (!result.has_value()) {
status_ = absl::InternalError("Previously known object became unknown.");
return kError;
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h
index bcb7f5b..f9a1361 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.h
+++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -55,7 +55,7 @@
// MoqtTrackPublisher implementation.
const FullTrackName& GetTrackName() const override { return track_; }
std::optional<PublishedObject> GetCachedObject(
- Location sequence) const override;
+ uint64_t group, uint64_t subgroup, uint64_t min_object) const override;
void AddObjectListener(MoqtObjectListener* listener) override {
listeners_.insert(listener);
listener->OnSubscribeAccepted();
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
index a648a42..46e0d19 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -46,16 +46,18 @@
AddObjectListener(this);
}
- void OnNewObjectAvailable(Location sequence) override {
- std::optional<PublishedObject> object = GetCachedObject(sequence);
+ void OnNewObjectAvailable(Location sequence, uint64_t subgroup) override {
+ std::optional<PublishedObject> object =
+ GetCachedObject(sequence.group, subgroup, sequence.object);
QUICHE_CHECK(object.has_value());
- ASSERT_THAT(object->status, AnyOf(MoqtObjectStatus::kNormal,
- MoqtObjectStatus::kEndOfGroup));
- if (object->status == MoqtObjectStatus::kNormal) {
- PublishObject(object->sequence.group, object->sequence.object,
+ ASSERT_THAT(object->metadata.status, AnyOf(MoqtObjectStatus::kNormal,
+ MoqtObjectStatus::kEndOfGroup));
+ if (object->metadata.status == MoqtObjectStatus::kNormal) {
+ PublishObject(object->metadata.location.group,
+ object->metadata.location.object,
object->payload.AsStringView());
} else {
- CloseStreamForGroup(object->sequence.group);
+ CloseStreamForGroup(object->metadata.location.group);
}
}
@@ -64,14 +66,15 @@
GetCachedObjectsInRange(Location(0, 0), GetLargestLocation());
for (Location object : objects) {
if (window.InWindow(object)) {
- OnNewObjectAvailable(object);
+ OnNewObjectAvailable(object, 0);
}
}
}
- MOCK_METHOD(void, OnNewFinAvailable, (Location sequence));
+ MOCK_METHOD(void, OnNewFinAvailable, (Location sequence, uint64_t subgroup));
MOCK_METHOD(void, OnSubgroupAbandoned,
- (Location sequence, webtransport::StreamErrorCode error_code));
+ (uint64_t group, uint64_t subgroup,
+ webtransport::StreamErrorCode error_code));
MOCK_METHOD(void, OnGroupAbandoned, (uint64_t group_id));
MOCK_METHOD(void, CloseStreamForGroup, (uint64_t group_id), ());
MOCK_METHOD(void, PublishObject,
@@ -94,10 +97,10 @@
MoqtFetchTask::GetNextObjectResult result = fetch->GetNextObject(object);
switch (result) {
case MoqtFetchTask::kSuccess:
- if (object.status == MoqtObjectStatus::kNormal) {
+ if (object.metadata.status == MoqtObjectStatus::kNormal) {
objects.emplace_back(object.payload.AsStringView());
} else {
- EXPECT_EQ(object.status, MoqtObjectStatus::kEndOfGroup);
+ EXPECT_EQ(object.metadata.status, MoqtObjectStatus::kEndOfGroup);
}
continue;
case MoqtFetchTask::kPending:
@@ -364,9 +367,9 @@
quic::QuicTime test_start = clock->ApproximateNow();
TestMoqtOutgoingQueue queue;
queue.AddObject(MemSliceFromString("a"), true);
- std::optional<PublishedObject> object = queue.GetCachedObject(Location{0, 0});
+ std::optional<PublishedObject> object = queue.GetCachedObject(0, 0, 0);
ASSERT_TRUE(object.has_value());
- EXPECT_GE(object->arrival_time, test_start);
+ EXPECT_GE(object->metadata.arrival_time, test_start);
}
} // namespace
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h
index 260beaa..21a9f77 100644
--- a/quiche/quic/moqt/moqt_publisher.h
+++ b/quiche/quic/moqt/moqt_publisher.h
@@ -9,7 +9,6 @@
#include <memory>
#include <optional>
#include <variant>
-#include <vector>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
@@ -22,14 +21,19 @@
namespace moqt {
+struct PublishedObjectMetadata {
+ Location location;
+ std::optional<uint64_t> subgroup; // nullopt for datagrams
+ MoqtObjectStatus status;
+ MoqtPriority publisher_priority;
+ quic::QuicTime arrival_time = quic::QuicTime::Zero();
+};
+
// PublishedObject is a description of an object that is sufficient to publish
// it on a given track.
struct PublishedObject {
- Location sequence;
- MoqtObjectStatus status;
- MoqtPriority publisher_priority;
+ PublishedObjectMetadata metadata;
quiche::QuicheMemSlice payload;
- quic::QuicTime arrival_time = quic::QuicTime::Zero();
bool fin_after_this = false;
};
@@ -49,18 +53,20 @@
MoqtSubscribeErrorReason reason,
std::optional<uint64_t> track_alias = std::nullopt) = 0;
- // Notifies that an object with the given sequence number has become
- // available. The object payload itself may be retrieved via GetCachedObject
- // method of the associated track publisher.
- virtual void OnNewObjectAvailable(Location sequence) = 0;
+ // Notifies that a new object is available on the track. The object payload
+ // itself may be retrieved via GetCachedObject method of the associated track
+ // publisher.
+ virtual void OnNewObjectAvailable(Location sequence, uint64_t subgroup) = 0;
// Notifies that a pure FIN has arrived following |sequence|. Should not be
// called unless all objects have already been delivered. If not delivered,
// instead set the fin_after_this flag in the PublishedObject.
- virtual void OnNewFinAvailable(Location sequence) = 0;
+ virtual void OnNewFinAvailable(Location final_object_in_subgroup,
+ uint64_t subgroup_id) = 0;
// Notifies that the a stream is being abandoned (via RESET_STREAM) before
// all objects are delivered.
virtual void OnSubgroupAbandoned(
- Location sequence, webtransport::StreamErrorCode error_code) = 0;
+ uint64_t group, uint64_t subgroup,
+ webtransport::StreamErrorCode error_code) = 0;
// No further object will be published for the given group, usually due to a
// timeout. The owner of the Listener may want to reset the relevant streams.
@@ -130,7 +136,7 @@
// GetCachedObject lets the MoQT stack access the objects that are available
// in the track's built-in local cache. Retrieves the first object ID >=
- // sequence.object that matches (sequence.group, sequence.subgroup).
+ // min_object that matches (sequence.group, sequence.subgroup).
//
// This implementation of MoQT does not store any objects within the MoQT
// stack itself, at least until the object is fully serialized and passed to
@@ -145,13 +151,7 @@
// otherwise, the corresponding QUIC streams will be stuck waiting for objects
// that will never arrive.
virtual std::optional<PublishedObject> GetCachedObject(
- Location sequence) const = 0;
-
- // TODO: add an API to fetch past objects that are out of cache and might
- // require an upstream request to fill the relevant cache again. This is
- // currently done since the specification does not clearly describe how this
- // is supposed to be done, especially with respect to such things as
- // backpressure.
+ uint64_t group, uint64_t subgroup, uint64_t min_object) const = 0;
// Registers a listener with the track. The listener will be notified of all
// newly arriving objects. The pointer to the listener must be valid until
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 715413b..2167463 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -230,10 +230,14 @@
SubscribeRemoteTrack::Visitor* visitor = track->visitor();
if (visitor != nullptr) {
// TODO(martinduke): Handle extension headers.
- visitor->OnObjectFragment(track->full_track_name(),
- Location{message.group_id, 0, message.object_id},
- message.publisher_priority, message.object_status,
- *payload, true);
+ PublishedObjectMetadata metadata;
+ metadata.location = Location(message.group_id, message.object_id);
+ metadata.subgroup = std::nullopt;
+ metadata.status = message.object_status;
+ metadata.publisher_priority = message.publisher_priority;
+ metadata.arrival_time = callbacks_.clock->Now();
+ visitor->OnObjectFragment(track->full_track_name(), metadata, *payload,
+ true);
}
}
@@ -592,7 +596,7 @@
switch (result) {
case MoqtFetchTask::GetNextObjectResult::kSuccess:
// Skip ObjectDoesNotExist in FETCH.
- if (object.status == MoqtObjectStatus::kObjectDoesNotExist) {
+ if (object.metadata.status == MoqtObjectStatus::kObjectDoesNotExist) {
QUIC_BUG(quic_bug_got_doesnotexist_in_fetch)
<< "Got ObjectDoesNotExist in FETCH";
continue;
@@ -734,7 +738,7 @@
}
webtransport::Stream* MoqtSession::OpenOrQueueDataStream(
- uint64_t subscription_id, Location first_object) {
+ uint64_t subscription_id, const NewStreamParameters& parameters) {
auto it = published_subscriptions_.find(subscription_id);
if (it == published_subscriptions_.end()) {
// It is possible that the subscription has been discarded while the stream
@@ -743,17 +747,18 @@
}
PublishedSubscription& subscription = *it->second;
if (!session_->CanOpenNextOutgoingUnidirectionalStream()) {
- subscription.AddQueuedOutgoingDataStream(first_object);
+ subscription.AddQueuedOutgoingDataStream(parameters);
// The subscription will notify the session about how to update the
// session's queue.
// TODO: limit the number of streams in the queue.
return nullptr;
}
- return OpenDataStream(subscription, first_object);
+ return OpenDataStream(subscription, parameters);
}
webtransport::Stream* MoqtSession::OpenDataStream(
- PublishedSubscription& subscription, Location first_object) {
+ PublishedSubscription& subscription,
+ const NewStreamParameters& parameters) {
webtransport::Stream* new_stream =
session_->OpenOutgoingUnidirectionalStream();
if (new_stream == nullptr) {
@@ -762,8 +767,8 @@
return nullptr;
}
new_stream->SetVisitor(std::make_unique<OutgoingDataStream>(
- this, new_stream, subscription, first_object));
- subscription.OnDataStreamCreated(new_stream->GetStreamId(), first_object);
+ this, new_stream, subscription, parameters));
+ subscription.OnDataStreamCreated(new_stream->GetStreamId(), parameters.index);
return new_stream;
}
@@ -833,10 +838,11 @@
}
// Pop the item from the subscription's queue, which might update
// subscribes_with_queued_outgoing_data_streams_.
- Location next_queued_stream =
+ NewStreamParameters next_queued_stream =
subscription->second->NextQueuedOutgoingDataStream();
// Check if Group is too old.
- if (next_queued_stream.group < subscription->second->first_active_group()) {
+ if (next_queued_stream.index.group <
+ subscription->second->first_active_group()) {
// The stream is too old to be sent.
continue;
}
@@ -1381,8 +1387,7 @@
start_object = Location(0, 0);
} else {
start_object = Location(
- fetch_end.group - message.joining_fetch->preceding_group_offset, 0,
- 0);
+ fetch_end.group - message.joining_fetch->preceding_group_offset, 0);
}
end_group = fetch_end.group;
end_object = fetch_end.object - 1;
@@ -1611,12 +1616,14 @@
subscribe->OnObject(/*is_datagram=*/false);
if (subscribe->visitor() != nullptr) {
// TODO(martinduke): Send extension headers.
- subscribe->visitor()->OnObjectFragment(
- track->full_track_name(),
- Location{message.group_id, message.subgroup_id.value_or(0),
- message.object_id},
- message.publisher_priority, message.object_status, payload,
- end_of_message);
+ PublishedObjectMetadata metadata;
+ metadata.location = Location(message.group_id, message.object_id);
+ metadata.subgroup = message.subgroup_id;
+ metadata.status = message.object_status;
+ metadata.publisher_priority = message.publisher_priority;
+ metadata.arrival_time = session_->callbacks_.clock->Now();
+ subscribe->visitor()->OnObjectFragment(track->full_track_name(), metadata,
+ payload, end_of_message);
}
} else { // FETCH
track->OnObjectOrOk();
@@ -1792,8 +1799,7 @@
if (!lazily_initialized_stream_map_.has_value()) {
QUICHE_DCHECK(
DoesTrackStatusImplyHavingData(*track_publisher_->GetTrackStatus()));
- lazily_initialized_stream_map_.emplace(
- track_publisher_->GetForwardingPreference());
+ lazily_initialized_stream_map_.emplace();
}
return *lazily_initialized_stream_map_;
}
@@ -1878,12 +1884,12 @@
}
void MoqtSession::PublishedSubscription::OnNewObjectAvailable(
- Location sequence) {
+ Location sequence, uint64_t subgroup) {
if (!InWindow(sequence)) {
return;
}
- if (reset_subgroups_.contains(
- Location{sequence.group, sequence.subgroup, 0})) {
+ DataStreamIndex index(sequence.group, subgroup);
+ if (reset_subgroups_.contains(index)) {
// This subgroup has already been reset, ignore.
return;
}
@@ -1917,12 +1923,14 @@
}
std::optional<webtransport::StreamId> stream_id =
- stream_map().GetStreamForSequence(sequence);
+ stream_map().GetStreamFor(index);
webtransport::Stream* raw_stream = nullptr;
if (stream_id.has_value()) {
raw_stream = session_->session_->GetStreamById(*stream_id);
} else {
- raw_stream = session_->OpenOrQueueDataStream(request_id_, sequence);
+ raw_stream = session_->OpenOrQueueDataStream(
+ request_id_,
+ NewStreamParameters(sequence.group, subgroup, sequence.object));
}
if (raw_stream == nullptr) {
return;
@@ -1938,18 +1946,20 @@
"Publisher is gone");
}
-void MoqtSession::PublishedSubscription::OnNewFinAvailable(Location sequence) {
- if (!InWindow(sequence)) {
+// TODO(martinduke): Revise to check if the last object has been delivered.
+void MoqtSession::PublishedSubscription::OnNewFinAvailable(Location location,
+ uint64_t subgroup) {
+ if (!GroupInWindow(location.group)) {
return;
}
- if (reset_subgroups_.contains(
- Location{sequence.group, sequence.subgroup, 0})) {
+ DataStreamIndex index(location.group, subgroup);
+ if (reset_subgroups_.contains(index)) {
// This subgroup has already been reset, ignore.
return;
}
- QUICHE_DCHECK_GE(sequence.group, first_active_group_);
+ QUICHE_DCHECK_GE(location.group, first_active_group_);
std::optional<webtransport::StreamId> stream_id =
- stream_map().GetStreamForSequence(sequence);
+ stream_map().GetStreamFor(index);
if (!stream_id.has_value()) {
return;
}
@@ -1960,22 +1970,23 @@
}
OutgoingDataStream* stream =
static_cast<OutgoingDataStream*>(raw_stream->visitor());
- stream->Fin(sequence);
+ stream->Fin(location);
}
void MoqtSession::PublishedSubscription::OnSubgroupAbandoned(
- Location sequence, webtransport::StreamErrorCode error_code) {
- if (!InWindow(sequence)) {
+ uint64_t group, uint64_t subgroup,
+ webtransport::StreamErrorCode error_code) {
+ if (!GroupInWindow(group)) {
return;
}
- if (reset_subgroups_.contains(
- Location{sequence.group, sequence.subgroup, 0})) {
+ DataStreamIndex index(group, subgroup);
+ if (reset_subgroups_.contains(index)) {
// This subgroup has already been reset, ignore.
return;
}
- QUICHE_DCHECK_GE(sequence.group, first_active_group_);
+ QUICHE_DCHECK_GE(group, first_active_group_);
std::optional<webtransport::StreamId> stream_id =
- stream_map().GetStreamForSequence(sequence);
+ stream_map().GetStreamFor(index);
if (!stream_id.has_value()) {
return;
}
@@ -2004,8 +2015,8 @@
raw_stream->ResetWithUserCode(kResetCodeTimedOut);
}
first_active_group_ = std::max(first_active_group_, group_id + 1);
- absl::erase_if(reset_subgroups_, [&](const Location& sequence) {
- return sequence.group < first_active_group_;
+ absl::erase_if(reset_subgroups_, [&](const DataStreamIndex& index) {
+ return index.group < first_active_group_;
});
}
@@ -2018,7 +2029,7 @@
}
webtransport::SendOrder MoqtSession::PublishedSubscription::GetSendOrder(
- Location sequence) const {
+ Location sequence, uint64_t subgroup) const {
MoqtForwardingPreference forwarding_preference =
track_publisher_->GetForwardingPreference();
@@ -2031,21 +2042,23 @@
delivery_order);
}
return SendOrderForStream(subscriber_priority_, publisher_priority,
- sequence.group, sequence.subgroup, delivery_order);
+ sequence.group, subgroup, delivery_order);
}
// Returns the highest send order in the subscription.
void MoqtSession::PublishedSubscription::AddQueuedOutgoingDataStream(
- Location first_object) {
+ const NewStreamParameters& parameters) {
std::optional<webtransport::SendOrder> start_send_order =
queued_outgoing_data_streams_.empty()
? std::optional<webtransport::SendOrder>()
: queued_outgoing_data_streams_.rbegin()->first;
- webtransport::SendOrder send_order = GetSendOrder(first_object);
+ webtransport::SendOrder send_order =
+ GetSendOrder(Location(parameters.index.group, parameters.first_object),
+ parameters.index.subgroup);
// Zero out the subscriber priority bits, since these will be added when
// updating the session.
queued_outgoing_data_streams_.emplace(
- UpdateSendOrderForSubscriberPriority(send_order, 0), first_object);
+ UpdateSendOrderForSubscriberPriority(send_order, 0), parameters);
if (!start_send_order.has_value()) {
session_->UpdateQueuedSendOrder(request_id_, std::nullopt, send_order);
} else if (*start_send_order < send_order) {
@@ -2054,14 +2067,18 @@
}
}
-Location MoqtSession::PublishedSubscription::NextQueuedOutgoingDataStream() {
+MoqtSession::NewStreamParameters
+MoqtSession::PublishedSubscription::NextQueuedOutgoingDataStream() {
QUICHE_DCHECK(!queued_outgoing_data_streams_.empty());
if (queued_outgoing_data_streams_.empty()) {
- return Location();
+ QUICHE_BUG(NextQueuedOutgoingDataStream_no_stream)
+ << "NextQueuedOutgoingDataStream called when there are no streams "
+ "pending.";
+ return NewStreamParameters(0, 0, 0);
}
auto it = queued_outgoing_data_streams_.rbegin();
webtransport::SendOrder old_send_order = FinalizeSendOrder(it->first);
- Location first_object = it->second;
+ NewStreamParameters first_stream = it->second;
// converting a reverse iterator to an iterator involves incrementing it and
// then taking base().
queued_outgoing_data_streams_.erase((++it).base());
@@ -2075,16 +2092,16 @@
new_send_order);
}
}
- return first_object;
+ return first_stream;
}
void MoqtSession::PublishedSubscription::OnDataStreamCreated(
- webtransport::StreamId id, Location start_sequence) {
+ webtransport::StreamId id, DataStreamIndex start_sequence) {
++streams_opened_;
stream_map().AddStream(start_sequence, id);
}
void MoqtSession::PublishedSubscription::OnDataStreamDestroyed(
- webtransport::StreamId id, Location end_sequence) {
+ webtransport::StreamId id, DataStreamIndex end_sequence) {
stream_map().RemoveStream(end_sequence, id);
}
@@ -2099,11 +2116,12 @@
MoqtSession::OutgoingDataStream::OutgoingDataStream(
MoqtSession* session, webtransport::Stream* stream,
- PublishedSubscription& subscription, Location first_object)
+ PublishedSubscription& subscription, const NewStreamParameters& parameters)
: session_(session),
stream_(stream),
subscription_id_(subscription.request_id()),
- next_object_(first_object),
+ index_(parameters.index),
+ next_object_(parameters.first_object),
session_liveness_(session->liveness_token_) {
UpdateSendOrder(subscription);
}
@@ -2124,7 +2142,7 @@
}
auto it = session_->published_subscriptions_.find(subscription_id_);
if (it != session_->published_subscriptions_.end()) {
- it->second->OnDataStreamDestroyed(stream_->GetStreamId(), next_object_);
+ it->second->OnDataStreamDestroyed(stream_->GetStreamId(), index_);
}
}
@@ -2140,7 +2158,7 @@
auto it = stream_->session_->published_subscriptions_.find(
stream_->subscription_id_);
if (it != stream_->session_->published_subscriptions_.end()) {
- it->second->OnStreamTimeout(stream_->next_object_);
+ it->second->OnStreamTimeout(stream_->index());
}
stream_->stream_->ResetWithUserCode(kResetCodeTimedOut);
}
@@ -2175,11 +2193,17 @@
PublishedSubscription& subscription) {
while (stream_->CanWrite()) {
std::optional<PublishedObject> object =
- subscription.publisher().GetCachedObject(next_object_);
+ subscription.publisher().GetCachedObject(index_.group, index_.subgroup,
+ next_object_);
if (!object.has_value()) {
break;
}
- if (!subscription.InWindow(next_object_)) {
+
+ QUICHE_DCHECK_EQ(object->metadata.location.group, index_.group);
+ QUICHE_DCHECK(object->metadata.subgroup == index_.subgroup);
+ QUICHE_DCHECK(subscription.publisher().GetForwardingPreference() ==
+ MoqtForwardingPreference::kSubgroup);
+ if (!subscription.InWindow(object->metadata.location)) {
// It is possible that the next object became irrelevant due to a
// SUBSCRIBE_UPDATE. Close the stream if so.
bool success = stream_->SendFin();
@@ -2187,42 +2211,39 @@
<< "Writing FIN failed despite CanWrite() being true.";
return;
}
+
quic::QuicTimeDelta delivery_timeout = subscription.delivery_timeout();
if (!session_->alternate_delivery_timeout_ &&
- session_->callbacks_.clock->ApproximateNow() - object->arrival_time >
+ session_->callbacks_.clock->ApproximateNow() -
+ object->metadata.arrival_time >
delivery_timeout) {
- subscription.OnStreamTimeout(next_object_);
+ subscription.OnStreamTimeout(index_);
stream_->ResetWithUserCode(kResetCodeTimedOut);
return;
}
- QUICHE_DCHECK(next_object_ <= object->sequence);
- MoqtTrackPublisher& publisher = subscription.publisher();
- QUICHE_DCHECK(DoesTrackStatusImplyHavingData(*publisher.GetTrackStatus()));
- MoqtForwardingPreference forwarding_preference =
- publisher.GetForwardingPreference();
- UpdateSendOrder(subscription);
- if (forwarding_preference == MoqtForwardingPreference::kDatagram) {
- QUICHE_BUG(quic_bug_SendObjects_for_Datagram)
- << "Datagram Track requesting SendObjects";
- return;
- }
- next_object_ = object->sequence.next();
- if (session_->WriteObjectToStream(
+
+ if (!session_->WriteObjectToStream(
stream_, subscription.track_alias(), *object,
MoqtDataStreamType::kStreamHeaderSubgroup, !stream_header_written_,
object->fin_after_this)) {
- stream_header_written_ = true;
- subscription.OnObjectSent(object->sequence);
+ // WriteObjectToStream() closes the connection on error, meaning that
+ // there is no need to process the stream any further.
+ return;
}
+ ++next_object_;
+ stream_header_written_ = true;
+ subscription.OnObjectSent(object->metadata.location);
+
if (object->fin_after_this && !delivery_timeout.IsInfinite() &&
!session_->alternate_delivery_timeout_) {
- CreateAndSetAlarm(object->arrival_time + delivery_timeout);
+ CreateAndSetAlarm(object->metadata.arrival_time + delivery_timeout);
}
}
}
void MoqtSession::OutgoingDataStream::Fin(Location last_object) {
- if (next_object_ <= last_object) {
+ QUICHE_DCHECK_EQ(last_object.group, index_.group);
+ if (next_object_ <= last_object.object) {
// There is still data to send, do nothing.
return;
}
@@ -2248,11 +2269,11 @@
QUICHE_DCHECK(stream->CanWrite());
MoqtObject header;
header.track_alias = id;
- header.group_id = object.sequence.group;
- header.subgroup_id = object.sequence.subgroup;
- header.object_id = object.sequence.object;
- header.publisher_priority = object.publisher_priority;
- header.object_status = object.status;
+ header.group_id = object.metadata.location.group;
+ header.subgroup_id = object.metadata.subgroup;
+ header.object_id = object.metadata.location.object;
+ header.publisher_priority = object.metadata.publisher_priority;
+ header.object_status = object.metadata.status;
header.payload_length = object.payload.length();
quiche::QuicheBuffer serialized_header =
@@ -2274,7 +2295,7 @@
}
QUIC_DVLOG(1) << "Stream " << stream->GetStreamId() << " successfully wrote "
- << object.sequence << ", fin = " << fin;
+ << object.metadata.location << ", fin = " << fin;
return true;
}
@@ -2299,7 +2320,7 @@
void MoqtSession::PublishedSubscription::SendDatagram(Location sequence) {
std::optional<PublishedObject> object =
- track_publisher_->GetCachedObject(sequence);
+ track_publisher_->GetCachedObject(sequence.group, 0, sequence.object);
if (!object.has_value()) {
QUICHE_BUG(PublishedSubscription_SendDatagram_object_not_in_cache)
<< "Got notification about an object that is not in the cache";
@@ -2308,23 +2329,24 @@
MoqtObject header;
header.track_alias = track_alias();
- header.group_id = object->sequence.group;
- header.object_id = object->sequence.object;
- header.publisher_priority = object->publisher_priority;
- header.object_status = object->status;
+ header.group_id = object->metadata.location.group;
+ header.object_id = object->metadata.location.object;
+ header.publisher_priority = object->metadata.publisher_priority;
+ header.object_status = object->metadata.status;
header.subgroup_id = std::nullopt;
header.payload_length = object->payload.length();
quiche::QuicheBuffer datagram = session_->framer_.SerializeObjectDatagram(
header, object->payload.AsStringView());
session_->session_->SendOrQueueDatagram(datagram.AsStringView());
- OnObjectSent(object->sequence);
+ OnObjectSent(object->metadata.location);
}
void MoqtSession::OutgoingDataStream::UpdateSendOrder(
PublishedSubscription& subscription) {
- stream_->SetPriority(
- webtransport::StreamPriority{/*send_group_id=*/kMoqtSendGroupId,
- subscription.GetSendOrder(next_object_)});
+ stream_->SetPriority(webtransport::StreamPriority{
+ /*send_group_id=*/kMoqtSendGroupId,
+ subscription.GetSendOrder(Location(index_.group, next_object_),
+ index_.subgroup)});
}
void MoqtSession::OutgoingDataStream::CreateAndSetAlarm(
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 8448d14..67336e2 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -5,6 +5,7 @@
#ifndef QUICHE_QUIC_MOQT_MOQT_SESSION_H_
#define QUICHE_QUIC_MOQT_MOQT_SESSION_H_
+#include <algorithm>
#include <cstdint>
#include <memory>
#include <optional>
@@ -213,6 +214,15 @@
struct Empty {};
+ struct NewStreamParameters {
+ DataStreamIndex index;
+ uint64_t first_object;
+
+ NewStreamParameters(uint64_t group, uint64_t subgroup,
+ uint64_t first_object)
+ : index(group, subgroup), first_object(first_object) {}
+ };
+
class QUICHE_EXPORT ControlStream : public webtransport::StreamVisitor,
public MoqtControlParserVisitor {
public:
@@ -364,10 +374,10 @@
MoqtSubscribeErrorReason reason,
std::optional<uint64_t> track_alias = std::nullopt) override;
// This is only called for objects that have just arrived.
- void OnNewObjectAvailable(Location sequence) override;
+ void OnNewObjectAvailable(Location location, uint64_t subgroup) override;
void OnTrackPublisherGone() override;
- void OnNewFinAvailable(Location sequence) override;
- void OnSubgroupAbandoned(Location sequence,
+ void OnNewFinAvailable(Location location, uint64_t subgroup) override;
+ void OnSubgroupAbandoned(uint64_t group, uint64_t subgroup,
webtransport::StreamErrorCode error_code) override;
void OnGroupAbandoned(uint64_t group_id) override;
void ProcessObjectAck(const MoqtObjectAck& message) {
@@ -386,6 +396,9 @@
bool InWindow(Location sequence) {
return forward_ && window_.has_value() && window_->InWindow(sequence);
}
+ bool GroupInWindow(uint64_t group) {
+ return forward_ && window_.has_value() && window_->GroupInWindow(group);
+ }
Location GetWindowStart() const {
QUICHE_CHECK(window_.has_value());
return window_->start();
@@ -393,38 +406,38 @@
MoqtFilterType filter_type() const { return filter_type_; };
void OnDataStreamCreated(webtransport::StreamId id,
- Location start_sequence);
+ DataStreamIndex start_sequence);
void OnDataStreamDestroyed(webtransport::StreamId id,
- Location end_sequence);
+ DataStreamIndex end_sequence);
void OnObjectSent(Location sequence);
std::vector<webtransport::StreamId> GetAllStreams() const;
- webtransport::SendOrder GetSendOrder(Location sequence) const;
+ webtransport::SendOrder GetSendOrder(Location sequence,
+ uint64_t subgroup) const;
- void AddQueuedOutgoingDataStream(Location first_object);
+ void AddQueuedOutgoingDataStream(const NewStreamParameters& parameters);
// Pops the pending outgoing data stream, with the highest send order.
// The session keeps track of which subscribes have pending streams. This
// function will trigger a QUICHE_DCHECK if called when there are no pending
// streams.
- Location NextQueuedOutgoingDataStream();
+ NewStreamParameters NextQueuedOutgoingDataStream();
quic::QuicTimeDelta delivery_timeout() const { return delivery_timeout_; }
void set_delivery_timeout(quic::QuicTimeDelta timeout) {
delivery_timeout_ = timeout;
}
- void OnStreamTimeout(Location sequence) {
- sequence.object = 0;
- reset_subgroups_.insert(sequence);
+ void OnStreamTimeout(DataStreamIndex index) {
+ reset_subgroups_.insert(index);
if (session_->alternate_delivery_timeout_) {
- first_active_group_ = std::max(first_active_group_, sequence.group + 1);
+ first_active_group_ = std::max(first_active_group_, index.group + 1);
}
}
uint64_t first_active_group() const { return first_active_group_; }
- absl::flat_hash_set<Location>& reset_subgroups() {
+ absl::flat_hash_set<DataStreamIndex>& reset_subgroups() {
return reset_subgroups_;
}
@@ -461,7 +474,7 @@
uint64_t first_active_group_ = 0;
// If a stream has been reset due to delivery timeout, do not open a new
// stream if more object arrive for it.
- absl::flat_hash_set<Location> reset_subgroups_;
+ absl::flat_hash_set<DataStreamIndex> reset_subgroups_;
// The min of DELIVERY_TIMEOUT from SUBSCRIBE and SUBSCRIBE_OK.
quic::QuicTimeDelta delivery_timeout_ = quic::QuicTimeDelta::Infinite();
@@ -474,14 +487,14 @@
// Store the send order of queued outgoing data streams. Use a
// subscriber_priority_ of zero to avoid having to update it, and call
// FinalizeSendOrder() whenever delivering it to the MoqtSession.
- absl::btree_multimap<webtransport::SendOrder, Location>
+ absl::btree_multimap<webtransport::SendOrder, NewStreamParameters>
queued_outgoing_data_streams_;
};
class QUICHE_EXPORT OutgoingDataStream : public webtransport::StreamVisitor {
public:
OutgoingDataStream(MoqtSession* session, webtransport::Stream* stream,
PublishedSubscription& subscription,
- Location first_object);
+ const NewStreamParameters& parameters);
~OutgoingDataStream();
// webtransport::StreamVisitor implementation.
@@ -519,6 +532,8 @@
// alarm is already created.
void CreateAndSetAlarm(quic::QuicTime deadline);
+ DataStreamIndex index() const { return index_; }
+
private:
friend class test::MoqtSessionPeer;
friend class DeliveryTimeoutDelegate;
@@ -530,10 +545,11 @@
MoqtSession* session_;
webtransport::Stream* stream_;
uint64_t subscription_id_;
- // A Location with the minimum object ID that should go out next. The
- // session doesn't know what the next object ID in the stream is because
- // the next object could be in a different subgroup or simply be skipped.
- Location next_object_;
+ DataStreamIndex index_;
+ // Minimum object ID that should go out next. The session doesn't know the
+ // exact ID of the next object in the stream because the next object could
+ // be in a different subgroup or simply be skipped.
+ uint64_t next_object_;
bool stream_header_written_ = false;
// If this data stream is for SUBSCRIBE, reset it if an object has been
// excessively delayed per Section 7.1.1.2.
@@ -632,12 +648,12 @@
// Opens a new data stream, or queues it if the session is flow control
// blocked.
- webtransport::Stream* OpenOrQueueDataStream(uint64_t subscription_id,
- Location first_object);
+ webtransport::Stream* OpenOrQueueDataStream(
+ uint64_t subscription_id, const NewStreamParameters& parameters);
// Same as above, except the session is required to be not flow control
// blocked.
webtransport::Stream* OpenDataStream(PublishedSubscription& subscription,
- Location first_object);
+ const NewStreamParameters& parameters);
// Returns false if creation failed.
[[nodiscard]] bool OpenDataStream(std::shared_ptr<PublishedFetch> fetch,
webtransport::SendOrder send_order);
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index ed23ccd..9b25531 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -519,7 +519,7 @@
// forward=false, so incoming objects are ignored.
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
.Times(0);
- listener->OnNewObjectAvailable(Location(0, 0));
+ listener->OnNewObjectAvailable(Location(0, 0), 0);
}
TEST_F(MoqtSessionTest, SubscribeAbsoluteStartNoDataYet) {
@@ -535,7 +535,7 @@
// Window was not set to (0, 0) by SUBSCRIBE acceptance.
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
.Times(0);
- listener->OnNewObjectAvailable(Location(0, 0));
+ listener->OnNewObjectAvailable(Location(0, 0), 0);
}
TEST_F(MoqtSessionTest, SubscribeNextGroup) {
@@ -550,11 +550,11 @@
// Later objects in group 10 ignored.
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
.Times(0);
- listener->OnNewObjectAvailable(Location(10, 21));
+ listener->OnNewObjectAvailable(Location(10, 21), 0);
// Group 11 is sent.
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
.WillOnce(Return(false));
- listener->OnNewObjectAvailable(Location(11, 0));
+ listener->OnNewObjectAvailable(Location(11, 0), 0);
}
TEST_F(MoqtSessionTest, TwoSubscribesForTrack) {
@@ -1047,7 +1047,7 @@
MoqtSessionPeer::CreateIncomingDataStream(
&session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup);
- EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(1);
+ EXPECT_CALL(visitor_, OnObjectFragment).Times(1);
EXPECT_CALL(mock_stream_, GetStreamId())
.WillRepeatedly(Return(kIncomingUniStreamId));
object_stream->OnObjectMessage(object, payload, true);
@@ -1072,7 +1072,7 @@
MoqtSessionPeer::CreateIncomingDataStream(
&session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup);
- EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(1);
+ EXPECT_CALL(visitor_, OnObjectFragment).Times(1);
EXPECT_CALL(mock_stream_, GetStreamId())
.WillRepeatedly(Return(kIncomingUniStreamId));
object_stream->OnObjectMessage(object, payload, false);
@@ -1103,7 +1103,7 @@
MoqtSessionPeer::CreateIncomingDataStream(
&session, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup);
- EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(2);
+ EXPECT_CALL(visitor_, OnObjectFragment).Times(2);
EXPECT_CALL(mock_stream_, GetStreamId())
.WillRepeatedly(Return(kIncomingUniStreamId));
object_stream->OnObjectMessage(object, payload, false);
@@ -1129,13 +1129,13 @@
MoqtSessionPeer::CreateIncomingDataStream(
&session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup);
- EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _))
- .WillOnce([&](const FullTrackName& full_track_name, Location sequence,
- MoqtPriority publisher_priority, MoqtObjectStatus status,
+ EXPECT_CALL(visitor_, OnObjectFragment)
+ .WillOnce([&](const FullTrackName& full_track_name,
+ const PublishedObjectMetadata& metadata,
absl::string_view payload, bool end_of_message) {
EXPECT_EQ(full_track_name, ftn);
- EXPECT_EQ(sequence.group, object.group_id);
- EXPECT_EQ(sequence.object, object.object_id);
+ EXPECT_EQ(metadata.location.group, object.group_id);
+ EXPECT_EQ(metadata.location.object, object.object_id);
});
EXPECT_CALL(mock_stream_, GetStreamId())
.WillRepeatedly(Return(kIncomingUniStreamId));
@@ -1174,13 +1174,13 @@
MoqtSessionPeer::CreateIncomingDataStream(
&session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup);
- EXPECT_CALL(visitor, OnObjectFragment(_, _, _, _, _, _))
- .WillOnce([&](const FullTrackName& full_track_name, Location sequence,
- MoqtPriority publisher_priority, MoqtObjectStatus status,
+ EXPECT_CALL(visitor, OnObjectFragment)
+ .WillOnce([&](const FullTrackName& full_track_name,
+ const PublishedObjectMetadata& metadata,
absl::string_view payload, bool end_of_message) {
EXPECT_EQ(full_track_name, ftn);
- EXPECT_EQ(sequence.group, object.group_id);
- EXPECT_EQ(sequence.object, object.object_id);
+ EXPECT_EQ(metadata.location.group, object.group_id);
+ EXPECT_EQ(metadata.location.object, object.object_id);
});
EXPECT_CALL(mock_stream_, GetStreamId())
.WillRepeatedly(Return(kIncomingUniStreamId));
@@ -1281,18 +1281,16 @@
fin |= options.send_fin();
return absl::OkStatus();
});
- EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] {
- return PublishedObject{Location(5, 0),
- MoqtObjectStatus::kNormal,
- 127,
- MemSliceFromString("deadbeef"),
- MoqtSessionPeer::Now(&session_),
- false};
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] {
+ return PublishedObject{
+ PublishedObjectMetadata{Location(5, 0), 0, MoqtObjectStatus::kNormal,
+ 127, MoqtSessionPeer::Now(&session_)},
+ MemSliceFromString("deadbeef"), false};
});
- EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
- subscription->OnNewObjectAvailable(Location(5, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0), 0);
EXPECT_TRUE(correct_message);
EXPECT_FALSE(fin);
EXPECT_EQ(MoqtSessionPeer::LargestSentForSubscription(&session_, 0),
@@ -1335,18 +1333,16 @@
fin = options.send_fin();
return absl::OkStatus();
});
- EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] {
- return PublishedObject{Location(5, 0),
- MoqtObjectStatus::kNormal,
- 127,
- MemSliceFromString("deadbeef"),
- MoqtSessionPeer::Now(&session_),
- true};
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] {
+ return PublishedObject{
+ PublishedObjectMetadata{Location(5, 0), 0, MoqtObjectStatus::kNormal,
+ 127, MoqtSessionPeer::Now(&session_)},
+ MemSliceFromString("deadbeef"), true};
});
- EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
- subscription->OnNewObjectAvailable(Location(5, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0), 0);
EXPECT_TRUE(correct_message);
EXPECT_TRUE(fin);
}
@@ -1387,18 +1383,16 @@
fin |= options.send_fin();
return absl::OkStatus();
});
- EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] {
- return PublishedObject{Location(5, 0),
- MoqtObjectStatus::kNormal,
- 127,
- MemSliceFromString("deadbeef"),
- MoqtSessionPeer::Now(&session_),
- true};
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] {
+ return PublishedObject{
+ PublishedObjectMetadata{Location(5, 0), 0, MoqtObjectStatus::kNormal,
+ 127, MoqtSessionPeer::Now(&session_)},
+ MemSliceFromString("deadbeef"), true};
});
- EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
- subscription->OnNewObjectAvailable(Location(5, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0), 0);
EXPECT_TRUE(correct_message);
EXPECT_TRUE(fin);
@@ -1435,36 +1429,34 @@
// Verify first six message fields are sent correctly
bool correct_message = false;
const std::string kExpectedMessage = {0x04, 0x02, 0x05, 0x00, 0x7f};
- EXPECT_CALL(mock_stream_, Writev(_, _))
+ EXPECT_CALL(mock_stream_, Writev)
.WillOnce([&](absl::Span<const absl::string_view> data,
const quiche::StreamWriteOptions& options) {
correct_message = absl::StartsWith(data[0], kExpectedMessage);
fin = options.send_fin();
return absl::OkStatus();
});
- EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] {
- return PublishedObject{Location(5, 0),
- MoqtObjectStatus::kNormal,
- 127,
- MemSliceFromString("deadbeef"),
- MoqtSessionPeer::Now(&session_),
- false};
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] {
+ return PublishedObject{
+ PublishedObjectMetadata{Location(5, 0), 0, MoqtObjectStatus::kNormal,
+ 127, MoqtSessionPeer::Now(&session_)},
+ MemSliceFromString("deadbeef"), false};
});
- EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
- subscription->OnNewObjectAvailable(Location(5, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0), 0);
EXPECT_TRUE(correct_message);
EXPECT_FALSE(fin);
fin = false;
- EXPECT_CALL(mock_stream_, Writev(_, _))
+ EXPECT_CALL(mock_stream_, Writev)
.WillOnce([&](absl::Span<const absl::string_view> data,
const quiche::StreamWriteOptions& options) {
EXPECT_TRUE(data.empty());
fin = options.send_fin();
return absl::OkStatus();
});
- subscription->OnNewFinAvailable(Location(5, 0));
+ subscription->OnNewFinAvailable(Location(5, 0), 0);
}
TEST_F(MoqtSessionTest, SeparateFinForFutureObject) {
@@ -1503,39 +1495,38 @@
fin = options.send_fin();
return absl::OkStatus();
});
- EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] {
- return PublishedObject{Location(5, 0),
- MoqtObjectStatus::kNormal,
- 127,
- MemSliceFromString("deadbeef"),
- MoqtSessionPeer::Now(&session_),
- false};
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] {
+ return PublishedObject{
+ PublishedObjectMetadata{Location(5, 0), 0, MoqtObjectStatus::kNormal,
+ 127, MoqtSessionPeer::Now(&session_)},
+ MemSliceFromString("deadbeef"), false};
});
- EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
- subscription->OnNewObjectAvailable(Location(5, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0), 0);
EXPECT_FALSE(fin);
// Try to deliver (5,1), but fail.
EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return false; });
- EXPECT_CALL(*track, GetCachedObject(_)).Times(0);
- EXPECT_CALL(mock_stream_, Writev(_, _)).Times(0);
- subscription->OnNewObjectAvailable(Location(5, 1));
+ EXPECT_CALL(*track, GetCachedObject).Times(0);
+ EXPECT_CALL(mock_stream_, Writev).Times(0);
+ subscription->OnNewObjectAvailable(Location(5, 1), 0);
// Notify that FIN arrived, but do nothing with it because (5, 1) isn't sent.
- EXPECT_CALL(mock_stream_, Writev(_, _)).Times(0);
- subscription->OnNewFinAvailable(Location(5, 1));
+ EXPECT_CALL(mock_stream_, Writev).Times(0);
+ subscription->OnNewFinAvailable(Location(5, 1), 0);
// Reopen the window.
correct_message = false;
// object id, extensions, payload length, status.
const std::string kExpectedMessage2 = {0x01, 0x00, 0x00, 0x03};
EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return true; });
- EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([&] {
- return PublishedObject{
- Location(5, 1), MoqtObjectStatus::kEndOfGroup, 127,
- MemSliceFromString(""), MoqtSessionPeer::Now(&session_), true};
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([&] {
+ return PublishedObject{PublishedObjectMetadata{
+ Location(5, 1), 0, MoqtObjectStatus::kEndOfGroup,
+ 127, MoqtSessionPeer::Now(&session_)},
+ MemSliceFromString(""), true};
});
- EXPECT_CALL(*track, GetCachedObject(Location(5, 2))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 2)).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
EXPECT_CALL(mock_stream_, Writev(_, _))
@@ -1586,22 +1577,20 @@
fin = options.send_fin();
return absl::OkStatus();
});
- EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] {
- return PublishedObject{Location(5, 0),
- MoqtObjectStatus::kNormal,
- 127,
- MemSliceFromString("deadbeef"),
- MoqtSessionPeer::Now(&session_),
- false};
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] {
+ return PublishedObject{
+ PublishedObjectMetadata{Location(5, 0), 0, MoqtObjectStatus::kNormal,
+ 127, MoqtSessionPeer::Now(&session_)},
+ MemSliceFromString("deadbeef"), false};
});
- EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
- subscription->OnNewObjectAvailable(Location(5, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0), 0);
// Abandon the subgroup.
EXPECT_CALL(mock_stream_, ResetWithUserCode(0x1)).Times(1);
- subscription->OnSubgroupAbandoned(Location(5, 0), 0x1);
+ subscription->OnSubgroupAbandoned(5, 0, 0x1);
}
// TODO: Test operation with multiple streams.
@@ -1616,7 +1605,7 @@
// Queue the outgoing stream.
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
.WillOnce(Return(false));
- subscription->OnNewObjectAvailable(Location(5, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0), 0);
// Unblock the session, and cause the queued stream to be sent.
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
@@ -1638,11 +1627,13 @@
EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
.WillRepeatedly(Return(&mock_stream_));
EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([] {
- return PublishedObject{Location(5, 0), MoqtObjectStatus::kNormal, 128,
- MemSliceFromString("deadbeef")};
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([] {
+ return PublishedObject{
+ PublishedObjectMetadata{Location(5, 0), 0, MoqtObjectStatus::kNormal,
+ 128},
+ MemSliceFromString("deadbeef")};
});
- EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
session_.OnCanCreateNewOutgoingUnidirectionalStream();
@@ -1658,8 +1649,8 @@
// Queue the outgoing stream.
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
.WillRepeatedly(Return(false));
- subscription->OnNewObjectAvailable(Location(5, 0, 0));
- subscription->OnNewObjectAvailable(Location(6, 0, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0), 0);
+ subscription->OnNewObjectAvailable(Location(6, 0), 0);
subscription->OnGroupAbandoned(5);
// Unblock the session, and cause the queued stream to be sent. There should
@@ -1684,11 +1675,13 @@
EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
.WillRepeatedly(Return(&mock_stream_));
EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- EXPECT_CALL(*track, GetCachedObject(Location(6, 0))).WillRepeatedly([] {
- return PublishedObject{Location(6, 0), MoqtObjectStatus::kNormal, 128,
- MemSliceFromString("deadbeef")};
+ EXPECT_CALL(*track, GetCachedObject(6, 0, 0)).WillRepeatedly([] {
+ return PublishedObject{
+ PublishedObjectMetadata{Location(6, 0), 0, MoqtObjectStatus::kNormal,
+ 128},
+ MemSliceFromString("deadbeef")};
});
- EXPECT_CALL(*track, GetCachedObject(Location(6, 1))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(6, 0, 1)).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
session_.OnCanCreateNewOutgoingUnidirectionalStream();
@@ -1721,21 +1714,23 @@
.WillRepeatedly(Return(&mock_stream_));
EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([] {
- return PublishedObject{Location(5, 0), MoqtObjectStatus::kNormal, 128,
- MemSliceFromString("deadbeef")};
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([] {
+ return PublishedObject{
+ PublishedObjectMetadata{Location(5, 0), 0, MoqtObjectStatus::kNormal,
+ 128},
+ MemSliceFromString("deadbeef")};
});
- EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillOnce([] {
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillOnce([] {
return std::optional<PublishedObject>();
});
- subscription->OnNewObjectAvailable(Location(5, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0), 0);
// Now that the stream exists and is recorded within subscription, make it
// disappear by returning nullptr.
EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
.WillRepeatedly(Return(nullptr));
- EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).Times(0);
- subscription->OnNewObjectAvailable(Location(5, 1));
+ EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).Times(0);
+ subscription->OnNewObjectAvailable(Location(5, 1), 0);
}
TEST_F(MoqtSessionTest, OneBidirectionalStreamClient) {
@@ -1840,12 +1835,13 @@
return webtransport::DatagramStatus(
webtransport::DatagramStatusCode::kSuccess, "");
});
- EXPECT_CALL(*track_publisher, GetCachedObject(Location{5, 0}))
- .WillRepeatedly([] {
- return PublishedObject{Location{5, 0}, MoqtObjectStatus::kNormal, 128,
- MemSliceFromString("deadbeef")};
- });
- listener->OnNewObjectAvailable(Location(5, 0));
+ EXPECT_CALL(*track_publisher, GetCachedObject(5, 0, 0)).WillRepeatedly([] {
+ return PublishedObject{
+ PublishedObjectMetadata{Location{5, 0}, 0, MoqtObjectStatus::kNormal,
+ 128},
+ MemSliceFromString("deadbeef")};
+ });
+ listener->OnNewObjectAvailable(Location(5, 0), 0);
EXPECT_TRUE(correct_message);
}
@@ -1866,11 +1862,18 @@
};
char datagram[] = {0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x08, 0x64,
0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66};
- EXPECT_CALL(visitor_,
- OnObjectFragment(ftn, Location{object.group_id, object.object_id},
- object.publisher_priority, object.object_status,
- payload, true))
- .Times(1);
+ EXPECT_CALL(visitor_, OnObjectFragment)
+ .WillOnce([&](const FullTrackName& track_name,
+ const PublishedObjectMetadata& metadata,
+ absl::string_view received_payload, bool fin) {
+ EXPECT_EQ(track_name, ftn);
+ EXPECT_EQ(metadata.location,
+ Location(object.group_id, object.object_id));
+ EXPECT_EQ(metadata.publisher_priority, object.publisher_priority);
+ EXPECT_EQ(metadata.status, object.object_status);
+ EXPECT_EQ(payload, received_payload);
+ EXPECT_TRUE(fin);
+ });
session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram)));
}
@@ -1892,7 +1895,7 @@
MoqtSessionPeer::CreateIncomingDataStream(
&session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup);
- EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(1);
+ EXPECT_CALL(visitor_, OnObjectFragment).Times(1);
EXPECT_CALL(mock_stream_, GetStreamId())
.WillRepeatedly(Return(kIncomingUniStreamId));
object_stream->OnObjectMessage(object, payload, true);
@@ -1924,7 +1927,7 @@
std::unique_ptr<MoqtDataParserVisitor> object_stream =
MoqtSessionPeer::CreateIncomingDataStream(
&session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup);
- EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(0);
+ EXPECT_CALL(visitor_, OnObjectFragment).Times(0);
object_stream->OnObjectMessage(object, payload, true);
}
@@ -1936,7 +1939,7 @@
MoqtSessionPeer::CreateRemoteTrack(&session_, subscribe, &visitor_);
char datagram[] = {0x01, 0x02, 0x00, 0x00, 0x80, 0x00, 0x08, 0x64,
0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66};
- EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _)).Times(0);
+ EXPECT_CALL(visitor_, OnObjectFragment).Times(0);
session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram)));
}
@@ -1954,9 +1957,9 @@
.WillOnce(Return(false));
EXPECT_CALL(*track, GetTrackStatus())
.WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
- subscription->OnNewObjectAvailable(Location(1, 0));
- subscription->OnNewObjectAvailable(Location(0, 0));
- subscription->OnNewObjectAvailable(Location(2, 0));
+ subscription->OnNewObjectAvailable(Location(1, 0), 0);
+ subscription->OnNewObjectAvailable(Location(0, 0), 0);
+ subscription->OnNewObjectAvailable(Location(2, 0), 0);
// These should be opened in the sequence (0, 0), (1, 0), (2, 0).
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
.WillRepeatedly(Return(true));
@@ -1990,24 +1993,24 @@
EXPECT_CALL(mock_stream2, visitor()).WillOnce([&]() {
return stream_visitor[2].get();
});
- EXPECT_CALL(*track, GetCachedObject(Location(0, 0)))
- .WillOnce(
- Return(PublishedObject{Location(0, 0), MoqtObjectStatus::kNormal, 127,
- MemSliceFromString("deadbeef")}));
- EXPECT_CALL(*track, GetCachedObject(Location(0, 1)))
- .WillOnce(Return(std::nullopt));
- EXPECT_CALL(*track, GetCachedObject(Location(1, 0)))
- .WillOnce(
- Return(PublishedObject{Location(1, 0), MoqtObjectStatus::kNormal, 127,
- MemSliceFromString("deadbeef")}));
- EXPECT_CALL(*track, GetCachedObject(Location(1, 1)))
- .WillOnce(Return(std::nullopt));
- EXPECT_CALL(*track, GetCachedObject(Location(2, 0)))
- .WillOnce(
- Return(PublishedObject{Location(2, 0), MoqtObjectStatus::kNormal, 127,
- MemSliceFromString("deadbeef")}));
- EXPECT_CALL(*track, GetCachedObject(Location(2, 1)))
- .WillOnce(Return(std::nullopt));
+ EXPECT_CALL(*track, GetCachedObject(0, 0, 0))
+ .WillOnce(Return(PublishedObject{
+ PublishedObjectMetadata{Location(0, 0), 0, MoqtObjectStatus::kNormal,
+ 127},
+ MemSliceFromString("deadbeef")}));
+ EXPECT_CALL(*track, GetCachedObject(0, 0, 1)).WillOnce(Return(std::nullopt));
+ EXPECT_CALL(*track, GetCachedObject(1, 0, 0))
+ .WillOnce(Return(PublishedObject{
+ PublishedObjectMetadata{Location(1, 0), 0, MoqtObjectStatus::kNormal,
+ 127},
+ MemSliceFromString("deadbeef")}));
+ EXPECT_CALL(*track, GetCachedObject(1, 0, 1)).WillOnce(Return(std::nullopt));
+ EXPECT_CALL(*track, GetCachedObject(2, 0, 0))
+ .WillOnce(Return(PublishedObject{
+ PublishedObjectMetadata{Location(2, 0), 0, MoqtObjectStatus::kNormal,
+ 127},
+ MemSliceFromString("deadbeef")}));
+ EXPECT_CALL(*track, GetCachedObject(2, 0, 1)).WillOnce(Return(std::nullopt));
EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true));
EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true));
EXPECT_CALL(mock_stream2, CanWrite()).WillRepeatedly(Return(true));
@@ -2047,7 +2050,7 @@
.WillOnce(Return(false));
EXPECT_CALL(*track, GetTrackStatus())
.WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
- subscription->OnNewObjectAvailable(Location(0, 0));
+ subscription->OnNewObjectAvailable(Location(0, 0), 0);
// Delete the subscription, then grant stream credit.
MoqtSessionPeer::DeleteSubscription(&session_, 0);
@@ -2085,10 +2088,10 @@
.WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
EXPECT_CALL(*track2, GetTrackStatus())
.WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
- subscription0->OnNewObjectAvailable(Location(0, 0));
- subscription1->OnNewObjectAvailable(Location(0, 0));
- subscription0->OnNewObjectAvailable(Location(1, 0));
- subscription1->OnNewObjectAvailable(Location(1, 0));
+ subscription0->OnNewObjectAvailable(Location(0, 0), 0);
+ subscription1->OnNewObjectAvailable(Location(0, 0), 0);
+ subscription0->OnNewObjectAvailable(Location(1, 0), 0);
+ subscription1->OnNewObjectAvailable(Location(1, 0), 0);
// Allow one stream to be opened. It will be group 0, subscription 0.
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
@@ -2106,14 +2109,14 @@
EXPECT_CALL(mock_stream0, visitor()).WillOnce([&]() {
return stream_visitor0.get();
});
- EXPECT_CALL(*track1, GetCachedObject(Location(0, 0)))
- .WillOnce(
- Return(PublishedObject{Location(0, 0), MoqtObjectStatus::kNormal, 127,
- MemSliceFromString("foobar")}));
- EXPECT_CALL(*track1, GetCachedObject(Location(0, 1)))
- .WillOnce(Return(std::nullopt));
+ EXPECT_CALL(*track1, GetCachedObject(0, 0, 0))
+ .WillOnce(Return(PublishedObject{
+ PublishedObjectMetadata{Location(0, 0), 0, MoqtObjectStatus::kNormal,
+ 127},
+ MemSliceFromString("foobar")}));
+ EXPECT_CALL(*track1, GetCachedObject(0, 0, 1)).WillOnce(Return(std::nullopt));
EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true));
- EXPECT_CALL(mock_stream0, Writev(_, _))
+ EXPECT_CALL(mock_stream0, Writev)
.WillOnce([&](absl::Span<const absl::string_view> data,
const quiche::StreamWriteOptions& options) {
// Check track alias is 14.
@@ -2142,12 +2145,12 @@
EXPECT_CALL(mock_stream1, visitor()).WillOnce([&]() {
return stream_visitor1.get();
});
- EXPECT_CALL(*track2, GetCachedObject(Location(0, 0)))
- .WillOnce(
- Return(PublishedObject{Location(0, 0), MoqtObjectStatus::kNormal, 127,
- MemSliceFromString("deadbeef")}));
- EXPECT_CALL(*track2, GetCachedObject(Location(0, 1)))
- .WillOnce(Return(std::nullopt));
+ EXPECT_CALL(*track2, GetCachedObject(0, 0, 0))
+ .WillOnce(Return(PublishedObject{
+ PublishedObjectMetadata{Location(0, 0), 0, MoqtObjectStatus::kNormal,
+ 127},
+ MemSliceFromString("deadbeef")}));
+ EXPECT_CALL(*track2, GetCachedObject(0, 0, 1)).WillOnce(Return(std::nullopt));
EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true));
EXPECT_CALL(mock_stream1, Writev(_, _))
.WillOnce([&](absl::Span<const absl::string_view> data,
@@ -2195,9 +2198,10 @@
EXPECT_CALL(data_stream, CanWrite).WillRepeatedly(Return(true));
EXPECT_CALL(*fetch_task, GetNextObject)
.WillOnce(Invoke([=](PublishedObject& output) {
- output.sequence = location;
- output.status = status;
- output.publisher_priority = 128;
+ output.metadata.location = location;
+ output.metadata.subgroup = 0;
+ output.metadata.status = status;
+ output.metadata.publisher_priority = 128;
output.payload = quiche::QuicheMemSlice::Copy(payload);
output.fin_after_this = true; // should be ignored.
return MoqtFetchTask::GetNextObjectResult::kSuccess;
@@ -2424,16 +2428,16 @@
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
MockTrackPublisher* track = CreateTrackPublisher();
- SetLargestId(track, Location(4, 0, 10));
+ SetLargestId(track, Location(4, 10));
ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get());
MoqtObjectListener* subscription =
MoqtSessionPeer::GetSubscription(&session_, subscribe.request_id);
ASSERT_NE(subscription, nullptr);
EXPECT_TRUE(
- MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 0, 11)));
+ MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 11)));
EXPECT_FALSE(
- MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 0, 10)));
+ MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 10)));
// Joining FETCH arrives. The resulting Fetch should begin at (2, 0).
MoqtFetch fetch = DefaultFetch();
@@ -2464,7 +2468,7 @@
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
MockTrackPublisher* track = CreateTrackPublisher();
- SetLargestId(track, Location(2, 0, 10));
+ SetLargestId(track, Location(2, 10));
ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get());
MoqtFetch fetch = DefaultFetch();
@@ -2672,7 +2676,7 @@
do {
result = fetch_task->GetNextObject(object);
if (result == MoqtFetchTask::GetNextObjectResult::kSuccess) {
- EXPECT_EQ(object.sequence.object, expected_object_id);
+ EXPECT_EQ(object.metadata.location.object, expected_object_id);
++expected_object_id;
}
} while (result != MoqtFetchTask::GetNextObjectResult::kPending);
@@ -2801,7 +2805,7 @@
PublishedObject new_object;
result = fetch_task->GetNextObject(new_object);
if (result == MoqtFetchTask::GetNextObjectResult::kSuccess) {
- EXPECT_EQ(new_object.sequence.object, expected_object_id);
+ EXPECT_EQ(new_object.metadata.location.object, expected_object_id);
++expected_object_id;
}
} while (result != MoqtFetchTask::GetNextObjectResult::kPending);
@@ -2822,7 +2826,7 @@
PublishedObject new_object;
result = fetch_task->GetNextObject(new_object);
if (result == MoqtFetchTask::GetNextObjectResult::kSuccess) {
- EXPECT_EQ(new_object.sequence.object, expected_object_id);
+ EXPECT_EQ(new_object.metadata.location.object, expected_object_id);
++expected_object_id;
}
} while (result != MoqtFetchTask::GetNextObjectResult::kPending);
@@ -2915,30 +2919,35 @@
EXPECT_CALL(data_mock, visitor()).WillRepeatedly(Invoke([&]() {
return stream_visitor.get();
}));
- EXPECT_CALL(*track_publisher, GetCachedObject(_))
- .WillOnce(Return(PublishedObject{
- Location(0, 0), MoqtObjectStatus::kObjectDoesNotExist, 0,
- quiche::QuicheMemSlice(),
- MoqtSessionPeer::Now(&session_) - quic::QuicTimeDelta::FromSeconds(1),
- false}));
+ EXPECT_CALL(*track_publisher, GetCachedObject)
+ .WillOnce(
+ Return(PublishedObject{PublishedObjectMetadata{
+ Location(0, 0),
+ 0,
+ MoqtObjectStatus::kObjectDoesNotExist,
+ 0,
+ MoqtSessionPeer::Now(&session_) -
+ quic::QuicTimeDelta::FromSeconds(1),
+ },
+ quiche::QuicheMemSlice(), false}));
EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeTimedOut))
.WillOnce(Invoke([&](webtransport::StreamErrorCode /*error*/) {
stream_visitor.reset();
}));
// Arrival time is very old; reset immediately.
- subscription->OnNewObjectAvailable(Location(0, 0, 0));
+ subscription->OnNewObjectAvailable(Location(0, 0), 0);
// Subsequent objects for that subgroup are ignored.
- EXPECT_CALL(*track_publisher, GetCachedObject(_)).Times(0);
+ EXPECT_CALL(*track_publisher, GetCachedObject).Times(0);
EXPECT_CALL(mock_session_, GetStreamById(_)).Times(0);
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
.Times(0);
- subscription->OnNewObjectAvailable(Location(0, 0, 1));
+ subscription->OnNewObjectAvailable(Location(0, 1), 0);
// Check that reset_subgroups_ is pruned.
- EXPECT_TRUE(
- MoqtSessionPeer::SubgroupHasBeenReset(subscription, Location(0, 0, 1)));
+ EXPECT_TRUE(MoqtSessionPeer::SubgroupHasBeenReset(subscription,
+ DataStreamIndex(0, 0)));
subscription->OnGroupAbandoned(0);
- EXPECT_FALSE(
- MoqtSessionPeer::SubgroupHasBeenReset(subscription, Location(0, 0, 1)));
+ EXPECT_FALSE(MoqtSessionPeer::SubgroupHasBeenReset(subscription,
+ DataStreamIndex(0, 0)));
}
TEST_F(MoqtSessionTest, DeliveryTimeoutAfterIntegratedFin) {
@@ -2971,14 +2980,16 @@
EXPECT_CALL(data_mock, visitor()).WillRepeatedly(Invoke([&]() {
return stream_visitor.get();
}));
- EXPECT_CALL(*track_publisher, GetCachedObject(_))
+ EXPECT_CALL(*track_publisher, GetCachedObject)
.WillOnce(Return(PublishedObject{
- Location(0, 0), MoqtObjectStatus::kObjectDoesNotExist, 0,
- quiche::QuicheMemSlice(), MoqtSessionPeer::Now(&session_), true}))
+ PublishedObjectMetadata{Location(0, 0), 0,
+ MoqtObjectStatus::kObjectDoesNotExist, 0,
+ MoqtSessionPeer::Now(&session_)},
+ quiche::QuicheMemSlice(), true}))
.WillOnce(Return(std::nullopt));
EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeTimedOut)).Times(0);
- subscription->OnNewObjectAvailable(Location(0, 0, 0));
+ subscription->OnNewObjectAvailable(Location(0, 0), 0);
auto* delivery_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>(
MoqtSessionPeer::GetAlarm(stream_visitor.get()));
EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeTimedOut))
@@ -3020,16 +3031,18 @@
EXPECT_CALL(data_mock, visitor()).WillRepeatedly(Invoke([&]() {
return stream_visitor.get();
}));
- EXPECT_CALL(*track_publisher, GetCachedObject(_))
+ EXPECT_CALL(*track_publisher, GetCachedObject)
.WillOnce(Return(PublishedObject{
- Location(0, 0), MoqtObjectStatus::kObjectDoesNotExist, 0,
- quiche::QuicheMemSlice(), MoqtSessionPeer::Now(&session_), false}))
+ PublishedObjectMetadata{Location(0, 0), 0,
+ MoqtObjectStatus::kObjectDoesNotExist, 0,
+ MoqtSessionPeer::Now(&session_)},
+ quiche::QuicheMemSlice(), false}))
.WillOnce(Return(std::nullopt));
EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- subscription->OnNewObjectAvailable(Location(0, 0, 0));
+ subscription->OnNewObjectAvailable(Location(0, 0), 0);
EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- subscription->OnNewFinAvailable(Location(0, 0, 0));
+ subscription->OnNewFinAvailable(Location(0, 0), 0);
auto* delivery_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>(
MoqtSessionPeer::GetAlarm(stream_visitor.get()));
EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeTimedOut))
@@ -3072,13 +3085,15 @@
EXPECT_CALL(data_mock1, visitor()).WillRepeatedly(Invoke([&]() {
return stream_visitor1.get();
}));
- EXPECT_CALL(*track_publisher, GetCachedObject(_))
+ EXPECT_CALL(*track_publisher, GetCachedObject)
.WillOnce(Return(PublishedObject{
- Location(0, 0), MoqtObjectStatus::kObjectDoesNotExist, 0,
- quiche::QuicheMemSlice(), MoqtSessionPeer::Now(&session_), false}))
+ PublishedObjectMetadata{Location(0, 0), 0,
+ MoqtObjectStatus::kObjectDoesNotExist, 0,
+ MoqtSessionPeer::Now(&session_)},
+ quiche::QuicheMemSlice(), false}))
.WillOnce(Return(std::nullopt));
EXPECT_CALL(data_mock1, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- subscription->OnNewObjectAvailable(Location(0, 0, 0));
+ subscription->OnNewObjectAvailable(Location(0, 0), 0);
webtransport::test::MockStream data_mock2;
EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
@@ -3097,13 +3112,15 @@
EXPECT_CALL(data_mock2, visitor()).WillRepeatedly(Invoke([&]() {
return stream_visitor2.get();
}));
- EXPECT_CALL(*track_publisher, GetCachedObject(_))
+ EXPECT_CALL(*track_publisher, GetCachedObject)
.WillOnce(Return(PublishedObject{
- Location(1, 0), MoqtObjectStatus::kObjectDoesNotExist, 0,
- quiche::QuicheMemSlice(), MoqtSessionPeer::Now(&session_), false}))
+ PublishedObjectMetadata{Location(1, 0), 0,
+ MoqtObjectStatus::kObjectDoesNotExist, 0,
+ MoqtSessionPeer::Now(&session_)},
+ quiche::QuicheMemSlice(), false}))
.WillOnce(Return(std::nullopt));
EXPECT_CALL(data_mock2, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- subscription->OnNewObjectAvailable(Location(1, 0, 0));
+ subscription->OnNewObjectAvailable(Location(1, 0), 0);
// Group 1 should start the timer on the Group 0 stream.
auto* delivery_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>(
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.cc b/quiche/quic/moqt/moqt_subscribe_windows.cc
index 2fb7225..1e0ad50 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows.cc
@@ -5,9 +5,11 @@
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
#include <cstdint>
+#include <limits>
#include <optional>
#include <vector>
+#include "quiche/quic/core/quic_interval.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/platform/api/quic_bug_tracker.h"
#include "quiche/common/platform/api/quiche_logging.h"
@@ -15,59 +17,24 @@
namespace moqt {
-ReducedSequenceIndex::ReducedSequenceIndex(
- Location sequence, MoqtForwardingPreference preference) {
- switch (preference) {
- case MoqtForwardingPreference::kSubgroup:
- sequence_ = Location(sequence.group, sequence.subgroup, 0);
- break;
- case MoqtForwardingPreference::kDatagram:
- sequence_ = Location(sequence.group, 0, sequence.object);
- return;
- }
-}
-
-std::optional<webtransport::StreamId> SendStreamMap::GetStreamForSequence(
- Location sequence) const {
- QUICHE_DCHECK(forwarding_preference_ == MoqtForwardingPreference::kSubgroup);
- Location index =
- ReducedSequenceIndex(sequence, forwarding_preference_).sequence();
- auto group_it = send_streams_.find(index.group);
- if (group_it == send_streams_.end()) {
+std::optional<webtransport::StreamId> SendStreamMap::GetStreamFor(
+ DataStreamIndex index) const {
+ auto it = send_streams_.find(index);
+ if (it == send_streams_.end()) {
return std::nullopt;
}
- auto subgroup_it = group_it->second.find(index.subgroup);
- if (subgroup_it == group_it->second.end()) {
- return std::nullopt;
- }
- return subgroup_it->second;
+ return it->second;
}
-void SendStreamMap::AddStream(Location sequence,
+void SendStreamMap::AddStream(DataStreamIndex index,
webtransport::StreamId stream_id) {
- Location index =
- ReducedSequenceIndex(sequence, forwarding_preference_).sequence();
- auto [it, result] = send_streams_.insert({index.group, Group()});
- auto [sg, success] = it->second.try_emplace(index.subgroup, stream_id);
+ auto [it, success] = send_streams_.emplace(index, stream_id);
QUIC_BUG_IF(quic_bug_moqt_draft_03_02, !success) << "Stream already added";
}
-void SendStreamMap::RemoveStream(Location sequence,
+void SendStreamMap::RemoveStream(DataStreamIndex index,
webtransport::StreamId stream_id) {
- Location index =
- ReducedSequenceIndex(sequence, forwarding_preference_).sequence();
- auto group_it = send_streams_.find(index.group);
- if (group_it == send_streams_.end()) {
- QUICHE_NOTREACHED();
- return;
- }
- auto subgroup_it = group_it->second.find(index.subgroup);
- if (subgroup_it == group_it->second.end() ||
- subgroup_it->second != stream_id) {
- QUICHE_NOTREACHED();
- return;
- }
- group_it->second.erase(subgroup_it);
+ send_streams_.erase(index);
}
bool SubscribeWindow::TruncateStart(Location start) {
@@ -96,25 +63,30 @@
std::vector<webtransport::StreamId> SendStreamMap::GetAllStreams() const {
std::vector<webtransport::StreamId> ids;
- for (const auto& [group, subgroup_map] : send_streams_) {
- for (const auto& [subgroup, stream_id] : subgroup_map) {
- ids.push_back(stream_id);
- }
+ for (const auto& [index, stream_id] : send_streams_) {
+ ids.push_back(stream_id);
}
return ids;
}
std::vector<webtransport::StreamId> SendStreamMap::GetStreamsForGroup(
uint64_t group_id) const {
+ const auto start_it = send_streams_.lower_bound(DataStreamIndex(group_id, 0));
+ const auto end_it = send_streams_.upper_bound(
+ DataStreamIndex(group_id, std::numeric_limits<uint64_t>::max()));
std::vector<webtransport::StreamId> ids;
- auto it = send_streams_.find(group_id);
- if (it == send_streams_.end()) {
- return ids;
- }
- for (const auto& [subgroup, stream_id] : it->second) {
- ids.push_back(stream_id);
+ for (auto it = start_it; it != end_it; ++it) {
+ ids.push_back(it->second);
}
return ids;
}
+bool SubscribeWindow::GroupInWindow(uint64_t group) const {
+ const quic::QuicInterval<Location> group_window(
+ Location(group, 0),
+ Location(group, std::numeric_limits<uint64_t>::max()));
+ const quic::QuicInterval<Location> subscription_window(start_, end_);
+ return group_window.Intersects(subscription_window);
+}
+
} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h
index f1eea5d..17d5351 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.h
+++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -7,10 +7,13 @@
#include <cstdint>
#include <optional>
+#include <tuple>
#include <vector>
#include "absl/container/btree_map.h"
+#include "quiche/quic/core/quic_interval.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/web_transport/web_transport.h"
@@ -37,6 +40,7 @@
bool InWindow(const Location& seq) const {
return start_ <= seq && seq <= end_;
}
+ bool GroupInWindow(uint64_t group) const;
Location start() const { return start_; }
Location end() const { return end_; }
@@ -55,47 +59,57 @@
Location end_ = Location(UINT64_MAX, UINT64_MAX);
};
-// ReducedSequenceIndex represents an index object such that if two sequence
-// numbers are mapped to the same stream, they will be mapped to the same index.
-class ReducedSequenceIndex {
- public:
- ReducedSequenceIndex(Location sequence, MoqtForwardingPreference preference);
+// A tuple uniquely identifying a WebTransport data stream associated with a
+// subscription. By convention, if a DataStreamIndex is necessary for a datagram
+// track, `subgroup` is set to zero.
+struct DataStreamIndex {
+ uint64_t group = 0;
+ uint64_t subgroup = 0;
- bool operator==(const ReducedSequenceIndex& other) const {
- return sequence_ == other.sequence_;
+ DataStreamIndex() = default;
+ DataStreamIndex(uint64_t group, uint64_t subgroup)
+ : group(group), subgroup(subgroup) {}
+ explicit DataStreamIndex(const PublishedObject& object)
+ : group(object.metadata.location.group),
+ subgroup(object.metadata.subgroup.value_or(0)) {}
+
+ bool operator==(const DataStreamIndex& other) const {
+ return group == other.group && subgroup == other.subgroup;
}
- bool operator!=(const ReducedSequenceIndex& other) const {
- return sequence_ != other.sequence_;
+
+ bool operator<(const DataStreamIndex& other) const {
+ return std::make_tuple(group, subgroup) <
+ std::make_tuple(other.group, other.subgroup);
}
- Location sequence() { return sequence_; }
+ bool operator<=(const DataStreamIndex& other) const {
+ return std::make_tuple(group, subgroup) <=
+ std::make_tuple(other.group, other.subgroup);
+ }
+ bool operator>(const DataStreamIndex& other) const {
+ return !(*this <= other);
+ }
template <typename H>
- friend H AbslHashValue(H h, const ReducedSequenceIndex& m) {
- return H::combine(std::move(h), m.sequence_);
+ friend H AbslHashValue(H h, const DataStreamIndex& index) {
+ return H::combine(std::move(h), index.group, index.subgroup);
}
-
- private:
- Location sequence_;
};
// A map of outgoing data streams indexed by object sequence numbers.
class QUICHE_EXPORT SendStreamMap {
public:
- explicit SendStreamMap(MoqtForwardingPreference forwarding_preference)
- : forwarding_preference_(forwarding_preference) {}
+ SendStreamMap() = default;
- std::optional<webtransport::StreamId> GetStreamForSequence(
- Location sequence) const;
- void AddStream(Location sequence, webtransport::StreamId stream_id);
- void RemoveStream(Location sequence, webtransport::StreamId stream_id);
+ std::optional<webtransport::StreamId> GetStreamFor(
+ DataStreamIndex index) const;
+ void AddStream(DataStreamIndex index, webtransport::StreamId stream_id);
+ void RemoveStream(DataStreamIndex index, webtransport::StreamId stream_id);
std::vector<webtransport::StreamId> GetAllStreams() const;
std::vector<webtransport::StreamId> GetStreamsForGroup(
uint64_t group_id) const;
private:
- using Group = absl::btree_map<uint64_t, webtransport::StreamId>;
- absl::btree_map<uint64_t, Group> send_streams_;
- MoqtForwardingPreference forwarding_preference_;
+ absl::btree_map<DataStreamIndex, webtransport::StreamId> send_streams_;
};
} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
index 2f2ed7a..cfbeb8f 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows_test.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
@@ -34,16 +34,16 @@
}
TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdSubgroup) {
- SendStreamMap stream_map(MoqtForwardingPreference::kSubgroup);
- stream_map.AddStream(Location{4, 0}, 2);
- EXPECT_EQ(stream_map.GetStreamForSequence(Location(5, 0)), std::nullopt);
- stream_map.AddStream(Location{5, 2}, 6);
- EXPECT_QUIC_BUG(stream_map.AddStream(Location{5, 3}, 6),
+ SendStreamMap stream_map;
+ stream_map.AddStream(DataStreamIndex{4, 0}, 2);
+ EXPECT_EQ(stream_map.GetStreamFor(DataStreamIndex(5, 0)), std::nullopt);
+ stream_map.AddStream(DataStreamIndex{5, 0}, 6);
+ stream_map.AddStream(DataStreamIndex{5, 1}, 7);
+ EXPECT_QUIC_BUG(stream_map.AddStream(DataStreamIndex{5, 0}, 6),
"Stream already added");
- EXPECT_EQ(stream_map.GetStreamForSequence(Location(4, 1)), 2);
- EXPECT_EQ(stream_map.GetStreamForSequence(Location(5, 0)), 6);
- stream_map.RemoveStream(Location{5, 1}, 6);
- EXPECT_EQ(stream_map.GetStreamForSequence(Location(5, 2)), std::nullopt);
+ EXPECT_EQ(stream_map.GetStreamFor(DataStreamIndex(4, 0)), 2);
+ stream_map.RemoveStream(DataStreamIndex{5, 1}, 7);
+ EXPECT_EQ(stream_map.GetStreamFor(DataStreamIndex(5, 1)), std::nullopt);
}
TEST_F(SubscribeWindowTest, UpdateStartEnd) {
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc
index b62f5b4..6e08c2c 100644
--- a/quiche/quic/moqt/moqt_track.cc
+++ b/quiche/quic/moqt/moqt_track.cc
@@ -98,8 +98,7 @@
PublishedObject object;
switch (fetch_task_->GetNextObject(object)) {
case MoqtFetchTask::GetNextObjectResult::kSuccess:
- visitor_->OnObjectFragment(full_track_name(), object.sequence,
- object.publisher_priority, object.status,
+ visitor_->OnObjectFragment(full_track_name(), object.metadata,
object.payload.AsStringView(), true);
break;
case MoqtFetchTask::GetNextObjectResult::kError:
@@ -164,13 +163,14 @@
quiche::QuicheMemSlice message_slice(std::move(payload_));
output.payload = std::move(message_slice);
}
- output.sequence =
- Location(next_object_->group_id, next_object_->subgroup_id.value_or(0),
- next_object_->object_id);
- output.status = next_object_->object_status;
- output.publisher_priority = next_object_->publisher_priority;
+ output.metadata.location =
+ Location(next_object_->group_id, next_object_->object_id);
+ output.metadata.subgroup = next_object_->subgroup_id.value_or(0);
+ output.metadata.status = next_object_->object_status;
+ output.metadata.publisher_priority = next_object_->publisher_priority;
output.fin_after_this = false;
- if (output.sequence == largest_location_) { // This is the last object.
+ if (output.metadata.location ==
+ largest_location_) { // This is the last object.
eof_ = true;
}
next_object_.reset();
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index d9abd82..74761f0 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -117,9 +117,7 @@
virtual void OnCanAckObjects(MoqtObjectAckFunction ack_function) = 0;
// Called when an object fragment (or an entire object) is received.
virtual void OnObjectFragment(const FullTrackName& full_track_name,
- Location sequence,
- MoqtPriority publisher_priority,
- MoqtObjectStatus object_status,
+ const PublishedObjectMetadata& metadata,
absl::string_view object,
bool end_of_message) = 0;
virtual void OnSubscribeDone(FullTrackName full_track_name) = 0;
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 58f1282..5090f3d 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -179,7 +179,8 @@
got_object = true;
EXPECT_EQ(fetch_task_->GetNextObject(object),
MoqtFetchTask::GetNextObjectResult::kSuccess);
- EXPECT_EQ(object.sequence, Location(3, 0, 0));
+ EXPECT_EQ(object.metadata.location, Location(3, 0));
+ EXPECT_EQ(object.metadata.subgroup, 0);
EXPECT_EQ(object.payload.AsStringView(), "foobar");
});
int got_read_callback = 0;
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.cc b/quiche/quic/moqt/test_tools/mock_moqt_session.cc
deleted file mode 100644
index d9c2e1b..0000000
--- a/quiche/quic/moqt/test_tools/mock_moqt_session.cc
+++ /dev/null
@@ -1,225 +0,0 @@
-// Copyright 2025 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "quiche/quic/moqt/test_tools/mock_moqt_session.h"
-
-#include <cstdint>
-#include <memory>
-#include <optional>
-#include <utility>
-
-#include "absl/status/status.h"
-#include "absl/status/statusor.h"
-#include "absl/strings/string_view.h"
-#include "quiche/quic/moqt/moqt_failed_fetch.h"
-#include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_priority.h"
-#include "quiche/quic/moqt/moqt_publisher.h"
-#include "quiche/quic/moqt/moqt_subscribe_windows.h"
-#include "quiche/quic/moqt/moqt_track.h"
-#include "quiche/common/platform/api/quiche_logging.h"
-#include "quiche/common/platform/api/quiche_test.h"
-#include "quiche/web_transport/web_transport.h"
-
-namespace moqt::test {
-
-namespace {
-using ::testing::_;
-}
-
-// Object listener that forwards all of the objects to the
-// SubcribeRemoteTrack::Visitor provided.
-class MockMoqtSession::LoopbackObjectListener : public MoqtObjectListener {
- public:
- LoopbackObjectListener(FullTrackName name,
- SubscribeRemoteTrack::Visitor* visitor,
- std::shared_ptr<MoqtTrackPublisher> publisher,
- SubscribeWindow window)
- : name_(name),
- visitor_(visitor),
- publisher_(std::move(publisher)),
- window_(std::move(window)) {
- publisher_->AddObjectListener(this);
- }
- ~LoopbackObjectListener() { publisher_->RemoveObjectListener(this); }
-
- LoopbackObjectListener(const LoopbackObjectListener&) = delete;
- LoopbackObjectListener(LoopbackObjectListener&&) = delete;
- LoopbackObjectListener& operator=(const LoopbackObjectListener&) = delete;
- LoopbackObjectListener& operator=(LoopbackObjectListener&&) = delete;
-
- void OnSubscribeAccepted() override {
- visitor_->OnReply(name_,
- HasObjects()
- ? std::make_optional(publisher_->GetLargestLocation())
- : std::nullopt,
- std::nullopt);
- }
-
- void OnSubscribeRejected(MoqtSubscribeErrorReason reason,
- std::optional<uint64_t> track_alias) {
- visitor_->OnReply(name_, std::nullopt, reason.reason_phrase);
- }
-
- void OnNewObjectAvailable(Location sequence) {
- std::optional<PublishedObject> object =
- publisher_->GetCachedObject(sequence);
- if (!object.has_value()) {
- QUICHE_LOG(FATAL)
- << "GetCachedObject() returned nullopt for a sequence passed into "
- "OnNewObjectAvailable()";
- return;
- }
- if (!window_.InWindow(object->sequence)) {
- return;
- }
- visitor_->OnObjectFragment(name_, sequence, object->publisher_priority,
- object->status, object->payload.AsStringView(),
- /*end_of_message=*/true);
- }
-
- void OnNewFinAvailable(Location sequence) override {}
- void OnSubgroupAbandoned(Location sequence,
- webtransport::StreamErrorCode error_code) override {}
- void OnGroupAbandoned(uint64_t group_id) override {}
- void OnTrackPublisherGone() override { visitor_->OnSubscribeDone(name_); }
-
- private:
- bool HasObjects() {
- absl::StatusOr<MoqtTrackStatusCode> status = publisher_->GetTrackStatus();
- if (!status.ok()) {
- return false;
- }
- return *status == MoqtTrackStatusCode::kInProgress ||
- *status == MoqtTrackStatusCode::kFinished;
- }
-
- FullTrackName name_;
- SubscribeRemoteTrack::Visitor* visitor_;
- std::shared_ptr<MoqtTrackPublisher> publisher_;
- SubscribeWindow window_;
-};
-
-bool MockMoqtSession::Subscribe(const FullTrackName& name,
- SubscribeRemoteTrack::Visitor* visitor,
- SubscribeWindow window) {
- auto track_publisher = publisher_->GetTrack(name);
- if (!track_publisher.ok()) {
- visitor->OnReply(name, std::nullopt, track_publisher.status().ToString());
- return false;
- }
- auto [it, inserted] = receiving_subscriptions_.insert(
- {name,
- std::make_unique<LoopbackObjectListener>(
- name, visitor, *std::move(track_publisher), std::move(window))});
- return inserted;
-}
-
-MockMoqtSession::MockMoqtSession(MoqtPublisher* publisher)
- : publisher_(publisher) {
- ON_CALL(*this, Error)
- .WillByDefault([](MoqtError code, absl::string_view error) {
- ADD_FAILURE() << "Unhandled MoQT fatal error, with code "
- << static_cast<int>(code) << " and message: " << error;
- });
- if (publisher_ != nullptr) {
- ON_CALL(*this, SubscribeCurrentObject)
- .WillByDefault([this](const FullTrackName& name,
- SubscribeRemoteTrack::Visitor* visitor,
- VersionSpecificParameters) {
- return Subscribe(name, visitor, SubscribeWindow());
- });
- ON_CALL(*this, SubscribeAbsolute(_, _, _, _, _))
- .WillByDefault([this](const FullTrackName& name, uint64_t start_group,
- uint64_t start_object,
- SubscribeRemoteTrack::Visitor* visitor,
- VersionSpecificParameters) {
- return Subscribe(
- name, visitor,
- SubscribeWindow(Location(start_group, start_object)));
- });
- ON_CALL(*this, SubscribeAbsolute(_, _, _, _, _, _))
- .WillByDefault([this](const FullTrackName& name, uint64_t start_group,
- uint64_t start_object, uint64_t end_group,
- SubscribeRemoteTrack::Visitor* visitor,
- VersionSpecificParameters) {
- return Subscribe(
- name, visitor,
- SubscribeWindow(Location(start_group, start_object), end_group));
- });
- ON_CALL(*this, Unsubscribe)
- .WillByDefault([this](const FullTrackName& name) {
- receiving_subscriptions_.erase(name);
- });
- ON_CALL(*this, Fetch)
- .WillByDefault(
- [this](const FullTrackName& name, FetchResponseCallback callback,
- Location start, uint64_t end_group,
- std::optional<uint64_t> end_object, MoqtPriority priority,
- std::optional<MoqtDeliveryOrder> delivery_order,
- VersionSpecificParameters parameters) {
- auto track_publisher = publisher_->GetTrack(name);
- if (!track_publisher.ok()) {
- std::move(callback)(std::make_unique<MoqtFailedFetch>(
- track_publisher.status()));
- return true;
- }
- std::move(callback)(track_publisher->get()->Fetch(
- start, end_group, end_object,
- delivery_order.value_or(MoqtDeliveryOrder::kAscending)));
- return true;
- });
- ON_CALL(*this, JoiningFetch(_, _, _, _))
- .WillByDefault([this](const FullTrackName& name,
- SubscribeRemoteTrack::Visitor* visitor,
- uint64_t num_previous_groups,
- VersionSpecificParameters parameters) {
- return JoiningFetch(
- name, visitor,
- [name, visitor](std::unique_ptr<MoqtFetchTask> fetch) {
- PublishedObject object;
- while (fetch->GetNextObject(object) ==
- MoqtFetchTask::kSuccess) {
- visitor->OnObjectFragment(
- name, object.sequence, object.publisher_priority,
- object.status, object.payload.AsStringView(), true);
- }
- },
- num_previous_groups, 0x80, std::nullopt, parameters);
- });
- ON_CALL(*this, JoiningFetch(_, _, _, _, _, _, _))
- .WillByDefault([this](const FullTrackName& name,
- SubscribeRemoteTrack::Visitor* visitor,
- FetchResponseCallback callback,
- uint64_t num_previous_groups,
- MoqtPriority priority,
- std::optional<MoqtDeliveryOrder> delivery_order,
- VersionSpecificParameters parameters) {
- SubscribeCurrentObject(name, visitor, parameters);
- auto track_publisher = publisher_->GetTrack(name);
- if (!track_publisher.ok()) {
- std::move(callback)(
- std::make_unique<MoqtFailedFetch>(track_publisher.status()));
- return true;
- }
- if (track_publisher->get()->GetTrackStatus().value_or(
- MoqtTrackStatusCode::kStatusNotAvailable) ==
- MoqtTrackStatusCode::kNotYetBegun) {
- return Fetch(name, std::move(callback), Location(0, 0), 0, 0,
- priority, delivery_order, std::move(parameters));
- }
- Location largest = track_publisher->get()->GetLargestLocation();
- uint64_t start_group = largest.group >= num_previous_groups
- ? largest.group - num_previous_groups + 1
- : 0;
- return Fetch(name, std::move(callback), Location(start_group, 0),
- largest.group, largest.object, priority, delivery_order,
- std::move(parameters));
- });
- }
-}
-
-MockMoqtSession::~MockMoqtSession() = default;
-
-} // namespace moqt::test
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.h b/quiche/quic/moqt/test_tools/mock_moqt_session.h
deleted file mode 100644
index b80ed7b..0000000
--- a/quiche/quic/moqt/test_tools/mock_moqt_session.h
+++ /dev/null
@@ -1,108 +0,0 @@
-// Copyright 2025 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#ifndef QUICHE_QUIC_MOQT_TEST_TOOLS_MOCK_MOQT_SESSION_H_
-#define QUICHE_QUIC_MOQT_TEST_TOOLS_MOCK_MOQT_SESSION_H_
-
-#include <cstdint>
-#include <memory>
-#include <optional>
-
-#include "absl/container/flat_hash_map.h"
-#include "absl/strings/string_view.h"
-#include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_priority.h"
-#include "quiche/quic/moqt/moqt_publisher.h"
-#include "quiche/quic/moqt/moqt_session_callbacks.h"
-#include "quiche/quic/moqt/moqt_session_interface.h"
-#include "quiche/quic/moqt/moqt_subscribe_windows.h"
-#include "quiche/quic/moqt/moqt_track.h"
-#include "quiche/common/platform/api/quiche_logging.h"
-#include "quiche/common/platform/api/quiche_test.h"
-
-namespace moqt::test {
-
-// Mock version of MoqtSession. If `publisher` is provided via constructor, all
-// of the SUBSCRIBE and FETCH requests are routed towards it.
-class MockMoqtSession : public MoqtSessionInterface {
- public:
- explicit MockMoqtSession(MoqtPublisher* publisher);
- ~MockMoqtSession() override;
-
- MockMoqtSession(const MockMoqtSession&) = delete;
- MockMoqtSession(MockMoqtSession&&) = delete;
- MockMoqtSession& operator=(const MockMoqtSession&) = delete;
- MockMoqtSession& operator=(MockMoqtSession&&) = delete;
-
- MoqtSessionCallbacks& callbacks() override { return callbacks_; }
-
- MOCK_METHOD(void, Error, (MoqtError code, absl::string_view error),
- (override));
- MOCK_METHOD(bool, SubscribeAbsolute,
- (const FullTrackName& name, uint64_t start_group,
- uint64_t start_object, SubscribeRemoteTrack::Visitor* visitor,
- VersionSpecificParameters parameters),
- (override));
- MOCK_METHOD(bool, SubscribeAbsolute,
- (const FullTrackName& name, uint64_t start_group,
- uint64_t start_object, uint64_t end_group,
- SubscribeRemoteTrack::Visitor* visitor,
- VersionSpecificParameters parameters),
- (override));
- MOCK_METHOD(bool, SubscribeCurrentObject,
- (const FullTrackName& name,
- SubscribeRemoteTrack::Visitor* visitor,
- VersionSpecificParameters parameters),
- (override));
- MOCK_METHOD(bool, SubscribeNextGroup,
- (const FullTrackName& name,
- SubscribeRemoteTrack::Visitor* visitor,
- VersionSpecificParameters parameters),
- (override));
- MOCK_METHOD(bool, SubscribeUpdate,
- (const FullTrackName& name, std::optional<Location> start,
- std::optional<uint64_t> end_group,
- std::optional<MoqtPriority> subscriber_priority,
- std::optional<bool> forward,
- VersionSpecificParameters parameters),
- (override));
- MOCK_METHOD(void, Unsubscribe, (const FullTrackName& name), (override));
- MOCK_METHOD(bool, Fetch,
- (const FullTrackName& name, FetchResponseCallback callback,
- Location start, uint64_t end_group,
- std::optional<uint64_t> end_object, MoqtPriority priority,
- std::optional<MoqtDeliveryOrder> delivery_order,
- VersionSpecificParameters parameters),
- (override));
- MOCK_METHOD(bool, JoiningFetch,
- (const FullTrackName& name,
- SubscribeRemoteTrack::Visitor* visitor,
- uint64_t num_previous_groups,
- VersionSpecificParameters parameters),
- (override));
- MOCK_METHOD(bool, JoiningFetch,
- (const FullTrackName& name,
- SubscribeRemoteTrack::Visitor* visitor,
- FetchResponseCallback callback, uint64_t num_previous_groups,
- MoqtPriority priority,
- std::optional<MoqtDeliveryOrder> delivery_order,
- VersionSpecificParameters parameters),
- (override));
-
- private:
- class LoopbackObjectListener;
-
- bool Subscribe(const FullTrackName& name,
- SubscribeRemoteTrack::Visitor* visitor,
- SubscribeWindow window);
-
- MoqtPublisher* const publisher_ = nullptr;
- MoqtSessionCallbacks callbacks_;
- absl::flat_hash_map<FullTrackName, std::unique_ptr<LoopbackObjectListener>>
- receiving_subscriptions_;
-};
-
-} // namespace moqt::test
-
-#endif // QUICHE_QUIC_MOQT_TEST_TOOLS_MOCK_MOQT_SESSION_H_
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc b/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc
deleted file mode 100644
index 1df0121..0000000
--- a/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc
+++ /dev/null
@@ -1,135 +0,0 @@
-// Copyright 2025 The Chromium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file.
-
-#include "quiche/quic/moqt/test_tools/mock_moqt_session.h"
-
-#include <memory>
-#include <optional>
-#include <utility>
-
-#include "quiche/quic/moqt/moqt_known_track_publisher.h"
-#include "quiche/quic/moqt/moqt_messages.h"
-#include "quiche/quic/moqt/moqt_outgoing_queue.h"
-#include "quiche/quic/moqt/moqt_publisher.h"
-#include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
-#include "quiche/quic/test_tools/quic_test_utils.h"
-#include "quiche/common/platform/api/quiche_test.h"
-
-namespace moqt::test {
-namespace {
-
-using ::testing::_;
-using ::testing::Eq;
-using ::testing::HasSubstr;
-using ::testing::Optional;
-
-FullTrackName TrackName() { return FullTrackName("foo", "bar"); }
-
-class MockMoqtSessionTest : public quiche::test::QuicheTest {
- protected:
- MockMoqtSessionTest() : session_(&publisher_) {
- track_ = std::make_shared<MoqtOutgoingQueue>(
- TrackName(), MoqtForwardingPreference::kSubgroup);
- publisher_.Add(track_);
- }
-
- MoqtKnownTrackPublisher publisher_;
- std::shared_ptr<MoqtOutgoingQueue> track_;
- MockMoqtSession session_;
-};
-
-TEST_F(MockMoqtSessionTest, MissingTrack) {
- testing::StrictMock<MockSubscribeRemoteTrackVisitor> visitor;
- EXPECT_CALL(visitor,
- OnReply(FullTrackName("doesn't", "exist"), Eq(std::nullopt),
- Optional(HasSubstr("not found"))));
- session_.SubscribeCurrentObject(FullTrackName("doesn't", "exist"), &visitor,
- VersionSpecificParameters());
-}
-
-TEST_F(MockMoqtSessionTest, SubscribeCurrentObject) {
- testing::StrictMock<MockSubscribeRemoteTrackVisitor> visitor;
- EXPECT_CALL(visitor,
- OnReply(TrackName(), Eq(std::nullopt), Eq(std::nullopt)));
- session_.SubscribeCurrentObject(TrackName(), &visitor,
- VersionSpecificParameters());
- EXPECT_CALL(visitor,
- OnObjectFragment(TrackName(), Location(0, 0), _, _, "test", _));
- track_->AddObject(quic::test::MemSliceFromString("test"), /*key=*/true);
-
- session_.Unsubscribe(TrackName());
- track_->AddObject(quic::test::MemSliceFromString("test2"), /*key=*/true);
- // No visitor call here.
-}
-
-TEST_F(MockMoqtSessionTest, SubscribeAbsolute) {
- testing::StrictMock<MockSubscribeRemoteTrackVisitor> visitor;
- EXPECT_CALL(visitor,
- OnReply(TrackName(), Eq(std::nullopt), Eq(std::nullopt)));
- session_.SubscribeAbsolute(TrackName(), 1, 0, 1, &visitor,
- VersionSpecificParameters());
- EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(1, 0), _,
- MoqtObjectStatus::kNormal, "b", _));
- EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(1, 1), _,
- MoqtObjectStatus::kEndOfGroup, "", _));
- track_->AddObject(quic::test::MemSliceFromString("a"), /*key=*/true);
- track_->AddObject(quic::test::MemSliceFromString("b"), /*key=*/true);
- track_->AddObject(quic::test::MemSliceFromString("c"), /*key=*/true);
-}
-
-TEST_F(MockMoqtSessionTest, Fetch) {
- track_->AddObject(quic::test::MemSliceFromString("a"), /*key=*/true);
- track_->AddObject(quic::test::MemSliceFromString("b"), /*key=*/false);
- track_->AddObject(quic::test::MemSliceFromString("c"), /*key=*/false);
- track_->AddObject(quic::test::MemSliceFromString("d"), /*key=*/true);
- std::unique_ptr<MoqtFetchTask> fetch;
- session_.Fetch(
- TrackName(),
- [&](std::unique_ptr<MoqtFetchTask> new_fetch) {
- fetch = std::move(new_fetch);
- },
- Location(0, 1), 0, 2, 0x80, std::nullopt, VersionSpecificParameters());
- PublishedObject object;
- ASSERT_EQ(fetch->GetNextObject(object), MoqtFetchTask::kSuccess);
- EXPECT_EQ(object.payload.AsStringView(), "b");
- ASSERT_EQ(fetch->GetNextObject(object), MoqtFetchTask::kSuccess);
- EXPECT_EQ(object.payload.AsStringView(), "c");
- ASSERT_EQ(fetch->GetNextObject(object), MoqtFetchTask::kEof);
-}
-
-TEST_F(MockMoqtSessionTest, JoiningFetch) {
- track_->AddObject(quic::test::MemSliceFromString("a"), /*key=*/true);
- track_->AddObject(quic::test::MemSliceFromString("b"), /*key=*/true);
- track_->AddObject(quic::test::MemSliceFromString("c"), /*key=*/true);
- track_->AddObject(quic::test::MemSliceFromString("d"), /*key=*/true);
-
- testing::StrictMock<MockSubscribeRemoteTrackVisitor> visitor;
- EXPECT_CALL(visitor,
- OnReply(TrackName(), Eq(Location(3, 0)), Eq(std::nullopt)));
- EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(2, 0), _,
- MoqtObjectStatus::kNormal, "c", _));
- EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(2, 1), _,
- MoqtObjectStatus::kEndOfGroup, "", _));
- EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(3, 0), _,
- MoqtObjectStatus::kNormal, "d", _));
- session_.JoiningFetch(TrackName(), &visitor, 2, VersionSpecificParameters());
- EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(3, 1), _,
- MoqtObjectStatus::kEndOfGroup, "", _));
- EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(4, 0), _,
- MoqtObjectStatus::kNormal, "e", _));
- track_->AddObject(quic::test::MemSliceFromString("e"), /*key=*/true);
-}
-
-TEST_F(MockMoqtSessionTest, JoiningFetchNoObjects) {
- testing::StrictMock<MockSubscribeRemoteTrackVisitor> visitor;
- EXPECT_CALL(visitor,
- OnReply(TrackName(), Eq(std::nullopt), Eq(std::nullopt)));
- session_.JoiningFetch(TrackName(), &visitor, 0, VersionSpecificParameters());
- EXPECT_CALL(visitor,
- OnObjectFragment(TrackName(), Location(0, 0), _, _, "test", _));
- track_->AddObject(quic::test::MemSliceFromString("test"), /*key=*/true);
-}
-
-} // namespace
-} // namespace moqt::test
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index e3ddb7b..634fe12 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -20,6 +20,7 @@
#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_session.h"
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
#include "quiche/web_transport/test_tools/mock_web_transport.h"
@@ -247,11 +248,10 @@
}
static bool SubgroupHasBeenReset(MoqtObjectListener* subscription,
- Location sequence) {
- sequence.object = 0;
+ DataStreamIndex index) {
return static_cast<MoqtSession::PublishedSubscription*>(subscription)
->reset_subgroups()
- .contains(sequence);
+ .contains(index);
}
};
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc
index f22ffda..26525da 100644
--- a/quiche/quic/moqt/tools/chat_client.cc
+++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -25,6 +25,7 @@
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_outgoing_queue.h"
#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/tools/moq_chat.h"
#include "quiche/quic/moqt/tools/moqt_client.h"
@@ -185,9 +186,9 @@
}
void ChatClient::RemoteTrackVisitor::OnObjectFragment(
- const FullTrackName& full_track_name, Location /*sequence*/,
- MoqtPriority /*publisher_priority*/, MoqtObjectStatus /*status*/,
- absl::string_view object, bool end_of_message) {
+ const FullTrackName& full_track_name,
+ const PublishedObjectMetadata& /*metadata*/, absl::string_view object,
+ bool end_of_message) {
if (!end_of_message) {
std::cerr << "Error: received partial message despite requesting "
"buffering\n";
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h
index 93d4ae5..5bad8e6 100644
--- a/quiche/quic/moqt/tools/chat_client.h
+++ b/quiche/quic/moqt/tools/chat_client.h
@@ -17,6 +17,7 @@
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_outgoing_queue.h"
#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/moqt/tools/moqt_client.h"
@@ -98,9 +99,7 @@
void OnCanAckObjects(MoqtObjectAckFunction) override {}
void OnObjectFragment(const moqt::FullTrackName& full_track_name,
- Location sequence,
- moqt::MoqtPriority publisher_priority,
- moqt::MoqtObjectStatus status,
+ const PublishedObjectMetadata& metadata,
absl::string_view object,
bool end_of_message) override;
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc
index c3d9b74..268c912 100644
--- a/quiche/quic/moqt/tools/chat_server.cc
+++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -18,6 +18,7 @@
#include "quiche/quic/moqt/moqt_live_relay_queue.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/tools/moq_chat.h"
#include "quiche/quic/moqt/tools/moqt_server.h"
@@ -149,9 +150,9 @@
}
void ChatServer::RemoteTrackVisitor::OnObjectFragment(
- const moqt::FullTrackName& full_track_name, moqt::Location sequence,
- moqt::MoqtPriority /*publisher_priority*/, moqt::MoqtObjectStatus status,
- absl::string_view object, bool end_of_message) {
+ const moqt::FullTrackName& full_track_name,
+ const PublishedObjectMetadata& metadata, absl::string_view object,
+ bool end_of_message) {
if (!end_of_message) {
std::cerr << "Error: received partial message despite requesting "
"buffering\n";
@@ -162,14 +163,14 @@
<< full_track_name.ToString() << "\n";
return;
}
- if (status != MoqtObjectStatus::kNormal) {
- it->second->AddObject(sequence, status);
+ if (metadata.status != MoqtObjectStatus::kNormal) {
+ it->second->AddObject(metadata, "", /*fin=*/false);
return;
}
if (!server_->WriteToFile(GetUsername(full_track_name), object)) {
std::cout << GetUsername(full_track_name) << ": " << object << "\n\n";
}
- it->second->AddObject(sequence, object);
+ it->second->AddObject(metadata, object, /*fin=*/false);
}
ChatServer::ChatServer(std::unique_ptr<quic::ProofSource> proof_source,
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h
index 90e83e9..02fd059 100644
--- a/quiche/quic/moqt/tools/chat_server.h
+++ b/quiche/quic/moqt/tools/chat_server.h
@@ -43,9 +43,7 @@
std::optional<absl::string_view> reason_phrase) override;
void OnCanAckObjects(MoqtObjectAckFunction) override {}
void OnObjectFragment(const moqt::FullTrackName& full_track_name,
- Location sequence,
- moqt::MoqtPriority /*publisher_priority*/,
- moqt::MoqtObjectStatus /*status*/,
+ const PublishedObjectMetadata& metadata,
absl::string_view object,
bool end_of_message) override;
void OnSubscribeDone(FullTrackName /*full_track_name*/) override {}
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
index acc64a0..7e70c29 100644
--- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -30,6 +30,7 @@
#include "absl/time/time.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/moqt/tools/moqt_server.h"
@@ -180,13 +181,12 @@
void OnCanAckObjects(MoqtObjectAckFunction) override {}
void OnObjectFragment(const FullTrackName& full_track_name,
- Location sequence,
- MoqtPriority /*publisher_priority*/,
- MoqtObjectStatus /*status*/, absl::string_view object,
+ const PublishedObjectMetadata& metadata,
+ absl::string_view object,
bool /*end_of_message*/) override {
std::string file_name =
- absl::StrCat(sequence.group, "-", sequence.object, ".",
- full_track_name.track_namespace().tuple().back());
+ absl::StrCat(metadata.location.group, "-", metadata.location.object,
+ ".", full_track_name.track_namespace().tuple().back());
std::string file_path = quiche::JoinPath(directory_, file_name);
std::ofstream output(file_path, std::ios::binary | std::ios::ate);
output.write(object.data(), object.size());
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h
index a5d4afb..b88fd56 100644
--- a/quiche/quic/moqt/tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -10,7 +10,6 @@
#include <optional>
#include <utility>
#include <variant>
-#include <vector>
#include "absl/status/status.h"
#include "absl/status/statusor.h"
@@ -65,7 +64,7 @@
const FullTrackName& GetTrackName() const override { return track_name_; }
MOCK_METHOD(std::optional<PublishedObject>, GetCachedObject,
- (Location sequence), (const, override));
+ (uint64_t, uint64_t, uint64_t), (const, override));
MOCK_METHOD(void, AddObjectListener, (MoqtObjectListener * listener),
(override));
MOCK_METHOD(void, RemoveObjectListener, (MoqtObjectListener * listener),
@@ -95,8 +94,8 @@
MOCK_METHOD(void, OnCanAckObjects, (MoqtObjectAckFunction ack_function),
(override));
MOCK_METHOD(void, OnObjectFragment,
- (const FullTrackName& full_track_name, Location sequence,
- MoqtPriority publisher_priority, MoqtObjectStatus status,
+ (const FullTrackName& full_track_name,
+ const PublishedObjectMetadata& metadata,
absl::string_view object, bool end_of_message),
(override));
MOCK_METHOD(void, OnSubscribeDone, (FullTrackName full_track_name),
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
index 3da0a9d..3bf12ee 100644
--- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -37,6 +37,7 @@
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_outgoing_queue.h"
#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/moqt_track.h"
#include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
@@ -299,12 +300,12 @@
object_ack_function_ = std::move(ack_function);
}
- void OnObjectFragment(const FullTrackName& full_track_name, Location sequence,
- MoqtPriority /*publisher_priority*/,
- MoqtObjectStatus status, absl::string_view object,
+ void OnObjectFragment(const FullTrackName& full_track_name,
+ const PublishedObjectMetadata& metadata,
+ absl::string_view object,
bool end_of_message) override {
QUICHE_DCHECK(full_track_name == TrackName());
- if (status != MoqtObjectStatus::kNormal) {
+ if (metadata.status != MoqtObjectStatus::kNormal) {
QUICHE_DCHECK(end_of_message);
return;
}
@@ -312,7 +313,7 @@
QUICHE_LOG(DFATAL) << "Partial receiving of objects wasn't enabled";
return;
}
- OnFullObject(sequence, object);
+ OnFullObject(metadata.location, object);
}
void OnSubscribeDone(FullTrackName /*full_track_name*/) override {}