Rename MoQT FullSequence to Location.
This is a trivial rename of `FullSequence` to `Location` to match the latest nomenclature in the draft.
PiperOrigin-RevId: 751414806
diff --git a/quiche/quic/moqt/moqt_cached_object.h b/quiche/quic/moqt/moqt_cached_object.h
index 2576f59..78807bc 100644
--- a/quiche/quic/moqt/moqt_cached_object.h
+++ b/quiche/quic/moqt/moqt_cached_object.h
@@ -18,7 +18,7 @@
// CachedObject is a version of PublishedObject with a reference counted
// payload.
struct CachedObject {
- FullSequence sequence;
+ Location sequence;
MoqtObjectStatus status;
MoqtPriority publisher_priority;
std::shared_ptr<quiche::QuicheMemSlice> payload;
diff --git a/quiche/quic/moqt/moqt_failed_fetch.h b/quiche/quic/moqt/moqt_failed_fetch.h
index 40e8d67..e825b82 100644
--- a/quiche/quic/moqt/moqt_failed_fetch.h
+++ b/quiche/quic/moqt/moqt_failed_fetch.h
@@ -24,7 +24,7 @@
absl::Status GetStatus() override { return status_; }
void SetObjectAvailableCallback(
ObjectsAvailableCallback /*callback*/) override {}
- FullSequence GetLargestId() const override { return FullSequence(); }
+ Location GetLargestId() const override { return Location(); }
private:
absl::Status status_;
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index 05cc3dd..9895416 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -402,9 +402,9 @@
}
TEST_F(MoqtFramerSimpleTest, AllSubscribeInputs) {
- for (std::optional<FullSequence> start :
- {std::optional<FullSequence>(),
- std::optional<FullSequence>(std::in_place, 4, 0)}) {
+ for (std::optional<Location> start :
+ {std::optional<Location>(),
+ std::optional<Location>(std::in_place, 4, 0)}) {
for (std::optional<uint64_t> end_group :
{std::optional<uint64_t>(), std::optional<uint64_t>(7)}) {
MoqtSubscribe subscribe = {
@@ -442,7 +442,7 @@
/*full_track_name=*/FullTrackName({"foo", "abcd"}),
/*subscriber_priority=*/0x20,
/*group_order=*/std::nullopt,
- /*start=*/FullSequence(4, 3),
+ /*start=*/Location(4, 3),
/*end_group=*/3,
MoqtSubscribeParameters{"bar", std::nullopt, std::nullopt, std::nullopt},
};
@@ -459,7 +459,7 @@
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*joining_fetch=*/std::nullopt,
/*full_track_name=*/FullTrackName{"foo", "bar"},
- /*start_object=*/FullSequence{1, 2},
+ /*start_object=*/Location{1, 2},
/*end_group=*/1,
/*end_object=*/1,
/*parameters=*/
@@ -479,7 +479,7 @@
TEST_F(MoqtFramerSimpleTest, SubscribeUpdateEndGroupOnly) {
MoqtSubscribeUpdate subscribe_update = {
/*subscribe_id=*/3,
- /*start=*/FullSequence(4, 3),
+ /*start=*/Location(4, 3),
/*end_group=*/4,
/*subscriber_priority=*/0xaa,
MoqtSubscribeParameters{std::nullopt, std::nullopt, std::nullopt,
@@ -495,7 +495,7 @@
TEST_F(MoqtFramerSimpleTest, SubscribeUpdateIncrementsEnd) {
MoqtSubscribeUpdate subscribe_update = {
/*subscribe_id=*/3,
- /*start=*/FullSequence(4, 3),
+ /*start=*/Location(4, 3),
/*end_group=*/4,
/*subscriber_priority=*/0xaa,
MoqtSubscribeParameters{std::nullopt, std::nullopt, std::nullopt,
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 3c8f001..96b36f3 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -74,7 +74,7 @@
void SubscribeLatestObject(FullTrackName track_name,
MockSubscribeRemoteTrackVisitor* visitor) {
bool received_ok = false;
- EXPECT_CALL(*visitor, OnReply(track_name, std::optional<FullSequence>(),
+ EXPECT_CALL(*visitor, OnReply(track_name, std::optional<Location>(),
std::optional<absl::string_view>()))
.WillOnce([&]() { received_ok = true; });
client_->session()->SubscribeCurrentObject(track_name, visitor,
@@ -277,7 +277,7 @@
queue->AddObject(MemSliceFromString("object data"), /*key=*/true);
bool received_object = false;
EXPECT_CALL(server_visitor, OnObjectFragment(_, _, _, _, _, _))
- .WillOnce([&](const FullTrackName& full_track_name, FullSequence sequence,
+ .WillOnce([&](const FullTrackName& full_track_name, Location sequence,
MoqtPriority /*publisher_priority*/,
MoqtObjectStatus status, absl::string_view object,
bool end_of_message) {
@@ -317,30 +317,29 @@
client_->session()->SubscribeCurrentObject(FullTrackName("test", name),
&client_visitor,
MoqtSubscribeParameters());
- std::optional<FullSequence> largest_id;
+ std::optional<Location> largest_id;
EXPECT_CALL(client_visitor, OnReply)
- .WillOnce([&](const FullTrackName& /*name*/,
- std::optional<FullSequence> id,
+ .WillOnce([&](const FullTrackName& /*name*/, std::optional<Location> id,
std::optional<absl::string_view> /*reason*/) {
largest_id = id;
});
bool success = test_harness_.RunUntilWithDefaultTimeout([&]() {
- return largest_id.has_value() && *largest_id == FullSequence(0, 2);
+ return largest_id.has_value() && *largest_id == Location(0, 2);
});
EXPECT_TRUE(success);
int received = 0;
EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{0, 3}, _,
+ OnObjectFragment(_, Location{0, 3}, _,
MoqtObjectStatus::kEndOfGroup, "", true))
.WillOnce([&] { ++received; });
EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{1, 0}, _,
+ OnObjectFragment(_, Location{1, 0}, _,
MoqtObjectStatus::kNormal, "object 4", true))
.WillOnce([&] { ++received; });
queue->AddObject(MemSliceFromString("object 4"), /*key=*/true);
EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{1, 1}, _,
+ OnObjectFragment(_, Location{1, 1}, _,
MoqtObjectStatus::kNormal, "object 5", true))
.WillOnce([&] { ++received; });
queue->AddObject(MemSliceFromString("object 5"), /*key=*/false);
@@ -350,21 +349,21 @@
EXPECT_TRUE(success);
EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{1, 2}, _,
+ OnObjectFragment(_, Location{1, 2}, _,
MoqtObjectStatus::kNormal, "object 6", true))
.WillOnce([&] { ++received; });
queue->AddObject(MemSliceFromString("object 6"), /*key=*/false);
EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{1, 3}, _,
+ OnObjectFragment(_, Location{1, 3}, _,
MoqtObjectStatus::kEndOfGroup, "", true))
.WillOnce([&] { ++received; });
EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{2, 0}, _,
+ OnObjectFragment(_, Location{2, 0}, _,
MoqtObjectStatus::kNormal, "object 7", true))
.WillOnce([&] { ++received; });
queue->AddObject(MemSliceFromString("object 7"), /*key=*/true);
EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{2, 1}, _,
+ OnObjectFragment(_, Location{2, 1}, _,
MoqtObjectStatus::kNormal, "object 8", true))
.WillOnce([&] { ++received; });
queue->AddObject(MemSliceFromString("object 8"), /*key=*/false);
@@ -374,11 +373,11 @@
EXPECT_TRUE(success);
EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{2, 2}, _,
+ OnObjectFragment(_, Location{2, 2}, _,
MoqtObjectStatus::kEndOfGroup, "", true))
.WillOnce([&] { ++received; });
EXPECT_CALL(client_visitor,
- OnObjectFragment(_, FullSequence{3, 0}, _,
+ OnObjectFragment(_, Location{3, 0}, _,
MoqtObjectStatus::kEndOfTrack, "", true))
.WillOnce([&] { ++received; });
queue->Close();
@@ -405,7 +404,7 @@
EXPECT_TRUE(client_->session()->Fetch(
full_track_name,
[&](std::unique_ptr<MoqtFetchTask> task) { fetch = std::move(task); },
- FullSequence{0, 0}, 99, std::nullopt, 128, std::nullopt,
+ Location{0, 0}, 99, std::nullopt, 128, std::nullopt,
MoqtSubscribeParameters()));
// Run until we get FETCH_OK.
bool success = test_harness_.RunUntilWithDefaultTimeout(
@@ -413,10 +412,10 @@
EXPECT_TRUE(success);
EXPECT_TRUE(fetch->GetStatus().ok());
- EXPECT_EQ(fetch->GetLargestId(), FullSequence(99, 0));
+ EXPECT_EQ(fetch->GetLargestId(), Location(99, 0));
MoqtFetchTask::GetNextObjectResult result;
PublishedObject object;
- FullSequence expected{97, 0};
+ Location expected{97, 0};
do {
result = fetch->GetNextObject(object);
if (result == MoqtFetchTask::GetNextObjectResult::kEof) {
@@ -435,7 +434,7 @@
}
} while (result == MoqtFetchTask::GetNextObjectResult::kSuccess);
EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kEof);
- EXPECT_EQ(expected, FullSequence(99, 1));
+ EXPECT_EQ(expected, Location(99, 1));
}
TEST_F(MoqtIntegrationTest, AnnounceFailure) {
@@ -567,9 +566,9 @@
SubscribeLatestObject(full_track_name, &client_visitor);
// Deliver 3 objects on 2 streams.
- queue->AddObject(FullSequence(0, 0), "object,0,0", false);
- queue->AddObject(FullSequence(0, 1), "object,0,1", true);
- queue->AddObject(FullSequence(1, 0), "object,1,0", true);
+ queue->AddObject(Location(0, 0), "object,0,0", false);
+ queue->AddObject(Location(0, 1), "object,0,1", true);
+ queue->AddObject(Location(1, 0), "object,1,0", true);
int received = 0;
EXPECT_CALL(client_visitor, OnObjectFragment).WillRepeatedly([&]() {
++received;
@@ -624,7 +623,7 @@
listener->OnSubscribeAccepted();
});
EXPECT_CALL(client_visitor, OnReply(_, _, _))
- .WillOnce([&](const FullTrackName&, std::optional<FullSequence>,
+ .WillOnce([&](const FullTrackName&, std::optional<Location>,
std::optional<absl::string_view>) {
ack_function(10, 20, quic::QuicTimeDelta::FromMicroseconds(-123));
ack_function(100, 200, quic::QuicTimeDelta::FromMicroseconds(456));
@@ -680,18 +679,18 @@
size_t bytes_received = 0;
EXPECT_CALL(client_visitor, OnObjectFragment)
.WillRepeatedly(
- [&](const FullTrackName&, FullSequence sequence,
+ [&](const FullTrackName&, Location sequence,
MoqtPriority /*publisher_priority*/, MoqtObjectStatus status,
absl::string_view object,
bool end_of_message) { bytes_received += object.size(); });
- queue->AddObject(FullSequence{0, 0, 0}, data, false);
- queue->AddObject(FullSequence{0, 0, 1}, data, false);
- queue->AddObject(FullSequence{0, 0, 2}, data, false);
- queue->AddObject(FullSequence{0, 0, 3}, data, true);
+ queue->AddObject(Location{0, 0, 0}, data, false);
+ queue->AddObject(Location{0, 0, 1}, data, false);
+ queue->AddObject(Location{0, 0, 2}, data, false);
+ queue->AddObject(Location{0, 0, 3}, data, true);
success = test_harness_.RunUntilWithDefaultTimeout([&]() {
return MoqtSessionPeer::SubgroupHasBeenReset(
MoqtSessionPeer::GetSubscription(server_->session(), 0),
- FullSequence{0, 0, 0});
+ Location{0, 0, 0});
});
EXPECT_TRUE(success);
// Stream was reset before all the bytes arrived.
@@ -730,16 +729,16 @@
size_t bytes_received = 0;
EXPECT_CALL(client_visitor, OnObjectFragment)
.WillRepeatedly(
- [&](const FullTrackName&, FullSequence sequence,
+ [&](const FullTrackName&, Location sequence,
MoqtPriority /*publisher_priority*/, MoqtObjectStatus status,
absl::string_view object,
bool end_of_message) { bytes_received += object.size(); });
- queue->AddObject(FullSequence{0, 0, 0}, data, false);
- queue->AddObject(FullSequence{1, 0, 0}, data, false);
+ queue->AddObject(Location{0, 0, 0}, data, false);
+ queue->AddObject(Location{1, 0, 0}, data, false);
success = test_harness_.RunUntilWithDefaultTimeout([&]() {
return MoqtSessionPeer::SubgroupHasBeenReset(
MoqtSessionPeer::GetSubscription(server_->session(), 0),
- FullSequence{0, 0, 0});
+ Location{0, 0, 0});
});
EXPECT_TRUE(success);
EXPECT_EQ(bytes_received, 2000);
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.cc b/quiche/quic/moqt/moqt_live_relay_queue.cc
index 69885e6..caab1e5 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue.cc
+++ b/quiche/quic/moqt/moqt_live_relay_queue.cc
@@ -24,7 +24,7 @@
namespace moqt {
-bool MoqtLiveRelayQueue::AddFin(FullSequence sequence) {
+bool MoqtLiveRelayQueue::AddFin(Location sequence) {
switch (forwarding_preference_) {
case MoqtForwardingPreference::kDatagram:
return false;
@@ -59,7 +59,7 @@
}
bool MoqtLiveRelayQueue::OnStreamReset(
- FullSequence sequence, webtransport::StreamErrorCode error_code) {
+ Location sequence, webtransport::StreamErrorCode error_code) {
switch (forwarding_preference_) {
case MoqtForwardingPreference::kDatagram:
return false;
@@ -85,7 +85,7 @@
}
// TODO(martinduke): Unless Track Forwarding preference goes away, support it.
-bool MoqtLiveRelayQueue::AddRawObject(FullSequence sequence,
+bool MoqtLiveRelayQueue::AddRawObject(Location sequence,
MoqtObjectStatus status,
MoqtPriority priority,
absl::string_view payload, bool fin) {
@@ -176,7 +176,7 @@
}
// Object is valid. Update state.
if (next_sequence_ <= sequence) {
- next_sequence_ = FullSequence{sequence.group, sequence.object + 1};
+ next_sequence_ = Location{sequence.group, sequence.object + 1};
}
if (sequence.object >= group.next_object) {
group.next_object = sequence.object + 1;
@@ -212,7 +212,7 @@
}
std::optional<PublishedObject> MoqtLiveRelayQueue::GetCachedObject(
- FullSequence sequence) const {
+ Location sequence) const {
auto group_it = queue_.find(sequence.group);
if (group_it == queue_.end()) {
// Group does not exist.
@@ -238,9 +238,9 @@
return CachedObjectToPublishedObject(object_it->second);
}
-std::vector<FullSequence> MoqtLiveRelayQueue::GetCachedObjectsInRange(
- FullSequence start, FullSequence end) const {
- std::vector<FullSequence> sequences;
+std::vector<Location> MoqtLiveRelayQueue::GetCachedObjectsInRange(
+ Location start, Location end) const {
+ std::vector<Location> sequences;
SubscribeWindow window(start, end.group, end.object);
for (auto& group_it : queue_) {
if (group_it.first < start.group) {
@@ -275,8 +275,8 @@
return MoqtTrackStatusCode::kInProgress;
}
-FullSequence MoqtLiveRelayQueue::GetLargestSequence() const {
- return FullSequence{next_sequence_.group, next_sequence_.object - 1};
+Location MoqtLiveRelayQueue::GetLargestSequence() const {
+ return Location{next_sequence_.group, next_sequence_.object - 1};
}
} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.h b/quiche/quic/moqt/moqt_live_relay_queue.h
index cea9470..3a5faf0 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue.h
+++ b/quiche/quic/moqt/moqt_live_relay_queue.h
@@ -58,11 +58,10 @@
// occur. A false return value might result in a session error on the
// inbound session, but this queue is the only place that retains enough state
// to check.
- bool AddObject(FullSequence sequence, MoqtObjectStatus status,
- bool fin = false) {
+ bool AddObject(Location sequence, MoqtObjectStatus status, bool fin = false) {
return AddRawObject(sequence, status, publisher_priority_, "", fin);
}
- bool AddObject(FullSequence sequence, absl::string_view object,
+ bool AddObject(Location sequence, absl::string_view object,
bool fin = false) {
return AddRawObject(sequence, MoqtObjectStatus::kNormal,
publisher_priority_, object, fin);
@@ -72,19 +71,19 @@
// Otherwise, |sequence| is used to determine which stream is being FINed. If
// the object ID does not match the last object ID in the stream, no action
// is taken.
- bool AddFin(FullSequence sequence);
+ bool AddFin(Location sequence);
// Record a received RESET_STREAM. |sequence| encodes the group and subgroup
// of the stream that is being reset. Returns false on datagram tracks, or if
// the stream does not exist.
- bool OnStreamReset(FullSequence sequence,
+ bool OnStreamReset(Location sequence,
webtransport::StreamErrorCode error_code);
// MoqtTrackPublisher implementation.
const FullTrackName& GetTrackName() const override { return track_; }
std::optional<PublishedObject> GetCachedObject(
- FullSequence sequence) const override;
- std::vector<FullSequence> GetCachedObjectsInRange(
- FullSequence start, FullSequence end) const override;
+ Location sequence) const override;
+ std::vector<Location> GetCachedObjectsInRange(Location start,
+ Location end) const override;
void AddObjectListener(MoqtObjectListener* listener) override {
listeners_.insert(listener);
listener->OnSubscribeAccepted();
@@ -93,7 +92,7 @@
listeners_.erase(listener);
}
absl::StatusOr<MoqtTrackStatusCode> GetTrackStatus() const override;
- FullSequence GetLargestSequence() const override;
+ Location GetLargestSequence() const override;
MoqtForwardingPreference GetForwardingPreference() const override {
return forwarding_preference_;
}
@@ -103,7 +102,7 @@
MoqtDeliveryOrder GetDeliveryOrder() const override {
return delivery_order_;
}
- std::unique_ptr<MoqtFetchTask> Fetch(FullSequence /*start*/,
+ std::unique_ptr<MoqtFetchTask> Fetch(Location /*start*/,
uint64_t /*end_group*/,
std::optional<uint64_t> /*end_object*/,
MoqtDeliveryOrder /*order*/) override {
@@ -135,7 +134,7 @@
absl::btree_map<SubgroupPriority, Subgroup> subgroups;
};
- bool AddRawObject(FullSequence sequence, MoqtObjectStatus status,
+ bool AddRawObject(Location sequence, MoqtObjectStatus status,
MoqtPriority priority, absl::string_view payload, bool fin);
const quic::QuicClock* clock_;
@@ -145,8 +144,8 @@
MoqtDeliveryOrder delivery_order_ = MoqtDeliveryOrder::kAscending;
absl::btree_map<uint64_t, Group> queue_; // Ordered by group id.
absl::flat_hash_set<MoqtObjectListener*> listeners_;
- std::optional<FullSequence> end_of_track_;
- FullSequence next_sequence_;
+ std::optional<Location> end_of_track_;
+ Location next_sequence_;
};
} // namespace moqt
diff --git a/quiche/quic/moqt/moqt_live_relay_queue_test.cc b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
index d313099..d5ed3ff 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue_test.cc
+++ b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
@@ -29,7 +29,7 @@
AddObjectListener(this);
}
- void OnNewObjectAvailable(FullSequence sequence) {
+ void OnNewObjectAvailable(Location sequence) {
std::optional<PublishedObject> object = GetCachedObject(sequence);
QUICHE_CHECK(object.has_value());
if (!object.has_value()) {
@@ -65,19 +65,18 @@
}
void GetObjectsFromPast(const SubscribeWindow& window) {
- std::vector<FullSequence> objects =
- GetCachedObjectsInRange(FullSequence(0, 0), GetLargestSequence());
- for (FullSequence object : objects) {
+ std::vector<Location> objects =
+ GetCachedObjectsInRange(Location(0, 0), GetLargestSequence());
+ for (Location object : objects) {
if (window.InWindow(object)) {
OnNewObjectAvailable(object);
}
}
}
- MOCK_METHOD(void, OnNewFinAvailable, (FullSequence sequence));
+ MOCK_METHOD(void, OnNewFinAvailable, (Location sequence));
MOCK_METHOD(void, OnSubgroupAbandoned,
- (FullSequence sequence,
- webtransport::StreamErrorCode error_code));
+ (Location sequence, webtransport::StreamErrorCode error_code));
MOCK_METHOD(void, OnGroupAbandoned, (uint64_t group_id));
MOCK_METHOD(void, CloseStreamForGroup, (uint64_t group_id), ());
MOCK_METHOD(void, CloseStreamForSubgroup,
@@ -107,11 +106,10 @@
EXPECT_CALL(queue, PublishObject(0, 2, "c"));
EXPECT_CALL(queue, CloseStreamForGroup(0));
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c"));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{0, 3}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfGroup));
}
TEST(MoqtLiveRelayQueue, SingleGroupPastSubscribeFromZero) {
@@ -126,9 +124,9 @@
EXPECT_CALL(queue, PublishObject(0, 1, "b"));
EXPECT_CALL(queue, PublishObject(0, 2, "c"));
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
queue.GetObjectsFromPast(SubscribeWindow());
}
@@ -143,10 +141,10 @@
EXPECT_CALL(queue, PublishObject(0, 1, "b"));
EXPECT_CALL(queue, PublishObject(0, 2, "c"));
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c"));
- queue.GetObjectsFromPast(SubscribeWindow(FullSequence(0, 1)));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
+ queue.GetObjectsFromPast(SubscribeWindow(Location(0, 1)));
}
TEST(MoqtLiveRelayQueue, TwoGroups) {
@@ -161,14 +159,13 @@
EXPECT_CALL(queue, PublishObject(1, 1, "e"));
EXPECT_CALL(queue, PublishObject(1, 2, "f"));
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c"));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{0, 3}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(FullSequence{1, 0}, "d"));
- EXPECT_TRUE(queue.AddObject(FullSequence{1, 1}, "e"));
- EXPECT_TRUE(queue.AddObject(FullSequence{1, 2}, "f"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{1, 0}, "d"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 1}, "e"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 2}, "f"));
}
TEST(MoqtLiveRelayQueue, TwoGroupsPastSubscribe) {
@@ -190,15 +187,14 @@
EXPECT_CALL(queue, PublishObject(1, 1, "e"));
EXPECT_CALL(queue, PublishObject(1, 2, "f"));
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c"));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{0, 3}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(FullSequence{1, 0}, "d"));
- EXPECT_TRUE(queue.AddObject(FullSequence{1, 1}, "e"));
- EXPECT_TRUE(queue.AddObject(FullSequence{1, 2}, "f"));
- queue.GetObjectsFromPast(SubscribeWindow(FullSequence(0, 1)));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{1, 0}, "d"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 1}, "e"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 2}, "f"));
+ queue.GetObjectsFromPast(SubscribeWindow(Location(0, 1)));
}
TEST(MoqtLiveRelayQueue, FiveGroups) {
@@ -223,24 +219,20 @@
EXPECT_CALL(queue, PublishObject(4, 0, "i"));
EXPECT_CALL(queue, PublishObject(4, 1, "j"));
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b"));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{0, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(FullSequence{1, 0}, "c"));
- EXPECT_TRUE(queue.AddObject(FullSequence{1, 1}, "d"));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{1, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(FullSequence{2, 0}, "e"));
- EXPECT_TRUE(queue.AddObject(FullSequence{2, 1}, "f"));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{2, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(FullSequence{3, 0}, "g"));
- EXPECT_TRUE(queue.AddObject(FullSequence{3, 1}, "h"));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{3, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(FullSequence{4, 0}, "i"));
- EXPECT_TRUE(queue.AddObject(FullSequence{4, 1}, "j"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{1, 0}, "c"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 1}, "d"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 2}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{2, 0}, "e"));
+ EXPECT_TRUE(queue.AddObject(Location{2, 1}, "f"));
+ EXPECT_TRUE(queue.AddObject(Location{2, 2}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{3, 0}, "g"));
+ EXPECT_TRUE(queue.AddObject(Location{3, 1}, "h"));
+ EXPECT_TRUE(queue.AddObject(Location{3, 2}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{4, 0}, "i"));
+ EXPECT_TRUE(queue.AddObject(Location{4, 1}, "j"));
}
TEST(MoqtLiveRelayQueue, FiveGroupsPastSubscribe) {
@@ -274,24 +266,20 @@
EXPECT_CALL(queue, PublishObject(4, 0, "i"));
EXPECT_CALL(queue, PublishObject(4, 1, "j"));
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b"));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{0, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(FullSequence{1, 0}, "c"));
- EXPECT_TRUE(queue.AddObject(FullSequence{1, 1}, "d"));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{1, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(FullSequence{2, 0}, "e"));
- EXPECT_TRUE(queue.AddObject(FullSequence{2, 1}, "f"));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{2, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(FullSequence{3, 0}, "g"));
- EXPECT_TRUE(queue.AddObject(FullSequence{3, 1}, "h"));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{3, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(FullSequence{4, 0}, "i"));
- EXPECT_TRUE(queue.AddObject(FullSequence{4, 1}, "j"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{1, 0}, "c"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 1}, "d"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 2}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{2, 0}, "e"));
+ EXPECT_TRUE(queue.AddObject(Location{2, 1}, "f"));
+ EXPECT_TRUE(queue.AddObject(Location{2, 2}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{3, 0}, "g"));
+ EXPECT_TRUE(queue.AddObject(Location{3, 1}, "h"));
+ EXPECT_TRUE(queue.AddObject(Location{3, 2}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{4, 0}, "i"));
+ EXPECT_TRUE(queue.AddObject(Location{4, 1}, "j"));
queue.GetObjectsFromPast(SubscribeWindow());
}
@@ -315,25 +303,21 @@
EXPECT_CALL(queue, PublishObject(4, 0, "i"));
EXPECT_CALL(queue, PublishObject(4, 1, "j"));
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(FullSequence{1, 0}, "c"));
- EXPECT_TRUE(queue.AddObject(FullSequence{1, 1}, "d"));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{1, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(FullSequence{2, 0}, "e"));
- EXPECT_TRUE(queue.AddObject(FullSequence{2, 1}, "f"));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{2, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(FullSequence{3, 0}, "g"));
- EXPECT_TRUE(queue.AddObject(FullSequence{3, 1}, "h"));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{3, 2}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(queue.AddObject(FullSequence{4, 0}, "i"));
- EXPECT_TRUE(queue.AddObject(FullSequence{4, 1}, "j"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 0}, "c"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 1}, "d"));
+ EXPECT_TRUE(queue.AddObject(Location{1, 2}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{2, 0}, "e"));
+ EXPECT_TRUE(queue.AddObject(Location{2, 1}, "f"));
+ EXPECT_TRUE(queue.AddObject(Location{2, 2}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{3, 0}, "g"));
+ EXPECT_TRUE(queue.AddObject(Location{3, 1}, "h"));
+ EXPECT_TRUE(queue.AddObject(Location{3, 2}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{4, 0}, "i"));
+ EXPECT_TRUE(queue.AddObject(Location{4, 1}, "j"));
// This object will be ignored, but this is not an error.
- EXPECT_TRUE(
- queue.AddObject(FullSequence{0, 2}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, MoqtObjectStatus::kEndOfGroup));
}
TEST(MoqtLiveRelayQueue, EndOfTrackAndGroup) {
@@ -344,12 +328,12 @@
EXPECT_CALL(queue, PublishObject(0, 2, "c"));
EXPECT_CALL(queue, CloseTrack());
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c"));
- EXPECT_FALSE(queue.AddObject(FullSequence{0, 1},
- MoqtObjectStatus::kEndOfTrackAndGroup));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 3},
- MoqtObjectStatus::kEndOfTrackAndGroup));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
+ EXPECT_FALSE(
+ queue.AddObject(Location{0, 1}, MoqtObjectStatus::kEndOfTrackAndGroup));
+ EXPECT_TRUE(
+ queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfTrackAndGroup));
}
TEST(MoqtLiveRelayQueue, EndOfTrack) {
@@ -360,12 +344,10 @@
EXPECT_CALL(queue, PublishObject(0, 2, "c"));
EXPECT_CALL(queue, CloseTrack());
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c"));
- EXPECT_FALSE(
- queue.AddObject(FullSequence{0, 3}, MoqtObjectStatus::kEndOfTrack));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{1, 0}, MoqtObjectStatus::kEndOfTrack));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
+ EXPECT_FALSE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfTrack));
+ EXPECT_TRUE(queue.AddObject(Location{1, 0}, MoqtObjectStatus::kEndOfTrack));
}
TEST(MoqtLiveRelayQueue, EndOfGroup) {
@@ -376,13 +358,11 @@
EXPECT_CALL(queue, PublishObject(0, 2, "c"));
EXPECT_CALL(queue, CloseStreamForGroup(0));
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c"));
- EXPECT_FALSE(
- queue.AddObject(FullSequence{0, 1}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{0, 3}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_FALSE(queue.AddObject(FullSequence{0, 4}, "e"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
+ EXPECT_FALSE(queue.AddObject(Location{0, 1}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_TRUE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_FALSE(queue.AddObject(Location{0, 4}, "e"));
}
TEST(MoqtLiveRelayQueue, GroupDoesNotExist) {
@@ -391,10 +371,10 @@
testing::InSequence seq;
EXPECT_CALL(queue, SkipGroup(0));
}
- EXPECT_FALSE(queue.AddObject(FullSequence{0, 1},
- MoqtObjectStatus::kGroupDoesNotExist));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0},
- MoqtObjectStatus::kGroupDoesNotExist));
+ EXPECT_FALSE(
+ queue.AddObject(Location{0, 1}, MoqtObjectStatus::kGroupDoesNotExist));
+ EXPECT_TRUE(
+ queue.AddObject(Location{0, 0}, MoqtObjectStatus::kGroupDoesNotExist));
}
TEST(MoqtLiveRelayQueue, OverwriteObject) {
@@ -405,12 +385,11 @@
EXPECT_CALL(queue, PublishObject(0, 1, "b"));
EXPECT_CALL(queue, PublishObject(0, 2, "c"));
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c"));
- EXPECT_TRUE(
- queue.AddObject(FullSequence{0, 3}, MoqtObjectStatus::kEndOfGroup));
- EXPECT_FALSE(queue.AddObject(FullSequence{0, 1}, "invalid"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0}, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1}, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2}, "c"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 3}, MoqtObjectStatus::kEndOfGroup));
+ EXPECT_FALSE(queue.AddObject(Location{0, 1}, "invalid"));
}
TEST(MoqtLiveRelayQueue, DifferentSubgroups) {
@@ -421,11 +400,11 @@
EXPECT_CALL(queue, PublishObject(0, 1, "b"));
EXPECT_CALL(queue, PublishObject(0, 3, "d"));
EXPECT_CALL(queue, PublishObject(0, 2, "c"));
- EXPECT_CALL(queue, OnNewFinAvailable(FullSequence{0, 0, 3}));
+ EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 0, 3}));
EXPECT_CALL(queue, PublishObject(0, 5, "e"));
EXPECT_CALL(queue, PublishObject(0, 7, "f"));
- EXPECT_CALL(queue, OnNewFinAvailable(FullSequence{0, 1, 5}));
- EXPECT_CALL(queue, OnNewFinAvailable(FullSequence{0, 2, 7}));
+ EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 1, 5}));
+ EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 2, 7}));
// Serve them back in strict subgroup order.
EXPECT_CALL(queue, PublishObject(0, 0, "a"));
@@ -438,15 +417,15 @@
EXPECT_CALL(queue, PublishObject(0, 7, "f"));
EXPECT_CALL(queue, CloseStreamForSubgroup(0, 2));
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0, 0}, "a"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 1, 1}, "b"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0, 3}, "d"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 2, 2}, "c"));
- EXPECT_TRUE(queue.AddFin(FullSequence{0, 0, 3}));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 1, 5}, "e"));
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 2, 7}, "f"));
- EXPECT_TRUE(queue.AddFin(FullSequence{0, 1, 5}));
- EXPECT_TRUE(queue.AddFin(FullSequence{0, 2, 7}));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0, 0}, "a"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1, 1}, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0, 3}, "d"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2, 2}, "c"));
+ EXPECT_TRUE(queue.AddFin(Location{0, 0, 3}));
+ EXPECT_TRUE(queue.AddObject(Location{0, 1, 5}, "e"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 2, 7}, "f"));
+ EXPECT_TRUE(queue.AddFin(Location{0, 1, 5}));
+ EXPECT_TRUE(queue.AddFin(Location{0, 2, 7}));
queue.GetObjectsFromPast(SubscribeWindow());
}
@@ -455,12 +434,12 @@
{
testing::InSequence seq;
EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, OnNewFinAvailable(FullSequence{0, 0, 0}));
+ EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 0, 0}));
EXPECT_CALL(queue, PublishObject(0, 2, "b")).Times(0);
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0, 0}, "a"));
- EXPECT_TRUE(queue.AddFin(FullSequence{0, 0, 0}));
- EXPECT_FALSE(queue.AddObject(FullSequence{0, 0, 2}, "b"));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0, 0}, "a"));
+ EXPECT_TRUE(queue.AddFin(Location{0, 0, 0}));
+ EXPECT_FALSE(queue.AddObject(Location{0, 0, 2}, "b"));
}
TEST(MoqtLiveRelayQueue, AddObjectWithFin) {
@@ -469,9 +448,8 @@
testing::InSequence seq;
EXPECT_CALL(queue, PublishObject(0, 0, "a"));
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0, 0}, "a", true));
- std::optional<PublishedObject> object =
- queue.GetCachedObject(FullSequence{0, 0});
+ EXPECT_TRUE(queue.AddObject(Location{0, 0, 0}, "a", true));
+ std::optional<PublishedObject> object = queue.GetCachedObject(Location{0, 0});
ASSERT_TRUE(object.has_value());
EXPECT_EQ(object->status, MoqtObjectStatus::kNormal);
EXPECT_TRUE(object->fin_after_this);
@@ -483,11 +461,10 @@
testing::InSequence seq;
EXPECT_CALL(queue, PublishObject(0, 0, "a"));
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0, 0}, "a", false));
- EXPECT_CALL(queue, OnNewFinAvailable(FullSequence{0, 0}));
- EXPECT_TRUE(queue.AddFin(FullSequence{0, 0}));
- std::optional<PublishedObject> object =
- queue.GetCachedObject(FullSequence{0, 0});
+ EXPECT_TRUE(queue.AddObject(Location{0, 0, 0}, "a", false));
+ EXPECT_CALL(queue, OnNewFinAvailable(Location{0, 0}));
+ EXPECT_TRUE(queue.AddFin(Location{0, 0}));
+ std::optional<PublishedObject> object = queue.GetCachedObject(Location{0, 0});
ASSERT_TRUE(object.has_value());
EXPECT_EQ(object->status, MoqtObjectStatus::kNormal);
EXPECT_TRUE(object->fin_after_this);
@@ -498,10 +475,10 @@
{
testing::InSequence seq;
EXPECT_CALL(queue, PublishObject(0, 0, "a"));
- EXPECT_CALL(queue, OnSubgroupAbandoned(FullSequence{0, 0}, 0x1));
+ EXPECT_CALL(queue, OnSubgroupAbandoned(Location{0, 0}, 0x1));
}
- EXPECT_TRUE(queue.AddObject(FullSequence{0, 0, 0}, "a"));
- EXPECT_TRUE(queue.OnStreamReset(FullSequence{0, 0}, 0x1));
+ EXPECT_TRUE(queue.AddObject(Location{0, 0, 0}, "a"));
+ EXPECT_TRUE(queue.OnStreamReset(Location{0, 0}, 0x1));
}
} // namespace
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 084b96f..cbcd4d6 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -242,44 +242,41 @@
};
// These are absolute sequence numbers.
-struct FullSequence {
+struct Location {
uint64_t group;
uint64_t subgroup;
uint64_t object;
- FullSequence() : FullSequence(0, 0) {}
+ Location() : Location(0, 0) {}
// There is a lot of code from before subgroups. Assume there's one subgroup
// with ID 0 per group.
- FullSequence(uint64_t group, uint64_t object)
- : FullSequence(group, 0, object) {}
- FullSequence(uint64_t group, uint64_t subgroup, uint64_t object)
+ Location(uint64_t group, uint64_t object) : Location(group, 0, object) {}
+ Location(uint64_t group, uint64_t subgroup, uint64_t object)
: group(group), subgroup(subgroup), object(object) {}
- bool operator==(const FullSequence& other) const {
+ bool operator==(const Location& other) const {
return group == other.group && object == other.object;
}
// These are temporal ordering comparisons, so subgroup ID doesn't matter.
- bool operator<(const FullSequence& other) const {
+ bool operator<(const Location& other) const {
return group < other.group ||
(group == other.group && object < other.object);
}
- bool operator<=(const FullSequence& other) const {
+ bool operator<=(const Location& other) const {
return (group < other.group ||
(group == other.group && object <= other.object));
}
- bool operator>(const FullSequence& other) const { return !(*this <= other); }
- FullSequence& operator=(FullSequence other) {
+ bool operator>(const Location& other) const { return !(*this <= other); }
+ Location& operator=(Location other) {
group = other.group;
subgroup = other.subgroup;
object = other.object;
return *this;
}
- FullSequence next() const {
- return FullSequence{group, subgroup, object + 1};
- }
+ Location next() const { return Location{group, subgroup, object + 1}; }
template <typename H>
- friend H AbslHashValue(H h, const FullSequence& m);
+ friend H AbslHashValue(H h, const Location& m);
template <typename Sink>
- friend void AbslStringify(Sink& sink, const FullSequence& sequence) {
+ friend void AbslStringify(Sink& sink, const Location& sequence) {
absl::Format(&sink, "(%d; %d)", sequence.group, sequence.object);
}
};
@@ -305,7 +302,7 @@
};
template <typename H>
-H AbslHashValue(H h, const FullSequence& m) {
+H AbslHashValue(H h, const Location& m) {
return H::combine(std::move(h), m.group, m.object);
}
@@ -391,7 +388,7 @@
// start: kAbsoluteStart
// start, end_group: kAbsoluteRange (request whole last group)
// All other combinations are invalid.
- std::optional<FullSequence> start;
+ std::optional<Location> start;
std::optional<uint64_t> end_group;
// If the mode is kNone, the these are std::nullopt.
@@ -408,7 +405,7 @@
quic::QuicTimeDelta expires = quic::QuicTimeDelta::FromMilliseconds(0);
MoqtDeliveryOrder group_order;
// If ContextExists on the wire is zero, largest_id has no value.
- std::optional<FullSequence> largest_id;
+ std::optional<Location> largest_id;
MoqtSubscribeParameters parameters;
};
@@ -442,7 +439,7 @@
struct QUICHE_EXPORT MoqtSubscribeUpdate {
uint64_t subscribe_id;
- FullSequence start;
+ Location start;
std::optional<uint64_t> end_group;
MoqtPriority subscriber_priority;
MoqtSubscribeParameters parameters;
@@ -553,7 +550,7 @@
// and ranges. The session will populate them instead.
std::optional<JoiningFetch> joining_fetch;
FullTrackName full_track_name;
- FullSequence start_object; // subgroup is ignored
+ Location start_object; // subgroup is ignored
uint64_t end_group;
std::optional<uint64_t> end_object;
MoqtSubscribeParameters parameters;
@@ -566,7 +563,7 @@
struct QUICHE_EXPORT MoqtFetchOk {
uint64_t subscribe_id;
MoqtDeliveryOrder group_order;
- FullSequence largest_id; // subgroup is ignored
+ Location largest_id; // subgroup is ignored
MoqtSubscribeParameters parameters;
};
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc
index d6ba015..0b57ca4 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -62,7 +62,7 @@
void MoqtOutgoingQueue::AddRawObject(MoqtObjectStatus status,
quiche::QuicheMemSlice payload) {
- FullSequence sequence{current_group_id_, queue_.back().size()};
+ Location sequence{current_group_id_, queue_.back().size()};
bool fin = forwarding_preference_ == MoqtForwardingPreference::kSubgroup &&
status == MoqtObjectStatus::kEndOfGroup;
queue_.back().push_back(
@@ -75,9 +75,9 @@
}
std::optional<PublishedObject> MoqtOutgoingQueue::GetCachedObject(
- FullSequence sequence) const {
+ Location sequence) const {
if (sequence.group < first_group_in_queue()) {
- return PublishedObject{FullSequence{sequence.group, sequence.object},
+ return PublishedObject{Location{sequence.group, sequence.object},
MoqtObjectStatus::kGroupDoesNotExist,
publisher_priority_, quiche::QuicheMemSlice(),
clock_->ApproximateNow()};
@@ -94,9 +94,9 @@
return CachedObjectToPublishedObject(group[sequence.object]);
}
-std::vector<FullSequence> MoqtOutgoingQueue::GetCachedObjectsInRange(
- FullSequence start, FullSequence end) const {
- std::vector<FullSequence> sequences;
+std::vector<Location> MoqtOutgoingQueue::GetCachedObjectsInRange(
+ Location start, Location end) const {
+ std::vector<Location> sequences;
SubscribeWindow window(start, end.group, end.object);
for (const Group& group : queue_) {
for (const CachedObject& object : group) {
@@ -118,28 +118,28 @@
return MoqtTrackStatusCode::kInProgress;
}
-FullSequence MoqtOutgoingQueue::GetLargestSequence() const {
+Location MoqtOutgoingQueue::GetLargestSequence() const {
if (queue_.empty()) {
QUICHE_BUG(MoqtOutgoingQueue_GetLargestSequence_not_begun)
<< "Calling GetLargestSequence() on a track that hasn't begun";
- return FullSequence{0, 0};
+ return Location{0, 0};
}
- return FullSequence{current_group_id_, queue_.back().size() - 1};
+ return Location{current_group_id_, queue_.back().size() - 1};
}
std::unique_ptr<MoqtFetchTask> MoqtOutgoingQueue::Fetch(
- FullSequence start, uint64_t end_group, std::optional<uint64_t> end_object,
+ Location start, uint64_t end_group, std::optional<uint64_t> end_object,
MoqtDeliveryOrder order) {
if (queue_.empty()) {
return std::make_unique<MoqtFailedFetch>(
absl::NotFoundError("No objects available on the track"));
}
- FullSequence end = FullSequence(
+ Location end = Location(
end_group, end_object.value_or(std::numeric_limits<uint64_t>::max()));
- FullSequence first_available_object = FullSequence(first_group_in_queue(), 0);
- FullSequence last_available_object =
- FullSequence(current_group_id_, queue_.back().size() - 1);
+ Location first_available_object = Location(first_group_in_queue(), 0);
+ Location last_available_object =
+ Location(current_group_id_, queue_.back().size() - 1);
if (end < first_available_object) {
return std::make_unique<MoqtFailedFetch>(
@@ -150,9 +150,9 @@
absl::NotFoundError("All of the requested objects are in the future"));
}
- FullSequence adjusted_start = std::max(start, first_available_object);
- FullSequence adjusted_end = std::min(end, last_available_object);
- std::vector<FullSequence> objects =
+ Location adjusted_start = std::max(start, first_available_object);
+ Location adjusted_end = std::min(end, last_available_object);
+ std::vector<Location> objects =
GetCachedObjectsInRange(adjusted_start, adjusted_end);
if (order == MoqtDeliveryOrder::kDescending) {
absl::c_reverse(objects);
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h
index 4687ac8..3d3c7f0 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.h
+++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -55,9 +55,9 @@
// MoqtTrackPublisher implementation.
const FullTrackName& GetTrackName() const override { return track_; }
std::optional<PublishedObject> GetCachedObject(
- FullSequence sequence) const override;
- std::vector<FullSequence> GetCachedObjectsInRange(
- FullSequence start, FullSequence end) const override;
+ Location sequence) const override;
+ std::vector<Location> GetCachedObjectsInRange(Location start,
+ Location end) const override;
void AddObjectListener(MoqtObjectListener* listener) override {
listeners_.insert(listener);
listener->OnSubscribeAccepted();
@@ -66,7 +66,7 @@
listeners_.erase(listener);
}
absl::StatusOr<MoqtTrackStatusCode> GetTrackStatus() const override;
- FullSequence GetLargestSequence() const override;
+ Location GetLargestSequence() const override;
MoqtForwardingPreference GetForwardingPreference() const override {
return forwarding_preference_;
}
@@ -76,7 +76,7 @@
MoqtDeliveryOrder GetDeliveryOrder() const override {
return delivery_order_;
}
- std::unique_ptr<MoqtFetchTask> Fetch(FullSequence start, uint64_t end_group,
+ std::unique_ptr<MoqtFetchTask> Fetch(Location start, uint64_t end_group,
std::optional<uint64_t> end_object,
MoqtDeliveryOrder order) override;
@@ -105,12 +105,12 @@
// Fetch task for a fetch from the cache.
class FetchTask : public MoqtFetchTask {
public:
- FetchTask(MoqtOutgoingQueue* queue, std::vector<FullSequence> objects)
+ FetchTask(MoqtOutgoingQueue* queue, std::vector<Location> objects)
: queue_(queue), objects_(objects.begin(), objects.end()) {}
GetNextObjectResult GetNextObject(PublishedObject&) override;
absl::Status GetStatus() override { return status_; }
- FullSequence GetLargestId() const override { return objects_.back(); }
+ Location GetLargestId() const override { return objects_.back(); }
void SetObjectAvailableCallback(
ObjectsAvailableCallback /*callback*/) override {
@@ -122,7 +122,7 @@
GetNextObjectResult GetNextObjectInner(PublishedObject&);
MoqtOutgoingQueue* queue_;
- quiche::QuicheCircularDeque<FullSequence> objects_;
+ quiche::QuicheCircularDeque<Location> objects_;
absl::Status status_ = absl::OkStatus();
};
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
index 92d0d2e..028be69 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -46,7 +46,7 @@
AddObjectListener(this);
}
- void OnNewObjectAvailable(FullSequence sequence) override {
+ void OnNewObjectAvailable(Location sequence) override {
std::optional<PublishedObject> object = GetCachedObject(sequence);
QUICHE_CHECK(object.has_value());
ASSERT_THAT(object->status, AnyOf(MoqtObjectStatus::kNormal,
@@ -60,19 +60,18 @@
}
void GetObjectsFromPast(const SubscribeWindow& window) {
- std::vector<FullSequence> objects =
- GetCachedObjectsInRange(FullSequence(0, 0), GetLargestSequence());
- for (FullSequence object : objects) {
+ std::vector<Location> objects =
+ GetCachedObjectsInRange(Location(0, 0), GetLargestSequence());
+ for (Location object : objects) {
if (window.InWindow(object)) {
OnNewObjectAvailable(object);
}
}
}
- MOCK_METHOD(void, OnNewFinAvailable, (FullSequence sequence));
+ MOCK_METHOD(void, OnNewFinAvailable, (Location sequence));
MOCK_METHOD(void, OnSubgroupAbandoned,
- (FullSequence sequence,
- webtransport::StreamErrorCode error_code));
+ (Location sequence, webtransport::StreamErrorCode error_code));
MOCK_METHOD(void, OnGroupAbandoned, (uint64_t group_id));
MOCK_METHOD(void, CloseStreamForGroup, (uint64_t group_id), ());
MOCK_METHOD(void, PublishObject,
@@ -146,7 +145,7 @@
queue.AddObject(MemSliceFromString("a"), true);
queue.AddObject(MemSliceFromString("b"), false);
queue.AddObject(MemSliceFromString("c"), false);
- queue.GetObjectsFromPast(SubscribeWindow(FullSequence(0, 0)));
+ queue.GetObjectsFromPast(SubscribeWindow(Location(0, 0)));
}
TEST(MoqtOutgoingQueue, SingleGroupPastSubscribeFromMidGroup) {
@@ -163,7 +162,7 @@
queue.AddObject(MemSliceFromString("a"), true);
queue.AddObject(MemSliceFromString("b"), false);
queue.AddObject(MemSliceFromString("c"), false);
- queue.GetObjectsFromPast(SubscribeWindow(FullSequence(0, 1)));
+ queue.GetObjectsFromPast(SubscribeWindow(Location(0, 1)));
}
TEST(MoqtOutgoingQueue, TwoGroups) {
@@ -211,7 +210,7 @@
queue.AddObject(MemSliceFromString("d"), true);
queue.AddObject(MemSliceFromString("e"), false);
queue.AddObject(MemSliceFromString("f"), false);
- queue.GetObjectsFromPast(SubscribeWindow(FullSequence(0, 1)));
+ queue.GetObjectsFromPast(SubscribeWindow(Location(0, 1)));
}
TEST(MoqtOutgoingQueue, FiveGroups) {
@@ -284,12 +283,12 @@
queue.AddObject(MemSliceFromString("h"), false);
queue.AddObject(MemSliceFromString("i"), true);
queue.AddObject(MemSliceFromString("j"), false);
- queue.GetObjectsFromPast(SubscribeWindow(FullSequence(0, 0)));
+ queue.GetObjectsFromPast(SubscribeWindow(Location(0, 0)));
}
TEST(MoqtOutgoingQueue, Fetch) {
TestMoqtOutgoingQueue queue;
- EXPECT_THAT(FetchToVector(queue.Fetch(FullSequence{0, 0}, 2, 0,
+ EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 0}, 2, 0,
MoqtDeliveryOrder::kAscending)),
StatusIs(absl::StatusCode::kNotFound));
@@ -299,40 +298,40 @@
queue.AddObject(MemSliceFromString("d"), false);
queue.AddObject(MemSliceFromString("e"), true);
- EXPECT_THAT(FetchToVector(queue.Fetch(FullSequence{0, 0}, 2, 0,
+ EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 0}, 2, 0,
MoqtDeliveryOrder::kAscending)),
IsOkAndHolds(ElementsAre("a", "b", "c", "d", "e")));
- EXPECT_THAT(FetchToVector(queue.Fetch(FullSequence{0, 100}, 0, 1000,
+ EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 100}, 0, 1000,
MoqtDeliveryOrder::kAscending)),
IsOkAndHolds(IsEmpty()));
- EXPECT_THAT(FetchToVector(queue.Fetch(FullSequence{0, 0}, 2, 0,
+ EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 0}, 2, 0,
MoqtDeliveryOrder::kDescending)),
IsOkAndHolds(ElementsAre("e", "c", "d", "a", "b")));
- EXPECT_THAT(FetchToVector(queue.Fetch(FullSequence{0, 0}, 1, 0,
+ EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 0}, 1, 0,
MoqtDeliveryOrder::kAscending)),
IsOkAndHolds(ElementsAre("a", "b", "c")));
- EXPECT_THAT(FetchToVector(queue.Fetch(FullSequence{0, 0}, 1, 0,
+ EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 0}, 1, 0,
MoqtDeliveryOrder::kAscending)),
IsOkAndHolds(ElementsAre("a", "b", "c")));
- EXPECT_THAT(FetchToVector(queue.Fetch(FullSequence{1, 0}, 5, std::nullopt,
+ EXPECT_THAT(FetchToVector(queue.Fetch(Location{1, 0}, 5, std::nullopt,
MoqtDeliveryOrder::kAscending)),
IsOkAndHolds(ElementsAre("c", "d", "e")));
- EXPECT_THAT(FetchToVector(queue.Fetch(FullSequence{3, 0}, 5, std::nullopt,
+ EXPECT_THAT(FetchToVector(queue.Fetch(Location{3, 0}, 5, std::nullopt,
MoqtDeliveryOrder::kAscending)),
StatusIs(absl::StatusCode::kNotFound));
queue.AddObject(MemSliceFromString("f"), true);
queue.AddObject(MemSliceFromString("g"), false);
- EXPECT_THAT(FetchToVector(queue.Fetch(FullSequence{0, 0}, 0, 1,
+ EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 0}, 0, 1,
MoqtDeliveryOrder::kAscending)),
StatusIs(absl::StatusCode::kNotFound));
- EXPECT_THAT(FetchToVector(queue.Fetch(FullSequence{0, 0}, 2, 0,
+ EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 0}, 2, 0,
MoqtDeliveryOrder::kAscending)),
IsOkAndHolds(ElementsAre("c", "d", "e")));
- EXPECT_THAT(FetchToVector(queue.Fetch(FullSequence{1, 0}, 5, std::nullopt,
+ EXPECT_THAT(FetchToVector(queue.Fetch(Location{1, 0}, 5, std::nullopt,
MoqtDeliveryOrder::kAscending)),
IsOkAndHolds(ElementsAre("c", "d", "e", "f", "g")));
- EXPECT_THAT(FetchToVector(queue.Fetch(FullSequence{3, 0}, 5, std::nullopt,
+ EXPECT_THAT(FetchToVector(queue.Fetch(Location{3, 0}, 5, std::nullopt,
MoqtDeliveryOrder::kAscending)),
IsOkAndHolds(ElementsAre("f", "g")));
}
@@ -345,11 +344,11 @@
queue.AddObject(MemSliceFromString("d"), true);
queue.AddObject(MemSliceFromString("e"), true);
- EXPECT_THAT(FetchToVector(queue.Fetch(FullSequence{0, 0}, 5, 0,
+ EXPECT_THAT(FetchToVector(queue.Fetch(Location{0, 0}, 5, 0,
MoqtDeliveryOrder::kAscending)),
IsOkAndHolds(ElementsAre("c", "d", "e")));
std::unique_ptr<MoqtFetchTask> deferred_fetch =
- queue.Fetch(FullSequence{0, 0}, 5, 0, MoqtDeliveryOrder::kAscending);
+ queue.Fetch(Location{0, 0}, 5, 0, MoqtDeliveryOrder::kAscending);
queue.AddObject(MemSliceFromString("f"), true);
queue.AddObject(MemSliceFromString("g"), true);
@@ -365,8 +364,7 @@
quic::QuicTime test_start = clock->ApproximateNow();
TestMoqtOutgoingQueue queue;
queue.AddObject(MemSliceFromString("a"), true);
- std::optional<PublishedObject> object =
- queue.GetCachedObject(FullSequence{0, 0});
+ std::optional<PublishedObject> object = queue.GetCachedObject(Location{0, 0});
ASSERT_TRUE(object.has_value());
EXPECT_GE(object->arrival_time, test_start);
}
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 5282470..1dbe07e 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -423,7 +423,7 @@
if (!reader.ReadVarInt62(&group) || !reader.ReadVarInt62(&object)) {
return 0;
}
- subscribe_request.start = FullSequence(group, object);
+ subscribe_request.start = Location(group, object);
if (filter_type == MoqtFilterType::kAbsoluteStart) {
break;
}
@@ -468,7 +468,7 @@
subscribe_ok.expires = quic::QuicTimeDelta::FromMilliseconds(milliseconds);
subscribe_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order);
if (content_exists) {
- subscribe_ok.largest_id = FullSequence();
+ subscribe_ok.largest_id = Location();
if (!reader.ReadVarInt62(&subscribe_ok.largest_id->group) ||
!reader.ReadVarInt62(&subscribe_ok.largest_id->object)) {
return 0;
@@ -534,7 +534,7 @@
if (!ReadSubscribeParameters(reader, subscribe_update.parameters)) {
return 0;
}
- subscribe_update.start = FullSequence(start_group, start_object);
+ subscribe_update.start = Location(start_group, start_object);
if (end_group > 0) {
subscribe_update.end_group = end_group - 1;
if (subscribe_update.end_group < start_group) {
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h
index c90c52a..b70dfc9 100644
--- a/quiche/quic/moqt/moqt_publisher.h
+++ b/quiche/quic/moqt/moqt_publisher.h
@@ -24,7 +24,7 @@
// PublishedObject is a description of an object that is sufficient to publish
// it on a given track.
struct PublishedObject {
- FullSequence sequence;
+ Location sequence;
MoqtObjectStatus status;
MoqtPriority publisher_priority;
quiche::QuicheMemSlice payload;
@@ -51,15 +51,15 @@
// Notifies that an object with the given sequence number has become
// available. The object payload itself may be retrieved via GetCachedObject
// method of the associated track publisher.
- virtual void OnNewObjectAvailable(FullSequence sequence) = 0;
+ virtual void OnNewObjectAvailable(Location sequence) = 0;
// Notifies that a pure FIN has arrived following |sequence|. Should not be
// called unless all objects have already been delivered. If not delivered,
// instead set the fin_after_this flag in the PublishedObject.
- virtual void OnNewFinAvailable(FullSequence sequence) = 0;
+ virtual void OnNewFinAvailable(Location sequence) = 0;
// Notifies that the a stream is being abandoned (via RESET_STREAM) before
// all objects are delivered.
virtual void OnSubgroupAbandoned(
- FullSequence sequence, webtransport::StreamErrorCode error_code) = 0;
+ Location sequence, webtransport::StreamErrorCode error_code) = 0;
// No further object will be published for the given group, usually due to a
// timeout. The owner of the Listener may want to reset the relevant streams.
@@ -107,7 +107,7 @@
// Returns the highest sequence number that will be delivered by the fetch.
// It is the minimum of the end of the fetch range and the live edge.
- virtual FullSequence GetLargestId() const = 0;
+ virtual Location GetLargestId() const = 0;
};
// MoqtTrackPublisher is an application-side API for an MoQT publisher
@@ -136,12 +136,12 @@
// otherwise, the corresponding QUIC streams will be stuck waiting for objects
// that will never arrive.
virtual std::optional<PublishedObject> GetCachedObject(
- FullSequence sequence) const = 0;
+ Location sequence) const = 0;
// Returns a full list of objects available in the cache, to be used for
// SUBSCRIBEs with a backfill. Returned in order of worsening priority.
- virtual std::vector<FullSequence> GetCachedObjectsInRange(
- FullSequence start, FullSequence end) const = 0;
+ virtual std::vector<Location> GetCachedObjectsInRange(Location start,
+ Location end) const = 0;
// TODO: add an API to fetch past objects that are out of cache and might
// require an upstream request to fill the relevant cache again. This is
@@ -160,7 +160,7 @@
// Returns the largest sequence pair that has been published so far.
// This method may only be called if
// DoesTrackStatusImplyHavingData(GetTrackStatus()) is true.
- virtual FullSequence GetLargestSequence() const = 0;
+ virtual Location GetLargestSequence() const = 0;
// Returns the forwarding preference of the track.
// This method may only be called if
@@ -175,8 +175,8 @@
// Performs a fetch for the specified range of objects.
virtual std::unique_ptr<MoqtFetchTask> Fetch(
- FullSequence start, uint64_t end_group,
- std::optional<uint64_t> end_object, MoqtDeliveryOrder order) = 0;
+ Location start, uint64_t end_group, std::optional<uint64_t> end_object,
+ MoqtDeliveryOrder order) = 0;
};
// MoqtPublisher is an interface to a publisher that allows it to publish
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index c1ee3d0..88bedaf 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -69,7 +69,7 @@
}
SubscribeWindow SubscribeMessageToWindow(const MoqtSubscribe& subscribe) {
- return SubscribeWindow(subscribe.start.value_or(FullSequence(0, 0)),
+ return SubscribeWindow(subscribe.start.value_or(Location(0, 0)),
subscribe.end_group);
}
@@ -212,7 +212,7 @@
"Received DATAGRAM for non-datagram track");
return;
}
- if (!track->InWindow(FullSequence(message.group_id, message.object_id))) {
+ if (!track->InWindow(Location(message.group_id, message.object_id))) {
// TODO(martinduke): a recent SUBSCRIBE_UPDATE could put us here, and it's
// not an error.
return;
@@ -221,10 +221,10 @@
SubscribeRemoteTrack::Visitor* visitor = track->visitor();
if (visitor != nullptr) {
// TODO(martinduke): Handle extension headers.
- visitor->OnObjectFragment(
- track->full_track_name(),
- FullSequence{message.group_id, 0, message.object_id},
- message.publisher_priority, message.object_status, *payload, true);
+ visitor->OnObjectFragment(track->full_track_name(),
+ Location{message.group_id, 0, message.object_id},
+ message.publisher_priority, message.object_status,
+ *payload, true);
}
}
@@ -328,7 +328,7 @@
message.full_track_name = name;
message.subscriber_priority = kDefaultSubscriberPriority;
message.group_order = std::nullopt;
- message.start = FullSequence(start_group, start_object);
+ message.start = Location(start_group, start_object);
message.end_group = std::nullopt;
message.parameters = std::move(parameters);
return Subscribe(message, visitor);
@@ -347,7 +347,7 @@
message.full_track_name = name;
message.subscriber_priority = kDefaultSubscriberPriority;
message.group_order = std::nullopt;
- message.start = FullSequence(start_group, start_object);
+ message.start = Location(start_group, start_object);
message.end_group = end_group;
message.parameters = std::move(parameters);
return Subscribe(message, visitor);
@@ -379,7 +379,7 @@
}
bool MoqtSession::Fetch(const FullTrackName& name,
- FetchResponseCallback callback, FullSequence start,
+ FetchResponseCallback callback, Location start,
uint64_t end_group, std::optional<uint64_t> end_object,
MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
@@ -657,7 +657,7 @@
}
webtransport::Stream* MoqtSession::OpenOrQueueDataStream(
- uint64_t subscription_id, FullSequence first_object) {
+ uint64_t subscription_id, Location first_object) {
auto it = published_subscriptions_.find(subscription_id);
if (it == published_subscriptions_.end()) {
// It is possible that the subscription has been discarded while the stream
@@ -676,7 +676,7 @@
}
webtransport::Stream* MoqtSession::OpenDataStream(
- PublishedSubscription& subscription, FullSequence first_object) {
+ PublishedSubscription& subscription, Location first_object) {
webtransport::Stream* new_stream =
session_->OpenOutgoingUnidirectionalStream();
if (new_stream == nullptr) {
@@ -754,7 +754,7 @@
}
// Pop the item from the subscription's queue, which might update
// subscribes_with_queued_outgoing_data_streams_.
- FullSequence next_queued_stream =
+ Location next_queued_stream =
subscription->second->NextQueuedOutgoingDataStream();
// Check if Group is too old.
if (next_queued_stream.group < subscription->second->first_active_group()) {
@@ -1281,7 +1281,7 @@
return;
}
FullTrackName track_name;
- FullSequence start_object;
+ Location start_object;
uint64_t end_group;
std::optional<uint64_t> end_object;
if (message.joining_fetch.has_value()) {
@@ -1307,11 +1307,11 @@
return;
}
track_name = it->second->publisher().GetTrackName();
- FullSequence fetch_end = it->second->GetWindowStart();
+ Location fetch_end = it->second->GetWindowStart();
if (message.joining_fetch->preceding_group_offset > fetch_end.group) {
- start_object = FullSequence(0, 0);
+ start_object = Location(0, 0);
} else {
- start_object = FullSequence(
+ start_object = Location(
fetch_end.group - message.joining_fetch->preceding_group_offset, 0,
0);
}
@@ -1445,7 +1445,7 @@
status = absl::UnknownError(message.reason_phrase);
break;
}
- fetch->OnFetchResult(FullSequence(0, 0), status, nullptr);
+ fetch->OnFetchResult(Location(0, 0), status, nullptr);
session_->upstream_by_id_.erase(message.subscribe_id);
}
@@ -1522,7 +1522,7 @@
"Received object for a track with a different stream type");
return;
}
- if (!track->InWindow(FullSequence(message.group_id, message.object_id))) {
+ if (!track->InWindow(Location(message.group_id, message.object_id))) {
// This is not an error. It can be the result of a recent SUBSCRIBE_UPDATE.
return;
}
@@ -1533,8 +1533,8 @@
// TODO(martinduke): Send extension headers.
subscribe->visitor()->OnObjectFragment(
track->full_track_name(),
- FullSequence{message.group_id, message.subgroup_id.value_or(0),
- message.object_id},
+ Location{message.group_id, message.subgroup_id.value_or(0),
+ message.object_id},
message.publisher_priority, message.object_status, payload,
end_of_message);
}
@@ -1718,7 +1718,7 @@
}
void MoqtSession::PublishedSubscription::Update(
- FullSequence start, std::optional<uint64_t> end_group,
+ Location start, std::optional<uint64_t> end_group,
MoqtPriority subscriber_priority) {
window_.TruncateStart(start);
if (end_group.has_value()) {
@@ -1749,7 +1749,7 @@
};
void MoqtSession::PublishedSubscription::OnSubscribeAccepted() {
- std::optional<FullSequence> largest_id;
+ std::optional<Location> largest_id;
ControlStream* stream = session_->GetControlStream();
if (PublisherHasData(*track_publisher_)) {
largest_id = track_publisher_->GetLargestSequence();
@@ -1786,12 +1786,12 @@
}
void MoqtSession::PublishedSubscription::OnNewObjectAvailable(
- FullSequence sequence) {
+ Location sequence) {
if (!window_.InWindow(sequence)) {
return;
}
if (reset_subgroups_.contains(
- FullSequence{sequence.group, sequence.subgroup, 0})) {
+ Location{sequence.group, sequence.subgroup, 0})) {
// This subgroup has already been reset, ignore.
return;
}
@@ -1846,13 +1846,12 @@
"Publisher is gone");
}
-void MoqtSession::PublishedSubscription::OnNewFinAvailable(
- FullSequence sequence) {
+void MoqtSession::PublishedSubscription::OnNewFinAvailable(Location sequence) {
if (!window_.InWindow(sequence)) {
return;
}
if (reset_subgroups_.contains(
- FullSequence{sequence.group, sequence.subgroup, 0})) {
+ Location{sequence.group, sequence.subgroup, 0})) {
// This subgroup has already been reset, ignore.
return;
}
@@ -1873,12 +1872,12 @@
}
void MoqtSession::PublishedSubscription::OnSubgroupAbandoned(
- FullSequence sequence, webtransport::StreamErrorCode error_code) {
+ Location sequence, webtransport::StreamErrorCode error_code) {
if (!window_.InWindow(sequence)) {
return;
}
if (reset_subgroups_.contains(
- FullSequence{sequence.group, sequence.subgroup, 0})) {
+ Location{sequence.group, sequence.subgroup, 0})) {
// This subgroup has already been reset, ignore.
return;
}
@@ -1908,7 +1907,7 @@
raw_stream->ResetWithUserCode(kResetCodeTimedOut);
}
first_active_group_ = std::max(first_active_group_, group_id + 1);
- absl::erase_if(reset_subgroups_, [&](const FullSequence& sequence) {
+ absl::erase_if(reset_subgroups_, [&](const Location& sequence) {
return sequence.group < first_active_group_;
});
}
@@ -1922,7 +1921,7 @@
}
webtransport::SendOrder MoqtSession::PublishedSubscription::GetSendOrder(
- FullSequence sequence) const {
+ Location sequence) const {
MoqtForwardingPreference forwarding_preference =
track_publisher_->GetForwardingPreference();
@@ -1940,7 +1939,7 @@
// Returns the highest send order in the subscription.
void MoqtSession::PublishedSubscription::AddQueuedOutgoingDataStream(
- FullSequence first_object) {
+ Location first_object) {
std::optional<webtransport::SendOrder> start_send_order =
queued_outgoing_data_streams_.empty()
? std::optional<webtransport::SendOrder>()
@@ -1958,15 +1957,14 @@
}
}
-FullSequence
-MoqtSession::PublishedSubscription::NextQueuedOutgoingDataStream() {
+Location MoqtSession::PublishedSubscription::NextQueuedOutgoingDataStream() {
QUICHE_DCHECK(!queued_outgoing_data_streams_.empty());
if (queued_outgoing_data_streams_.empty()) {
- return FullSequence();
+ return Location();
}
auto it = queued_outgoing_data_streams_.rbegin();
webtransport::SendOrder old_send_order = FinalizeSendOrder(it->first);
- FullSequence first_object = it->second;
+ Location first_object = it->second;
// converting a reverse iterator to an iterator involves incrementing it and
// then taking base().
queued_outgoing_data_streams_.erase((++it).base());
@@ -1985,16 +1983,16 @@
}
void MoqtSession::PublishedSubscription::OnDataStreamCreated(
- webtransport::StreamId id, FullSequence start_sequence) {
+ webtransport::StreamId id, Location start_sequence) {
++streams_opened_;
stream_map().AddStream(start_sequence, id);
}
void MoqtSession::PublishedSubscription::OnDataStreamDestroyed(
- webtransport::StreamId id, FullSequence end_sequence) {
+ webtransport::StreamId id, Location end_sequence) {
stream_map().RemoveStream(end_sequence, id);
}
-void MoqtSession::PublishedSubscription::OnObjectSent(FullSequence sequence) {
+void MoqtSession::PublishedSubscription::OnObjectSent(Location sequence) {
if (largest_sent_.has_value()) {
largest_sent_ = std::max(*largest_sent_, sequence);
} else {
@@ -2005,7 +2003,7 @@
MoqtSession::OutgoingDataStream::OutgoingDataStream(
MoqtSession* session, webtransport::Stream* stream,
- PublishedSubscription& subscription, FullSequence first_object)
+ PublishedSubscription& subscription, Location first_object)
: session_(session),
stream_(stream),
subscription_id_(subscription.subscription_id()),
@@ -2127,7 +2125,7 @@
}
}
-void MoqtSession::OutgoingDataStream::Fin(FullSequence last_object) {
+void MoqtSession::OutgoingDataStream::Fin(Location last_object) {
if (next_object_ <= last_object) {
// There is still data to send, do nothing.
return;
@@ -2203,7 +2201,7 @@
// hasn't opened yet.
}
-void MoqtSession::PublishedSubscription::SendDatagram(FullSequence sequence) {
+void MoqtSession::PublishedSubscription::SendDatagram(Location sequence) {
std::optional<PublishedObject> object =
track_publisher_->GetCachedObject(sequence);
if (!object.has_value()) {
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 465d293..da5742e 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -133,7 +133,7 @@
// transfers ownership of MoqtFetchTask to the application.
// To cancel a FETCH, simply destroy the FetchTask.
bool Fetch(const FullTrackName& name, FetchResponseCallback callback,
- FullSequence start, uint64_t end_group,
+ Location start, uint64_t end_group,
std::optional<uint64_t> end_object, MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
MoqtSubscribeParameters parameters) override;
@@ -341,7 +341,7 @@
uint64_t subscription_id() const { return subscription_id_; }
MoqtTrackPublisher& publisher() { return *track_publisher_; }
uint64_t track_alias() const { return track_alias_; }
- std::optional<FullSequence> largest_sent() const { return largest_sent_; }
+ std::optional<Location> largest_sent() const { return largest_sent_; }
MoqtPriority subscriber_priority() const { return subscriber_priority_; }
std::optional<MoqtDeliveryOrder> subscriber_delivery_order() const {
return subscriber_delivery_order_;
@@ -354,10 +354,10 @@
MoqtSubscribeErrorReason reason,
std::optional<uint64_t> track_alias = std::nullopt) override;
// This is only called for objects that have just arrived.
- void OnNewObjectAvailable(FullSequence sequence) override;
+ void OnNewObjectAvailable(Location sequence) override;
void OnTrackPublisherGone() override;
- void OnNewFinAvailable(FullSequence sequence) override;
- void OnSubgroupAbandoned(FullSequence sequence,
+ void OnNewFinAvailable(Location sequence) override;
+ void OnSubgroupAbandoned(Location sequence,
webtransport::StreamErrorCode error_code) override;
void OnGroupAbandoned(uint64_t group_id) override;
void ProcessObjectAck(const MoqtObjectAck& message) {
@@ -369,37 +369,37 @@
}
// Updates the window and other properties of the subscription in question.
- void Update(FullSequence start, std::optional<uint64_t> end,
+ void Update(Location start, std::optional<uint64_t> end,
MoqtPriority subscriber_priority);
// Checks if the specified sequence is within the window of this
// subscription.
- bool InWindow(FullSequence sequence) { return window_.InWindow(sequence); }
- FullSequence GetWindowStart() const { return window_.start(); }
+ bool InWindow(Location sequence) { return window_.InWindow(sequence); }
+ Location GetWindowStart() const { return window_.start(); }
MoqtFilterType filter_type() const { return filter_type_; };
void OnDataStreamCreated(webtransport::StreamId id,
- FullSequence start_sequence);
+ Location start_sequence);
void OnDataStreamDestroyed(webtransport::StreamId id,
- FullSequence end_sequence);
- void OnObjectSent(FullSequence sequence);
+ Location end_sequence);
+ void OnObjectSent(Location sequence);
std::vector<webtransport::StreamId> GetAllStreams() const;
- webtransport::SendOrder GetSendOrder(FullSequence sequence) const;
+ webtransport::SendOrder GetSendOrder(Location sequence) const;
- void AddQueuedOutgoingDataStream(FullSequence first_object);
+ void AddQueuedOutgoingDataStream(Location first_object);
// Pops the pending outgoing data stream, with the highest send order.
// The session keeps track of which subscribes have pending streams. This
// function will trigger a QUICHE_DCHECK if called when there are no pending
// streams.
- FullSequence NextQueuedOutgoingDataStream();
+ Location NextQueuedOutgoingDataStream();
quic::QuicTimeDelta delivery_timeout() const { return delivery_timeout_; }
void set_delivery_timeout(quic::QuicTimeDelta timeout) {
delivery_timeout_ = timeout;
}
- void OnStreamTimeout(FullSequence sequence) {
+ void OnStreamTimeout(Location sequence) {
sequence.object = 0;
reset_subgroups_.insert(sequence);
if (session_->alternate_delivery_timeout_) {
@@ -409,7 +409,7 @@
uint64_t first_active_group() const { return first_active_group_; }
- absl::flat_hash_set<FullSequence>& reset_subgroups() {
+ absl::flat_hash_set<Location>& reset_subgroups() {
return reset_subgroups_;
}
@@ -421,7 +421,7 @@
return session_->parameters_.perspective;
}
- void SendDatagram(FullSequence sequence);
+ void SendDatagram(Location sequence);
webtransport::SendOrder FinalizeSendOrder(
webtransport::SendOrder send_order) {
return UpdateSendOrderForSubscriberPriority(send_order,
@@ -442,27 +442,27 @@
uint64_t first_active_group_ = 0;
// If a stream has been reset due to delivery timeout, do not open a new
// stream if more object arrive for it.
- absl::flat_hash_set<FullSequence> reset_subgroups_;
+ absl::flat_hash_set<Location> reset_subgroups_;
// The min of DELIVERY_TIMEOUT from SUBSCRIBE and SUBSCRIBE_OK.
quic::QuicTimeDelta delivery_timeout_ = quic::QuicTimeDelta::Infinite();
std::optional<MoqtDeliveryOrder> subscriber_delivery_order_;
MoqtPublishingMonitorInterface* monitoring_interface_;
// Largest sequence number ever sent via this subscription.
- std::optional<FullSequence> largest_sent_;
+ std::optional<Location> largest_sent_;
// Should be almost always accessed via `stream_map()`.
std::optional<SendStreamMap> lazily_initialized_stream_map_;
// Store the send order of queued outgoing data streams. Use a
// subscriber_priority_ of zero to avoid having to update it, and call
// FinalizeSendOrder() whenever delivering it to the MoqtSession.
- absl::btree_multimap<webtransport::SendOrder, FullSequence>
+ absl::btree_multimap<webtransport::SendOrder, Location>
queued_outgoing_data_streams_;
};
class QUICHE_EXPORT OutgoingDataStream : public webtransport::StreamVisitor {
public:
OutgoingDataStream(MoqtSession* session, webtransport::Stream* stream,
PublishedSubscription& subscription,
- FullSequence first_object);
+ Location first_object);
~OutgoingDataStream();
// webtransport::StreamVisitor implementation.
@@ -491,7 +491,7 @@
// Sends a pure FIN on the stream, if the last object sent matches
// |last_object|. Otherwise, does nothing.
- void Fin(FullSequence last_object);
+ void Fin(Location last_object);
// Recomputes the send order and updates it for the associated stream.
void UpdateSendOrder(PublishedSubscription& subscription);
@@ -511,10 +511,10 @@
MoqtSession* session_;
webtransport::Stream* stream_;
uint64_t subscription_id_;
- // A FullSequence with the minimum object ID that should go out next. The
+ // A Location with the minimum object ID that should go out next. The
// session doesn't know what the next object ID in the stream is because
// the next object could be in a different subgroup or simply be skipped.
- FullSequence next_object_;
+ Location next_object_;
bool stream_header_written_ = false;
// If this data stream is for SUBSCRIBE, reset it if an object has been
// excessively delayed per Section 7.1.1.2.
@@ -614,11 +614,11 @@
// Opens a new data stream, or queues it if the session is flow control
// blocked.
webtransport::Stream* OpenOrQueueDataStream(uint64_t subscription_id,
- FullSequence first_object);
+ Location first_object);
// Same as above, except the session is required to be not flow control
// blocked.
webtransport::Stream* OpenDataStream(PublishedSubscription& subscription,
- FullSequence first_object);
+ Location first_object);
// Returns false if creation failed.
[[nodiscard]] bool OpenDataStream(std::shared_ptr<PublishedFetch> fetch,
webtransport::SendOrder send_order);
diff --git a/quiche/quic/moqt/moqt_session_interface.h b/quiche/quic/moqt/moqt_session_interface.h
index 08080f1..1fd77eb 100644
--- a/quiche/quic/moqt/moqt_session_interface.h
+++ b/quiche/quic/moqt/moqt_session_interface.h
@@ -71,7 +71,7 @@
// be used to process the FETCH further. To cancel a FETCH, simply destroy
// the MoqtFetchTask.
virtual bool Fetch(const FullTrackName& name, FetchResponseCallback callback,
- FullSequence start, uint64_t end_group,
+ Location start, uint64_t end_group,
std::optional<uint64_t> end_object, MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
MoqtSubscribeParameters parameters) = 0;
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 294267a..73d5fc8 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -64,7 +64,7 @@
kDefaultTrackName(),
/*subscriber_priority=*/0x80,
/*group_order=*/std::nullopt,
- /*start=*/FullSequence(0, 0),
+ /*start=*/Location(0, 0),
/*end_group=*/std::nullopt,
/*parameters=*/MoqtSubscribeParameters(),
};
@@ -78,7 +78,7 @@
/*group_order=*/std::nullopt,
/*joining_fetch=*/std::nullopt,
kDefaultTrackName(),
- /*start=*/FullSequence(0, 0),
+ /*start=*/Location(0, 0),
/*end_group=*/1,
/*end_object=*/std::nullopt,
/*parameters=*/MoqtSubscribeParameters(),
@@ -90,7 +90,7 @@
// this to be removed as well.
static std::shared_ptr<MockTrackPublisher> SetupPublisher(
FullTrackName track_name, MoqtForwardingPreference forwarding_preference,
- FullSequence largest_sequence) {
+ Location largest_sequence) {
auto publisher = std::make_shared<MockTrackPublisher>(std::move(track_name));
ON_CALL(*publisher, GetTrackStatus())
.WillByDefault(Return(MoqtTrackStatusCode::kInProgress));
@@ -131,7 +131,7 @@
return publisher.get();
}
- void SetLargestId(MockTrackPublisher* publisher, FullSequence largest_id) {
+ void SetLargestId(MockTrackPublisher* publisher, Location largest_id) {
ON_CALL(*publisher, GetTrackStatus())
.WillByDefault(Return(MoqtTrackStatusCode::kInProgress));
ON_CALL(*publisher, GetLargestSequence()).WillByDefault(Return(largest_id));
@@ -159,7 +159,7 @@
/*group_order=*/MoqtDeliveryOrder::kAscending,
(*track_status == MoqtTrackStatusCode::kInProgress)
? std::make_optional(publisher->GetLargestSequence())
- : std::optional<FullSequence>(),
+ : std::optional<Location>(),
/*parameters=*/MoqtSubscribeParameters(),
};
EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _));
@@ -496,7 +496,7 @@
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
MockTrackPublisher* track = CreateTrackPublisher();
- SetLargestId(track, FullSequence(10, 20));
+ SetLargestId(track, Location(10, 20));
MoqtSubscribe request = DefaultSubscribe();
ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
}
@@ -505,7 +505,7 @@
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
MockTrackPublisher* track = CreateTrackPublisher();
- SetLargestId(track, FullSequence(10, 20));
+ SetLargestId(track, Location(10, 20));
MoqtSubscribe request = DefaultSubscribe();
request.end_group = 9;
@@ -533,7 +533,7 @@
ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
request.subscribe_id = 2;
- request.start = FullSequence(12, 0);
+ request.start = Location(12, 0);
EXPECT_CALL(mock_session_,
CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
"Duplicate subscribe for track"))
@@ -557,7 +557,7 @@
// Subscribe again, succeeds.
request.subscribe_id = 2;
- request.start = FullSequence(12, 0);
+ request.start = Location(12, 0);
ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
}
@@ -652,7 +652,7 @@
};
EXPECT_CALL(remote_track_visitor, OnReply(_, _, _))
.WillOnce([&](const FullTrackName& ftn,
- std::optional<FullSequence> /*largest_id*/,
+ std::optional<Location> /*largest_id*/,
std::optional<absl::string_view> error_message) {
EXPECT_EQ(ftn, FullTrackName("foo", "bar"));
EXPECT_FALSE(error_message.has_value());
@@ -733,7 +733,7 @@
};
EXPECT_CALL(remote_track_visitor, OnReply(_, _, _))
.WillOnce([&](const FullTrackName& ftn,
- std::optional<FullSequence> /*largest_id*/,
+ std::optional<Location> /*largest_id*/,
std::optional<absl::string_view> error_message) {
EXPECT_EQ(ftn, FullTrackName("foo", "bar"));
EXPECT_EQ(*error_message, "deadbeef");
@@ -987,7 +987,7 @@
&session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup);
EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _))
- .WillOnce([&](const FullTrackName& full_track_name, FullSequence sequence,
+ .WillOnce([&](const FullTrackName& full_track_name, Location sequence,
MoqtPriority publisher_priority, MoqtObjectStatus status,
absl::string_view payload, bool end_of_message) {
EXPECT_EQ(full_track_name, ftn);
@@ -1032,7 +1032,7 @@
&session_, &mock_stream_, MoqtDataStreamType::kStreamHeaderSubgroup);
EXPECT_CALL(visitor, OnObjectFragment(_, _, _, _, _, _))
- .WillOnce([&](const FullTrackName& full_track_name, FullSequence sequence,
+ .WillOnce([&](const FullTrackName& full_track_name, Location sequence,
MoqtPriority publisher_priority, MoqtObjectStatus status,
absl::string_view payload, bool end_of_message) {
EXPECT_EQ(full_track_name, ftn);
@@ -1104,8 +1104,8 @@
TEST_F(MoqtSessionTest, CreateOutgoingDataStreamAndSend) {
FullTrackName ftn("foo", "bar");
- auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
- FullSequence(4, 2));
+ auto track =
+ SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
MoqtObjectListener* subscription =
MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
@@ -1138,28 +1138,28 @@
fin |= options.send_fin();
return absl::OkStatus();
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([&] {
- return PublishedObject{FullSequence(5, 0),
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] {
+ return PublishedObject{Location(5, 0),
MoqtObjectStatus::kNormal,
127,
MemSliceFromString("deadbeef"),
MoqtSessionPeer::Now(&session_),
false};
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
- subscription->OnNewObjectAvailable(FullSequence(5, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0));
EXPECT_TRUE(correct_message);
EXPECT_FALSE(fin);
EXPECT_EQ(MoqtSessionPeer::LargestSentForSubscription(&session_, 0),
- FullSequence(5, 0));
+ Location(5, 0));
}
TEST_F(MoqtSessionTest, FinDataStreamFromCache) {
FullTrackName ftn("foo", "bar");
- auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
- FullSequence(4, 2));
+ auto track =
+ SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
MoqtObjectListener* subscription =
MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
@@ -1192,26 +1192,26 @@
fin = options.send_fin();
return absl::OkStatus();
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([&] {
- return PublishedObject{FullSequence(5, 0),
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] {
+ return PublishedObject{Location(5, 0),
MoqtObjectStatus::kNormal,
127,
MemSliceFromString("deadbeef"),
MoqtSessionPeer::Now(&session_),
true};
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
- subscription->OnNewObjectAvailable(FullSequence(5, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0));
EXPECT_TRUE(correct_message);
EXPECT_TRUE(fin);
}
TEST_F(MoqtSessionTest, GroupAbandoned) {
FullTrackName ftn("foo", "bar");
- auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
- FullSequence(4, 2));
+ auto track =
+ SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
MoqtObjectListener* subscription =
MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
@@ -1244,18 +1244,18 @@
fin |= options.send_fin();
return absl::OkStatus();
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([&] {
- return PublishedObject{FullSequence(5, 0),
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] {
+ return PublishedObject{Location(5, 0),
MoqtObjectStatus::kNormal,
127,
MemSliceFromString("deadbeef"),
MoqtSessionPeer::Now(&session_),
true};
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
- subscription->OnNewObjectAvailable(FullSequence(5, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0));
EXPECT_TRUE(correct_message);
EXPECT_TRUE(fin);
@@ -1265,8 +1265,8 @@
TEST_F(MoqtSessionTest, LateFinDataStream) {
FullTrackName ftn("foo", "bar");
- auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
- FullSequence(4, 2));
+ auto track =
+ SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
MoqtObjectListener* subscription =
MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
@@ -1299,18 +1299,18 @@
fin = options.send_fin();
return absl::OkStatus();
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([&] {
- return PublishedObject{FullSequence(5, 0),
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] {
+ return PublishedObject{Location(5, 0),
MoqtObjectStatus::kNormal,
127,
MemSliceFromString("deadbeef"),
MoqtSessionPeer::Now(&session_),
false};
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
- subscription->OnNewObjectAvailable(FullSequence(5, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0));
EXPECT_TRUE(correct_message);
EXPECT_FALSE(fin);
fin = false;
@@ -1321,13 +1321,13 @@
fin = options.send_fin();
return absl::OkStatus();
});
- subscription->OnNewFinAvailable(FullSequence(5, 0));
+ subscription->OnNewFinAvailable(Location(5, 0));
}
TEST_F(MoqtSessionTest, SeparateFinForFutureObject) {
FullTrackName ftn("foo", "bar");
- auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
- FullSequence(4, 2));
+ auto track =
+ SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
MoqtObjectListener* subscription =
MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
@@ -1360,39 +1360,39 @@
fin = options.send_fin();
return absl::OkStatus();
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([&] {
- return PublishedObject{FullSequence(5, 0),
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] {
+ return PublishedObject{Location(5, 0),
MoqtObjectStatus::kNormal,
127,
MemSliceFromString("deadbeef"),
MoqtSessionPeer::Now(&session_),
false};
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
- subscription->OnNewObjectAvailable(FullSequence(5, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0));
EXPECT_FALSE(fin);
// Try to deliver (5,1), but fail.
EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return false; });
EXPECT_CALL(*track, GetCachedObject(_)).Times(0);
EXPECT_CALL(mock_stream_, Writev(_, _)).Times(0);
- subscription->OnNewObjectAvailable(FullSequence(5, 1));
+ subscription->OnNewObjectAvailable(Location(5, 1));
// Notify that FIN arrived, but do nothing with it because (5, 1) isn't sent.
EXPECT_CALL(mock_stream_, Writev(_, _)).Times(0);
- subscription->OnNewFinAvailable(FullSequence(5, 1));
+ subscription->OnNewFinAvailable(Location(5, 1));
// Reopen the window.
correct_message = false;
// object id, extensions, payload length, status.
const std::string kExpectedMessage2 = {0x01, 0x00, 0x00, 0x03};
EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return true; });
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillRepeatedly([&] {
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([&] {
return PublishedObject{
- FullSequence(5, 1), MoqtObjectStatus::kEndOfGroup, 127,
+ Location(5, 1), MoqtObjectStatus::kEndOfGroup, 127,
MemSliceFromString(""), MoqtSessionPeer::Now(&session_), true};
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 2))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 2))).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
EXPECT_CALL(mock_stream_, Writev(_, _))
@@ -1409,8 +1409,8 @@
TEST_F(MoqtSessionTest, PublisherAbandonsSubgroup) {
FullTrackName ftn("foo", "bar");
- auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
- FullSequence(4, 2));
+ auto track =
+ SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
MoqtObjectListener* subscription =
MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
@@ -1443,37 +1443,37 @@
fin = options.send_fin();
return absl::OkStatus();
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([&] {
- return PublishedObject{FullSequence(5, 0),
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([&] {
+ return PublishedObject{Location(5, 0),
MoqtObjectStatus::kNormal,
127,
MemSliceFromString("deadbeef"),
MoqtSessionPeer::Now(&session_),
false};
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
- subscription->OnNewObjectAvailable(FullSequence(5, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0));
// Abandon the subgroup.
EXPECT_CALL(mock_stream_, ResetWithUserCode(0x1)).Times(1);
- subscription->OnSubgroupAbandoned(FullSequence(5, 0), 0x1);
+ subscription->OnSubgroupAbandoned(Location(5, 0), 0x1);
}
// TODO: Test operation with multiple streams.
TEST_F(MoqtSessionTest, UnidirectionalStreamCannotBeOpened) {
FullTrackName ftn("foo", "bar");
- auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
- FullSequence(4, 2));
+ auto track =
+ SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
MoqtObjectListener* subscription =
MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
// Queue the outgoing stream.
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
.WillOnce(Return(false));
- subscription->OnNewObjectAvailable(FullSequence(5, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0));
// Unblock the session, and cause the queued stream to be sent.
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
@@ -1495,11 +1495,11 @@
EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
.WillRepeatedly(Return(&mock_stream_));
EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([] {
- return PublishedObject{FullSequence(5, 0), MoqtObjectStatus::kNormal, 128,
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([] {
+ return PublishedObject{Location(5, 0), MoqtObjectStatus::kNormal, 128,
MemSliceFromString("deadbeef")};
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
session_.OnCanCreateNewOutgoingUnidirectionalStream();
@@ -1507,16 +1507,16 @@
TEST_F(MoqtSessionTest, QueuedStreamIsCleared) {
FullTrackName ftn("foo", "bar");
- auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
- FullSequence(4, 2));
+ auto track =
+ SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
MoqtObjectListener* subscription =
MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
// Queue the outgoing stream.
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
.WillRepeatedly(Return(false));
- subscription->OnNewObjectAvailable(FullSequence(5, 0, 0));
- subscription->OnNewObjectAvailable(FullSequence(6, 0, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0, 0));
+ subscription->OnNewObjectAvailable(Location(6, 0, 0));
subscription->OnGroupAbandoned(5);
// Unblock the session, and cause the queued stream to be sent. There should
@@ -1541,11 +1541,11 @@
EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
.WillRepeatedly(Return(&mock_stream_));
EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- EXPECT_CALL(*track, GetCachedObject(FullSequence(6, 0))).WillRepeatedly([] {
- return PublishedObject{FullSequence(6, 0), MoqtObjectStatus::kNormal, 128,
+ EXPECT_CALL(*track, GetCachedObject(Location(6, 0))).WillRepeatedly([] {
+ return PublishedObject{Location(6, 0), MoqtObjectStatus::kNormal, 128,
MemSliceFromString("deadbeef")};
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(6, 1))).WillRepeatedly([] {
+ EXPECT_CALL(*track, GetCachedObject(Location(6, 1))).WillRepeatedly([] {
return std::optional<PublishedObject>();
});
session_.OnCanCreateNewOutgoingUnidirectionalStream();
@@ -1553,8 +1553,8 @@
TEST_F(MoqtSessionTest, OutgoingStreamDisappears) {
FullTrackName ftn("foo", "bar");
- auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
- FullSequence(4, 2));
+ auto track =
+ SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
MoqtObjectListener* subscription =
MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
@@ -1578,21 +1578,21 @@
.WillRepeatedly(Return(&mock_stream_));
EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 0))).WillRepeatedly([] {
- return PublishedObject{FullSequence(5, 0), MoqtObjectStatus::kNormal, 128,
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 0))).WillRepeatedly([] {
+ return PublishedObject{Location(5, 0), MoqtObjectStatus::kNormal, 128,
MemSliceFromString("deadbeef")};
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).WillOnce([] {
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).WillOnce([] {
return std::optional<PublishedObject>();
});
- subscription->OnNewObjectAvailable(FullSequence(5, 0));
+ subscription->OnNewObjectAvailable(Location(5, 0));
// Now that the stream exists and is recorded within subscription, make it
// disappear by returning nullptr.
EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
.WillRepeatedly(Return(nullptr));
- EXPECT_CALL(*track, GetCachedObject(FullSequence(5, 1))).Times(0);
- subscription->OnNewObjectAvailable(FullSequence(5, 1));
+ EXPECT_CALL(*track, GetCachedObject(Location(5, 1))).Times(0);
+ subscription->OnNewObjectAvailable(Location(5, 1));
}
TEST_F(MoqtSessionTest, OneBidirectionalStreamClient) {
@@ -1663,8 +1663,8 @@
TEST_F(MoqtSessionTest, ReceiveUnsubscribe) {
FullTrackName ftn("foo", "bar");
- auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
- FullSequence(4, 2));
+ auto track =
+ SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(4, 2));
MoqtSessionPeer::AddSubscription(&session_, track, 0, 1, 3, 4);
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
@@ -1677,8 +1677,8 @@
TEST_F(MoqtSessionTest, SendDatagram) {
FullTrackName ftn("foo", "bar");
- std::shared_ptr<MockTrackPublisher> track_publisher = SetupPublisher(
- ftn, MoqtForwardingPreference::kDatagram, FullSequence{4, 0});
+ std::shared_ptr<MockTrackPublisher> track_publisher =
+ SetupPublisher(ftn, MoqtForwardingPreference::kDatagram, Location{4, 0});
MoqtObjectListener* listener =
MoqtSessionPeer::AddSubscription(&session_, track_publisher, 0, 2, 5, 0);
@@ -1697,12 +1697,12 @@
return webtransport::DatagramStatus(
webtransport::DatagramStatusCode::kSuccess, "");
});
- EXPECT_CALL(*track_publisher, GetCachedObject(FullSequence{5, 0}))
+ EXPECT_CALL(*track_publisher, GetCachedObject(Location{5, 0}))
.WillRepeatedly([] {
- return PublishedObject{FullSequence{5, 0}, MoqtObjectStatus::kNormal,
- 128, MemSliceFromString("deadbeef")};
+ return PublishedObject{Location{5, 0}, MoqtObjectStatus::kNormal, 128,
+ MemSliceFromString("deadbeef")};
});
- listener->OnNewObjectAvailable(FullSequence(5, 0));
+ listener->OnNewObjectAvailable(Location(5, 0));
EXPECT_TRUE(correct_message);
}
@@ -1723,11 +1723,10 @@
};
char datagram[] = {0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x08, 0x64,
0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66};
- EXPECT_CALL(
- visitor_,
- OnObjectFragment(ftn, FullSequence{object.group_id, object.object_id},
- object.publisher_priority, object.object_status, payload,
- true))
+ EXPECT_CALL(visitor_,
+ OnObjectFragment(ftn, Location{object.group_id, object.object_id},
+ object.publisher_priority, object.object_status,
+ payload, true))
.Times(1);
session_.OnDatagramReceived(absl::string_view(datagram, sizeof(datagram)));
}
@@ -1767,7 +1766,7 @@
MockSubscribeRemoteTrackVisitor visitor_;
std::string payload = "deadbeef";
MoqtSubscribe subscribe = DefaultSubscribe();
- subscribe.start = FullSequence(1, 0);
+ subscribe.start = Location(1, 0);
MoqtSessionPeer::CreateRemoteTrack(&session_, subscribe, &visitor_);
MoqtObject object = {
/*track_alias=*/2,
@@ -1790,7 +1789,7 @@
MockSubscribeRemoteTrackVisitor visitor_;
std::string payload = "deadbeef";
MoqtSubscribe subscribe = DefaultSubscribe();
- subscribe.start = FullSequence(1, 0);
+ subscribe.start = Location(1, 0);
MoqtSessionPeer::CreateRemoteTrack(&session_, subscribe, &visitor_);
char datagram[] = {0x01, 0x02, 0x00, 0x00, 0x80, 0x00, 0x08, 0x64,
0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66};
@@ -1800,8 +1799,8 @@
TEST_F(MoqtSessionTest, QueuedStreamsOpenedInOrder) {
FullTrackName ftn("foo", "bar");
- auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
- FullSequence(0, 0));
+ auto track =
+ SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(0, 0));
EXPECT_CALL(*track, GetTrackStatus())
.WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun));
MoqtObjectListener* subscription =
@@ -1812,9 +1811,9 @@
.WillOnce(Return(false));
EXPECT_CALL(*track, GetTrackStatus())
.WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
- subscription->OnNewObjectAvailable(FullSequence(1, 0));
- subscription->OnNewObjectAvailable(FullSequence(0, 0));
- subscription->OnNewObjectAvailable(FullSequence(2, 0));
+ subscription->OnNewObjectAvailable(Location(1, 0));
+ subscription->OnNewObjectAvailable(Location(0, 0));
+ subscription->OnNewObjectAvailable(Location(2, 0));
// These should be opened in the sequence (0, 0), (1, 0), (2, 0).
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
.WillRepeatedly(Return(true));
@@ -1848,23 +1847,23 @@
EXPECT_CALL(mock_stream2, visitor()).WillOnce([&]() {
return stream_visitor[2].get();
});
- EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 0)))
+ EXPECT_CALL(*track, GetCachedObject(Location(0, 0)))
.WillOnce(
- Return(PublishedObject{FullSequence(0, 0), MoqtObjectStatus::kNormal,
- 127, MemSliceFromString("deadbeef")}));
- EXPECT_CALL(*track, GetCachedObject(FullSequence(0, 1)))
+ Return(PublishedObject{Location(0, 0), MoqtObjectStatus::kNormal, 127,
+ MemSliceFromString("deadbeef")}));
+ EXPECT_CALL(*track, GetCachedObject(Location(0, 1)))
.WillOnce(Return(std::nullopt));
- EXPECT_CALL(*track, GetCachedObject(FullSequence(1, 0)))
+ EXPECT_CALL(*track, GetCachedObject(Location(1, 0)))
.WillOnce(
- Return(PublishedObject{FullSequence(1, 0), MoqtObjectStatus::kNormal,
- 127, MemSliceFromString("deadbeef")}));
- EXPECT_CALL(*track, GetCachedObject(FullSequence(1, 1)))
+ Return(PublishedObject{Location(1, 0), MoqtObjectStatus::kNormal, 127,
+ MemSliceFromString("deadbeef")}));
+ EXPECT_CALL(*track, GetCachedObject(Location(1, 1)))
.WillOnce(Return(std::nullopt));
- EXPECT_CALL(*track, GetCachedObject(FullSequence(2, 0)))
+ EXPECT_CALL(*track, GetCachedObject(Location(2, 0)))
.WillOnce(
- Return(PublishedObject{FullSequence(2, 0), MoqtObjectStatus::kNormal,
- 127, MemSliceFromString("deadbeef")}));
- EXPECT_CALL(*track, GetCachedObject(FullSequence(2, 1)))
+ Return(PublishedObject{Location(2, 0), MoqtObjectStatus::kNormal, 127,
+ MemSliceFromString("deadbeef")}));
+ EXPECT_CALL(*track, GetCachedObject(Location(2, 1)))
.WillOnce(Return(std::nullopt));
EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true));
EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true));
@@ -1895,8 +1894,8 @@
TEST_F(MoqtSessionTest, StreamQueuedForSubscriptionThatDoesntExist) {
FullTrackName ftn("foo", "bar");
- auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
- FullSequence(0, 0));
+ auto track =
+ SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup, Location(0, 0));
EXPECT_CALL(*track, GetTrackStatus())
.WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun));
MoqtObjectListener* subscription =
@@ -1905,7 +1904,7 @@
.WillOnce(Return(false));
EXPECT_CALL(*track, GetTrackStatus())
.WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
- subscription->OnNewObjectAvailable(FullSequence(0, 0));
+ subscription->OnNewObjectAvailable(Location(0, 0));
// Delete the subscription, then grant stream credit.
MoqtSessionPeer::DeleteSubscription(&session_, 0);
@@ -1917,11 +1916,11 @@
TEST_F(MoqtSessionTest, QueuedStreamPriorityChanged) {
FullTrackName ftn1("foo", "bar");
- auto track1 = SetupPublisher(ftn1, MoqtForwardingPreference::kSubgroup,
- FullSequence(0, 0));
+ auto track1 =
+ SetupPublisher(ftn1, MoqtForwardingPreference::kSubgroup, Location(0, 0));
FullTrackName ftn2("dead", "beef");
- auto track2 = SetupPublisher(ftn2, MoqtForwardingPreference::kSubgroup,
- FullSequence(0, 0));
+ auto track2 =
+ SetupPublisher(ftn2, MoqtForwardingPreference::kSubgroup, Location(0, 0));
EXPECT_CALL(*track1, GetTrackStatus())
.WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun));
EXPECT_CALL(*track2, GetTrackStatus())
@@ -1943,10 +1942,10 @@
.WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
EXPECT_CALL(*track2, GetTrackStatus())
.WillRepeatedly(Return(MoqtTrackStatusCode::kInProgress));
- subscription0->OnNewObjectAvailable(FullSequence(0, 0));
- subscription1->OnNewObjectAvailable(FullSequence(0, 0));
- subscription0->OnNewObjectAvailable(FullSequence(1, 0));
- subscription1->OnNewObjectAvailable(FullSequence(1, 0));
+ subscription0->OnNewObjectAvailable(Location(0, 0));
+ subscription1->OnNewObjectAvailable(Location(0, 0));
+ subscription0->OnNewObjectAvailable(Location(1, 0));
+ subscription1->OnNewObjectAvailable(Location(1, 0));
// Allow one stream to be opened. It will be group 0, subscription 0.
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
@@ -1964,11 +1963,11 @@
EXPECT_CALL(mock_stream0, visitor()).WillOnce([&]() {
return stream_visitor0.get();
});
- EXPECT_CALL(*track1, GetCachedObject(FullSequence(0, 0)))
+ EXPECT_CALL(*track1, GetCachedObject(Location(0, 0)))
.WillOnce(
- Return(PublishedObject{FullSequence(0, 0), MoqtObjectStatus::kNormal,
- 127, MemSliceFromString("foobar")}));
- EXPECT_CALL(*track1, GetCachedObject(FullSequence(0, 1)))
+ Return(PublishedObject{Location(0, 0), MoqtObjectStatus::kNormal, 127,
+ MemSliceFromString("foobar")}));
+ EXPECT_CALL(*track1, GetCachedObject(Location(0, 1)))
.WillOnce(Return(std::nullopt));
EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true));
EXPECT_CALL(mock_stream0, Writev(_, _))
@@ -2000,11 +1999,11 @@
EXPECT_CALL(mock_stream1, visitor()).WillOnce([&]() {
return stream_visitor1.get();
});
- EXPECT_CALL(*track2, GetCachedObject(FullSequence(0, 0)))
+ EXPECT_CALL(*track2, GetCachedObject(Location(0, 0)))
.WillOnce(
- Return(PublishedObject{FullSequence(0, 0), MoqtObjectStatus::kNormal,
- 127, MemSliceFromString("deadbeef")}));
- EXPECT_CALL(*track2, GetCachedObject(FullSequence(0, 1)))
+ Return(PublishedObject{Location(0, 0), MoqtObjectStatus::kNormal, 127,
+ MemSliceFromString("deadbeef")}));
+ EXPECT_CALL(*track2, GetCachedObject(Location(0, 1)))
.WillOnce(Return(std::nullopt));
EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true));
EXPECT_CALL(mock_stream1, Writev(_, _))
@@ -2024,7 +2023,7 @@
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
MoqtFetch fetch = DefaultFetch();
MockTrackPublisher* track = CreateTrackPublisher();
- SetLargestId(track, FullSequence(0, 0));
+ SetLargestId(track, Location(0, 0));
auto fetch_task_ptr = std::make_unique<MockFetchTask>();
EXPECT_CALL(*track, Fetch).WillOnce(Return(std::move(fetch_task_ptr)));
@@ -2043,7 +2042,7 @@
MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
MoqtFetch fetch = DefaultFetch();
MockTrackPublisher* track = CreateTrackPublisher();
- SetLargestId(track, FullSequence(0, 0));
+ SetLargestId(track, Location(0, 0));
auto fetch_task_ptr = std::make_unique<MockFetchTask>();
MockFetchTask* fetch_task = fetch_task_ptr.get();
@@ -2077,7 +2076,7 @@
EXPECT_CALL(data_stream, CanWrite()).WillRepeatedly(Return(true));
EXPECT_CALL(*fetch_task, GetNextObject(_))
.WillOnce(Invoke([](PublishedObject& output) {
- output.sequence = FullSequence(0, 0, 0);
+ output.sequence = Location(0, 0, 0);
output.status = MoqtObjectStatus::kNormal;
output.publisher_priority = 128;
output.payload = MemSliceFromString("foo");
@@ -2155,7 +2154,7 @@
EXPECT_CALL(data_stream, CanWrite()).WillRepeatedly(Return(true));
EXPECT_CALL(*fetch, GetNextObject(_))
.WillOnce(Invoke([](PublishedObject& output) {
- output.sequence = FullSequence(0, 0, 0);
+ output.sequence = Location(0, 0, 0);
output.status = MoqtObjectStatus::kNormal;
output.publisher_priority = 128;
output.payload = MemSliceFromString("foo");
@@ -2214,7 +2213,7 @@
EXPECT_CALL(*fetch, GetNextObject(_))
.WillOnce(Invoke([](PublishedObject& output) {
// DoesNotExist will be skipped.
- output.sequence = FullSequence(0, 0, 0);
+ output.sequence = Location(0, 0, 0);
output.status = MoqtObjectStatus::kObjectDoesNotExist;
output.publisher_priority = 128;
output.payload = MemSliceFromString("");
@@ -2222,7 +2221,7 @@
return MoqtFetchTask::GetNextObjectResult::kSuccess;
}))
.WillOnce(Invoke([](PublishedObject& output) {
- output.sequence = FullSequence(0, 0, 1);
+ output.sequence = Location(0, 0, 1);
output.status = MoqtObjectStatus::kEndOfGroup;
output.publisher_priority = 128;
output.payload = MemSliceFromString("");
@@ -2265,22 +2264,21 @@
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
MockTrackPublisher* track = CreateTrackPublisher();
- SetLargestId(track, FullSequence(4, 0, 10));
+ SetLargestId(track, Location(4, 0, 10));
ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get());
MoqtObjectListener* subscription =
MoqtSessionPeer::GetSubscription(&session_, subscribe.subscribe_id);
ASSERT_NE(subscription, nullptr);
- EXPECT_TRUE(MoqtSessionPeer::InSubscriptionWindow(subscription,
- FullSequence(4, 0, 11)));
- EXPECT_FALSE(MoqtSessionPeer::InSubscriptionWindow(subscription,
- FullSequence(4, 0, 10)));
+ EXPECT_TRUE(
+ MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 0, 11)));
+ EXPECT_FALSE(
+ MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 0, 10)));
// Joining FETCH arrives. The resulting Fetch should begin at (2, 0).
MoqtFetch fetch = DefaultFetch();
fetch.joining_fetch = {1, 2};
- EXPECT_CALL(*track,
- Fetch(FullSequence(2, 0), 4, std::optional<uint64_t>(10), _))
+ EXPECT_CALL(*track, Fetch(Location(2, 0), 4, std::optional<uint64_t>(10), _))
.WillOnce(Return(std::make_unique<MockFetchTask>()));
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kFetchOk), _));
@@ -2307,7 +2305,7 @@
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
MockTrackPublisher* track = CreateTrackPublisher();
- SetLargestId(track, FullSequence(2, 0, 10));
+ SetLargestId(track, Location(2, 0, 10));
ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get());
MoqtFetch fetch = DefaultFetch();
@@ -2366,10 +2364,10 @@
EXPECT_CALL(remote_track_visitor, OnReply).Times(1);
stream_input->OnSubscribeOkMessage(
MoqtSubscribeOk(0, quic::QuicTimeDelta::FromMilliseconds(0),
- MoqtDeliveryOrder::kAscending, FullSequence(2, 0),
+ MoqtDeliveryOrder::kAscending, Location(2, 0),
MoqtSubscribeParameters()));
stream_input->OnFetchOkMessage(MoqtFetchOk(1, MoqtDeliveryOrder::kAscending,
- FullSequence(2, 0),
+ Location(2, 0),
MoqtSubscribeParameters()));
// Packet arrives on FETCH stream.
MoqtObject object = {
@@ -2449,17 +2447,17 @@
[&](std::unique_ptr<MoqtFetchTask> task) {
fetch_task = std::move(task);
},
- FullSequence(0, 0), 4, std::nullopt, 128, std::nullopt,
+ Location(0, 0), 4, std::nullopt, 128, std::nullopt,
MoqtSubscribeParameters());
MoqtFetchOk ok = {
/*subscribe_id=*/0,
/*group_order=*/MoqtDeliveryOrder::kAscending,
- /*largest_id=*/FullSequence(3, 25),
+ /*largest_id=*/Location(3, 25),
MoqtSubscribeParameters(),
};
stream_input->OnFetchOkMessage(ok);
ASSERT_NE(fetch_task, nullptr);
- EXPECT_EQ(fetch_task->GetLargestId(), FullSequence(3, 25));
+ EXPECT_EQ(fetch_task->GetLargestId(), Location(3, 25));
EXPECT_TRUE(fetch_task->GetStatus().ok());
PublishedObject object;
EXPECT_EQ(fetch_task->GetNextObject(object),
@@ -2479,7 +2477,7 @@
[&](std::unique_ptr<MoqtFetchTask> task) {
fetch_task = std::move(task);
},
- FullSequence(0, 0), 4, std::nullopt, 128, std::nullopt,
+ Location(0, 0), 4, std::nullopt, 128, std::nullopt,
MoqtSubscribeParameters());
MoqtFetchError error = {
/*subscribe_id=*/0,
@@ -2514,7 +2512,7 @@
} while (result != MoqtFetchTask::GetNextObjectResult::kPending);
});
},
- FullSequence(0, 0), 4, std::nullopt, 128, std::nullopt,
+ Location(0, 0), 4, std::nullopt, 128, std::nullopt,
MoqtSubscribeParameters());
// Build queue of packets to arrive.
std::queue<quiche::QuicheBuffer> headers;
@@ -2554,7 +2552,7 @@
MoqtFetchOk ok = {
/*subscribe_id=*/0,
/*group_order=*/MoqtDeliveryOrder::kAscending,
- /*largest_id=*/FullSequence(3, 25),
+ /*largest_id=*/Location(3, 25),
MoqtSubscribeParameters(),
};
stream_input->OnFetchOkMessage(ok);
@@ -2584,7 +2582,7 @@
fetch_task->SetObjectAvailableCallback(
[&]() { objects_available = true; });
},
- FullSequence(0, 0), 4, std::nullopt, 128, std::nullopt,
+ Location(0, 0), 4, std::nullopt, 128, std::nullopt,
MoqtSubscribeParameters());
// Build queue of packets to arrive.
std::queue<quiche::QuicheBuffer> headers;
@@ -2624,7 +2622,7 @@
MoqtFetchOk ok = {
/*subscribe_id=*/0,
/*group_order=*/MoqtDeliveryOrder::kAscending,
- /*largest_id=*/FullSequence(3, 25),
+ /*largest_id=*/Location(3, 25),
MoqtSubscribeParameters(),
};
stream_input->OnFetchOkMessage(ok);
@@ -2753,7 +2751,7 @@
}));
EXPECT_CALL(*track_publisher, GetCachedObject(_))
.WillOnce(Return(PublishedObject{
- FullSequence(0, 0), MoqtObjectStatus::kObjectDoesNotExist, 0,
+ Location(0, 0), MoqtObjectStatus::kObjectDoesNotExist, 0,
quiche::QuicheMemSlice(),
MoqtSessionPeer::Now(&session_) - quic::QuicTimeDelta::FromSeconds(1),
false}));
@@ -2762,19 +2760,19 @@
stream_visitor.reset();
}));
// Arrival time is very old; reset immediately.
- subscription->OnNewObjectAvailable(FullSequence(0, 0, 0));
+ subscription->OnNewObjectAvailable(Location(0, 0, 0));
// Subsequent objects for that subgroup are ignored.
EXPECT_CALL(*track_publisher, GetCachedObject(_)).Times(0);
EXPECT_CALL(mock_session_, GetStreamById(_)).Times(0);
EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
.Times(0);
- subscription->OnNewObjectAvailable(FullSequence(0, 0, 1));
+ subscription->OnNewObjectAvailable(Location(0, 0, 1));
// Check that reset_subgroups_ is pruned.
- EXPECT_TRUE(MoqtSessionPeer::SubgroupHasBeenReset(subscription,
- FullSequence(0, 0, 1)));
+ EXPECT_TRUE(
+ MoqtSessionPeer::SubgroupHasBeenReset(subscription, Location(0, 0, 1)));
subscription->OnGroupAbandoned(0);
- EXPECT_FALSE(MoqtSessionPeer::SubgroupHasBeenReset(subscription,
- FullSequence(0, 0, 1)));
+ EXPECT_FALSE(
+ MoqtSessionPeer::SubgroupHasBeenReset(subscription, Location(0, 0, 1)));
}
TEST_F(MoqtSessionTest, DeliveryTimeoutAfterIntegratedFin) {
@@ -2809,12 +2807,12 @@
}));
EXPECT_CALL(*track_publisher, GetCachedObject(_))
.WillOnce(Return(PublishedObject{
- FullSequence(0, 0), MoqtObjectStatus::kObjectDoesNotExist, 0,
+ Location(0, 0), MoqtObjectStatus::kObjectDoesNotExist, 0,
quiche::QuicheMemSlice(), MoqtSessionPeer::Now(&session_), true}))
.WillOnce(Return(std::nullopt));
EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeTimedOut)).Times(0);
- subscription->OnNewObjectAvailable(FullSequence(0, 0, 0));
+ subscription->OnNewObjectAvailable(Location(0, 0, 0));
auto* delivery_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>(
MoqtSessionPeer::GetAlarm(stream_visitor.get()));
EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeTimedOut))
@@ -2858,14 +2856,14 @@
}));
EXPECT_CALL(*track_publisher, GetCachedObject(_))
.WillOnce(Return(PublishedObject{
- FullSequence(0, 0), MoqtObjectStatus::kObjectDoesNotExist, 0,
+ Location(0, 0), MoqtObjectStatus::kObjectDoesNotExist, 0,
quiche::QuicheMemSlice(), MoqtSessionPeer::Now(&session_), false}))
.WillOnce(Return(std::nullopt));
EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- subscription->OnNewObjectAvailable(FullSequence(0, 0, 0));
+ subscription->OnNewObjectAvailable(Location(0, 0, 0));
EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- subscription->OnNewFinAvailable(FullSequence(0, 0, 0));
+ subscription->OnNewFinAvailable(Location(0, 0, 0));
auto* delivery_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>(
MoqtSessionPeer::GetAlarm(stream_visitor.get()));
EXPECT_CALL(data_mock, ResetWithUserCode(kResetCodeTimedOut))
@@ -2910,11 +2908,11 @@
}));
EXPECT_CALL(*track_publisher, GetCachedObject(_))
.WillOnce(Return(PublishedObject{
- FullSequence(0, 0), MoqtObjectStatus::kObjectDoesNotExist, 0,
+ Location(0, 0), MoqtObjectStatus::kObjectDoesNotExist, 0,
quiche::QuicheMemSlice(), MoqtSessionPeer::Now(&session_), false}))
.WillOnce(Return(std::nullopt));
EXPECT_CALL(data_mock1, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- subscription->OnNewObjectAvailable(FullSequence(0, 0, 0));
+ subscription->OnNewObjectAvailable(Location(0, 0, 0));
webtransport::test::MockStream data_mock2;
EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
@@ -2935,11 +2933,11 @@
}));
EXPECT_CALL(*track_publisher, GetCachedObject(_))
.WillOnce(Return(PublishedObject{
- FullSequence(1, 0), MoqtObjectStatus::kObjectDoesNotExist, 0,
+ Location(1, 0), MoqtObjectStatus::kObjectDoesNotExist, 0,
quiche::QuicheMemSlice(), MoqtSessionPeer::Now(&session_), false}))
.WillOnce(Return(std::nullopt));
EXPECT_CALL(data_mock2, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
- subscription->OnNewObjectAvailable(FullSequence(1, 0, 0));
+ subscription->OnNewObjectAvailable(Location(1, 0, 0));
// Group 1 should start the timer on the Group 0 stream.
auto* delivery_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>(
@@ -2972,8 +2970,8 @@
std::optional<MoqtAnnounceErrorReason> /*error*/) {});
EXPECT_FALSE(session_.Fetch(
FullTrackName{"foo", "bar"},
- +[](std::unique_ptr<MoqtFetchTask> /*fetch_task*/) {}, FullSequence(0, 0),
- 5, std::nullopt, 127, std::nullopt, MoqtSubscribeParameters()));
+ +[](std::unique_ptr<MoqtFetchTask> /*fetch_task*/) {}, Location(0, 0), 5,
+ std::nullopt, 127, std::nullopt, MoqtSubscribeParameters()));
// Error on additional GOAWAY.
EXPECT_CALL(mock_session_,
CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
@@ -3029,8 +3027,8 @@
std::optional<MoqtAnnounceErrorReason> /*error*/) {});
EXPECT_FALSE(session_.Fetch(
FullTrackName{"foo", "bar"},
- +[](std::unique_ptr<MoqtFetchTask> /*fetch_task*/) {}, FullSequence(0, 0),
- 5, std::nullopt, 127, std::nullopt, MoqtSubscribeParameters()));
+ +[](std::unique_ptr<MoqtFetchTask> /*fetch_task*/) {}, Location(0, 0), 5,
+ std::nullopt, 127, std::nullopt, MoqtSubscribeParameters()));
session_.GoAway("");
// GoAway timer fires.
auto* goaway_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>(
@@ -3261,7 +3259,7 @@
MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
// Get the window, set the maximum delivered.
LocalTrack* track = MoqtSessionPeer::local_track(&session_, ftn);
- track->GetWindow(0)->OnObjectSent(FullSequence(7, 3),
+ track->GetWindow(0)->OnObjectSent(Location(7, 3),
MoqtObjectStatus::kNormal);
// Update the end to fall at the last delivered object.
MoqtSubscribeUpdate update = {
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.cc b/quiche/quic/moqt/moqt_subscribe_windows.cc
index 36d8518..97608c5 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows.cc
@@ -16,21 +16,21 @@
namespace moqt {
ReducedSequenceIndex::ReducedSequenceIndex(
- FullSequence sequence, MoqtForwardingPreference preference) {
+ Location sequence, MoqtForwardingPreference preference) {
switch (preference) {
case MoqtForwardingPreference::kSubgroup:
- sequence_ = FullSequence(sequence.group, 0, sequence.subgroup);
+ sequence_ = Location(sequence.group, 0, sequence.subgroup);
break;
case MoqtForwardingPreference::kDatagram:
- sequence_ = FullSequence(sequence.group, 0, sequence.object);
+ sequence_ = Location(sequence.group, 0, sequence.object);
return;
}
}
std::optional<webtransport::StreamId> SendStreamMap::GetStreamForSequence(
- FullSequence sequence) const {
+ Location sequence) const {
QUICHE_DCHECK(forwarding_preference_ == MoqtForwardingPreference::kSubgroup);
- FullSequence index =
+ Location index =
ReducedSequenceIndex(sequence, forwarding_preference_).sequence();
auto group_it = send_streams_.find(index.group);
if (group_it == send_streams_.end()) {
@@ -43,18 +43,18 @@
return subgroup_it->second;
}
-void SendStreamMap::AddStream(FullSequence sequence,
+void SendStreamMap::AddStream(Location sequence,
webtransport::StreamId stream_id) {
- FullSequence index =
+ Location index =
ReducedSequenceIndex(sequence, forwarding_preference_).sequence();
auto [it, result] = send_streams_.insert({index.group, Group()});
auto [sg, success] = it->second.try_emplace(index.subgroup, stream_id);
QUIC_BUG_IF(quic_bug_moqt_draft_03_02, !success) << "Stream already added";
}
-void SendStreamMap::RemoveStream(FullSequence sequence,
+void SendStreamMap::RemoveStream(Location sequence,
webtransport::StreamId stream_id) {
- FullSequence index =
+ Location index =
ReducedSequenceIndex(sequence, forwarding_preference_).sequence();
auto group_it = send_streams_.find(index.group);
if (group_it == send_streams_.end()) {
@@ -70,7 +70,7 @@
group_it->second.erase(subgroup_it);
}
-bool SubscribeWindow::TruncateStart(FullSequence start) {
+bool SubscribeWindow::TruncateStart(Location start) {
if (start < start_) {
return false;
}
@@ -82,11 +82,11 @@
if (end_group > end_.group) {
return false;
}
- end_ = FullSequence(end_group, UINT64_MAX);
+ end_ = Location(end_group, UINT64_MAX);
return true;
}
-bool SubscribeWindow::TruncateEnd(FullSequence largest_id) {
+bool SubscribeWindow::TruncateEnd(Location largest_id) {
if (largest_id > end_) {
return false;
}
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h
index 5ad8910..f1eea5d 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.h
+++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -22,45 +22,44 @@
public:
// Creates a half-open window for SUBSCRIBES.
SubscribeWindow() = default;
- SubscribeWindow(FullSequence start) : start_(start) {}
+ SubscribeWindow(Location start) : start_(start) {}
// Creates a closed window for SUBSCRIBE or FETCH with no end object;
- SubscribeWindow(FullSequence start, std::optional<uint64_t> end_group)
+ SubscribeWindow(Location start, std::optional<uint64_t> end_group)
: start_(start),
- end_(FullSequence(end_group.value_or(UINT64_MAX), UINT64_MAX)) {}
+ end_(Location(end_group.value_or(UINT64_MAX), UINT64_MAX)) {}
// For FETCH with end object
- SubscribeWindow(FullSequence start, uint64_t end_group,
+ SubscribeWindow(Location start, uint64_t end_group,
std::optional<uint64_t> end_object)
: start_(start),
- end_(FullSequence(end_group, end_object.value_or(UINT64_MAX))) {}
+ end_(Location(end_group, end_object.value_or(UINT64_MAX))) {}
- bool InWindow(const FullSequence& seq) const {
+ bool InWindow(const Location& seq) const {
return start_ <= seq && seq <= end_;
}
- FullSequence start() const { return start_; }
- FullSequence end() const { return end_; }
+ Location start() const { return start_; }
+ Location end() const { return end_; }
// Updates the subscription window. Returns true if the update is valid (in
// MoQT, subscription windows are only allowed to shrink, not to expand).
// Called only as a result of SUBSCRIBE_OK (largest_id) or SUBSCRIBE_UPDATE.
- bool TruncateStart(FullSequence start);
+ bool TruncateStart(Location start);
// Called only as a result of SUBSCRIBE_UPDATE.
bool TruncateEnd(uint64_t end_group);
// Called only as a result of FETCH_OK (largest_id)
- bool TruncateEnd(FullSequence largest_id);
+ bool TruncateEnd(Location largest_id);
private:
// The subgroups in these sequences have no meaning.
- FullSequence start_ = FullSequence();
- FullSequence end_ = FullSequence(UINT64_MAX, UINT64_MAX);
+ Location start_ = Location();
+ Location end_ = Location(UINT64_MAX, UINT64_MAX);
};
// ReducedSequenceIndex represents an index object such that if two sequence
// numbers are mapped to the same stream, they will be mapped to the same index.
class ReducedSequenceIndex {
public:
- ReducedSequenceIndex(FullSequence sequence,
- MoqtForwardingPreference preference);
+ ReducedSequenceIndex(Location sequence, MoqtForwardingPreference preference);
bool operator==(const ReducedSequenceIndex& other) const {
return sequence_ == other.sequence_;
@@ -68,7 +67,7 @@
bool operator!=(const ReducedSequenceIndex& other) const {
return sequence_ != other.sequence_;
}
- FullSequence sequence() { return sequence_; }
+ Location sequence() { return sequence_; }
template <typename H>
friend H AbslHashValue(H h, const ReducedSequenceIndex& m) {
@@ -76,7 +75,7 @@
}
private:
- FullSequence sequence_;
+ Location sequence_;
};
// A map of outgoing data streams indexed by object sequence numbers.
@@ -86,9 +85,9 @@
: forwarding_preference_(forwarding_preference) {}
std::optional<webtransport::StreamId> GetStreamForSequence(
- FullSequence sequence) const;
- void AddStream(FullSequence sequence, webtransport::StreamId stream_id);
- void RemoveStream(FullSequence sequence, webtransport::StreamId stream_id);
+ Location sequence) const;
+ void AddStream(Location sequence, webtransport::StreamId stream_id);
+ void RemoveStream(Location sequence, webtransport::StreamId stream_id);
std::vector<webtransport::StreamId> GetAllStreams() const;
std::vector<webtransport::StreamId> GetStreamsForGroup(
uint64_t group_id) const;
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
index d9b4160..2f2ed7a 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows_test.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
@@ -21,29 +21,29 @@
SubscribeWindowTest() {}
const uint64_t subscribe_id_ = 2;
- const FullSequence start_{4, 0};
+ const Location start_{4, 0};
const uint64_t end_ = 5;
};
TEST_F(SubscribeWindowTest, Queries) {
SubscribeWindow window(start_, end_);
- EXPECT_TRUE(window.InWindow(FullSequence(4, 0)));
- EXPECT_TRUE(window.InWindow(FullSequence(5, UINT64_MAX)));
- EXPECT_FALSE(window.InWindow(FullSequence(6, 0)));
- EXPECT_FALSE(window.InWindow(FullSequence(3, 12)));
+ EXPECT_TRUE(window.InWindow(Location(4, 0)));
+ EXPECT_TRUE(window.InWindow(Location(5, UINT64_MAX)));
+ EXPECT_FALSE(window.InWindow(Location(6, 0)));
+ EXPECT_FALSE(window.InWindow(Location(3, 12)));
}
TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdSubgroup) {
SendStreamMap stream_map(MoqtForwardingPreference::kSubgroup);
- stream_map.AddStream(FullSequence{4, 0}, 2);
- EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 0)), std::nullopt);
- stream_map.AddStream(FullSequence{5, 2}, 6);
- EXPECT_QUIC_BUG(stream_map.AddStream(FullSequence{5, 3}, 6),
+ stream_map.AddStream(Location{4, 0}, 2);
+ EXPECT_EQ(stream_map.GetStreamForSequence(Location(5, 0)), std::nullopt);
+ stream_map.AddStream(Location{5, 2}, 6);
+ EXPECT_QUIC_BUG(stream_map.AddStream(Location{5, 3}, 6),
"Stream already added");
- EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 1)), 2);
- EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 0)), 6);
- stream_map.RemoveStream(FullSequence{5, 1}, 6);
- EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 2)), std::nullopt);
+ EXPECT_EQ(stream_map.GetStreamForSequence(Location(4, 1)), 2);
+ EXPECT_EQ(stream_map.GetStreamForSequence(Location(5, 0)), 6);
+ stream_map.RemoveStream(Location{5, 1}, 6);
+ EXPECT_EQ(stream_map.GetStreamForSequence(Location(5, 2)), std::nullopt);
}
TEST_F(SubscribeWindowTest, UpdateStartEnd) {
@@ -51,14 +51,14 @@
EXPECT_TRUE(window.TruncateStart(start_.next()));
EXPECT_TRUE(window.TruncateEnd(end_ - 1));
EXPECT_FALSE(window.InWindow(start_));
- EXPECT_FALSE(window.InWindow(FullSequence(end_, 0)));
+ EXPECT_FALSE(window.InWindow(Location(end_, 0)));
// Widens start_ again.
EXPECT_FALSE(window.TruncateStart(start_));
// Widens end_ again.
EXPECT_FALSE(window.TruncateEnd(end_));
- EXPECT_TRUE(window.TruncateEnd(FullSequence(end_ - 1, 10)));
- EXPECT_TRUE(window.InWindow(FullSequence(end_ - 1, 10)));
- EXPECT_FALSE(window.InWindow(FullSequence(end_ - 1, 11)));
+ EXPECT_TRUE(window.TruncateEnd(Location(end_ - 1, 10)));
+ EXPECT_TRUE(window.InWindow(Location(end_ - 1, 10)));
+ EXPECT_FALSE(window.InWindow(Location(end_ - 1, 11)));
}
} // namespace test
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc
index 516ac4a..9809805 100644
--- a/quiche/quic/moqt/moqt_track.cc
+++ b/quiche/quic/moqt/moqt_track.cc
@@ -120,7 +120,7 @@
}
}
-void UpstreamFetch::OnFetchResult(FullSequence largest_id, absl::Status status,
+void UpstreamFetch::OnFetchResult(Location largest_id, absl::Status status,
TaskDestroyedCallback callback) {
auto task = std::make_unique<UpstreamFetchTask>(largest_id, status,
std::move(callback));
@@ -163,9 +163,9 @@
quiche::QuicheMemSlice message_slice(std::move(payload_));
output.payload = std::move(message_slice);
}
- output.sequence = FullSequence(next_object_->group_id,
- next_object_->subgroup_id.value_or(0),
- next_object_->object_id);
+ output.sequence =
+ Location(next_object_->group_id, next_object_->subgroup_id.value_or(0),
+ next_object_->object_id);
output.status = next_object_->object_status;
output.publisher_priority = next_object_->publisher_priority;
output.fin_after_this = false;
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index 9905b18..2581248 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -65,9 +65,7 @@
uint64_t subscribe_id() const { return subscribe_id_; }
// Is the object one that was requested?
- bool InWindow(FullSequence sequence) const {
- return window_.InWindow(sequence);
- }
+ bool InWindow(Location sequence) const { return window_.InWindow(sequence); }
quiche::QuicheWeakPtr<RemoteTrack> weak_ptr() {
return weak_ptr_factory_.Create();
@@ -102,24 +100,26 @@
// automatically retry.
virtual void OnReply(
const FullTrackName& full_track_name,
- std::optional<FullSequence> largest_id,
+ std::optional<Location> largest_id,
std::optional<absl::string_view> error_reason_phrase) = 0;
// Called when the subscription process is far enough that it is possible to
// send OBJECT_ACK messages; provides a callback to do so. The callback is
// valid for as long as the session is valid.
virtual void OnCanAckObjects(MoqtObjectAckFunction ack_function) = 0;
// Called when an object fragment (or an entire object) is received.
- virtual void OnObjectFragment(
- const FullTrackName& full_track_name, FullSequence sequence,
- MoqtPriority publisher_priority, MoqtObjectStatus object_status,
- absl::string_view object, bool end_of_message) = 0;
+ virtual void OnObjectFragment(const FullTrackName& full_track_name,
+ Location sequence,
+ MoqtPriority publisher_priority,
+ MoqtObjectStatus object_status,
+ absl::string_view object,
+ bool end_of_message) = 0;
virtual void OnSubscribeDone(FullTrackName full_track_name) = 0;
};
SubscribeRemoteTrack(
const MoqtSubscribe& subscribe, Visitor* visitor,
quic::QuicTimeDelta delivery_timeout = quic::QuicTimeDelta::Infinite())
: RemoteTrack(subscribe.full_track_name, subscribe.subscribe_id,
- SubscribeWindow(subscribe.start.value_or(FullSequence()),
+ SubscribeWindow(subscribe.start.value_or(Location()),
subscribe.end_group)),
track_alias_(subscribe.track_alias),
visitor_(visitor),
@@ -153,9 +153,7 @@
}
}
// Called on SUBSCRIBE_OK or SUBSCRIBE_UPDATE.
- bool TruncateStart(FullSequence start) {
- return window().TruncateStart(start);
- }
+ bool TruncateStart(Location start) { return window().TruncateStart(start); }
// Called on SUBSCRIBE_UPDATE.
bool TruncateEnd(uint64_t end_group) {
return window().TruncateEnd(end_group);
@@ -224,7 +222,7 @@
UpstreamFetch(const MoqtFetch& fetch, FetchResponseCallback callback)
: RemoteTrack(fetch.full_track_name, fetch.fetch_id,
fetch.joining_fetch.has_value()
- ? SubscribeWindow(FullSequence(0, 0))
+ ? SubscribeWindow(Location(0, 0))
: SubscribeWindow(fetch.start_object, fetch.end_group,
fetch.end_object)),
ok_callback_(std::move(callback)) {
@@ -239,7 +237,7 @@
// If the UpstreamFetch is destroyed, it will call OnStreamAndFetchClosed
// which sets the TaskDestroyedCallback to nullptr. Thus, |callback| can
// assume that UpstreamFetch is valid.
- UpstreamFetchTask(FullSequence largest_id, absl::Status status,
+ UpstreamFetchTask(Location largest_id, absl::Status status,
TaskDestroyedCallback callback)
: largest_id_(largest_id),
status_(status),
@@ -254,7 +252,7 @@
object_available_callback_ = std::move(callback);
};
absl::Status GetStatus() override { return status_; };
- FullSequence GetLargestId() const override { return largest_id_; }
+ Location GetLargestId() const override { return largest_id_; }
quiche::QuicheWeakPtr<UpstreamFetchTask> weak_ptr() {
return weak_ptr_factory_.Create();
@@ -289,7 +287,7 @@
absl::string_view reason_phrase);
private:
- FullSequence largest_id_;
+ Location largest_id_;
absl::Status status_;
TaskDestroyedCallback task_destroyed_callback_;
@@ -316,7 +314,7 @@
};
// Arrival of FETCH_OK/FETCH_ERROR.
- void OnFetchResult(FullSequence largest_id, absl::Status status,
+ void OnFetchResult(Location largest_id, absl::Status status,
TaskDestroyedCallback callback);
UpstreamFetchTask* task() { return task_.GetIfAvailable(); }
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 1e5c053..910c433 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -54,7 +54,7 @@
/*full_track_name=*/FullTrackName("foo", "bar"),
/*subscriber_priority=*/128,
/*group_order=*/std::nullopt,
- /*start=*/FullSequence(2, 0),
+ /*start=*/Location(2, 0),
std::nullopt,
MoqtSubscribeParameters(),
};
@@ -84,11 +84,11 @@
}
TEST_F(SubscribeRemoteTrackTest, Windows) {
- EXPECT_TRUE(track_.InWindow(FullSequence(2, 0)));
- track_.TruncateStart(FullSequence(2, 1));
- EXPECT_FALSE(track_.InWindow(FullSequence(2, 0)));
+ EXPECT_TRUE(track_.InWindow(Location(2, 0)));
+ track_.TruncateStart(Location(2, 1));
+ EXPECT_FALSE(track_.InWindow(Location(2, 0)));
track_.TruncateEnd(2);
- EXPECT_FALSE(track_.InWindow(FullSequence(3, 0)));
+ EXPECT_FALSE(track_.InWindow(Location(3, 0)));
}
class UpstreamFetchTest : public quic::test::QuicTest {
@@ -104,7 +104,7 @@
/*group_order=*/std::nullopt,
/*joining_fetch=*/std::nullopt,
/*full_track_name=*/FullTrackName("foo", "bar"),
- /*start_object=*/FullSequence(1, 1),
+ /*start_object=*/Location(1, 1),
/*end_group=*/3,
/*end_object=*/100,
/*parameters=*/MoqtSubscribeParameters(),
@@ -122,10 +122,10 @@
EXPECT_TRUE(
fetch_.CheckDataStreamType(MoqtDataStreamType::kStreamHeaderFetch));
EXPECT_TRUE(fetch_.is_fetch());
- EXPECT_FALSE(fetch_.InWindow(FullSequence{1, 0}));
- EXPECT_TRUE(fetch_.InWindow(FullSequence{1, 1}));
- EXPECT_TRUE(fetch_.InWindow(FullSequence{3, 100}));
- EXPECT_FALSE(fetch_.InWindow(FullSequence{3, 101}));
+ EXPECT_FALSE(fetch_.InWindow(Location{1, 0}));
+ EXPECT_TRUE(fetch_.InWindow(Location{1, 1}));
+ EXPECT_TRUE(fetch_.InWindow(Location{3, 100}));
+ EXPECT_FALSE(fetch_.InWindow(Location{3, 101}));
}
TEST_F(UpstreamFetchTest, AllowError) {
@@ -136,16 +136,16 @@
TEST_F(UpstreamFetchTest, FetchResponse) {
EXPECT_EQ(fetch_task_, nullptr);
- fetch_.OnFetchResult(FullSequence(3, 50), absl::OkStatus(), nullptr);
+ fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr);
EXPECT_NE(fetch_task_, nullptr);
EXPECT_NE(fetch_.task(), nullptr);
EXPECT_TRUE(fetch_task_->GetStatus().ok());
- EXPECT_EQ(fetch_task_->GetLargestId(), FullSequence(3, 50));
+ EXPECT_EQ(fetch_task_->GetLargestId(), Location(3, 50));
}
TEST_F(UpstreamFetchTest, FetchClosedByMoqt) {
bool terminated = false;
- fetch_.OnFetchResult(FullSequence(3, 50), absl::OkStatus(),
+ fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(),
[&]() { terminated = true; });
bool got_eof = false;
fetch_task_->SetObjectAvailableCallback([&]() {
@@ -161,14 +161,14 @@
TEST_F(UpstreamFetchTest, FetchClosedByApplication) {
bool terminated = false;
- fetch_.OnFetchResult(FullSequence(3, 50), absl::Status(),
+ fetch_.OnFetchResult(Location(3, 50), absl::Status(),
[&]() { terminated = true; });
fetch_task_.reset();
EXPECT_TRUE(terminated);
}
TEST_F(UpstreamFetchTest, ObjectRetrieval) {
- fetch_.OnFetchResult(FullSequence(3, 50), absl::OkStatus(), nullptr);
+ fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr);
PublishedObject object;
EXPECT_EQ(fetch_task_->GetNextObject(object),
MoqtFetchTask::GetNextObjectResult::kPending);
@@ -178,7 +178,7 @@
got_object = true;
EXPECT_EQ(fetch_task_->GetNextObject(object),
MoqtFetchTask::GetNextObjectResult::kSuccess);
- EXPECT_EQ(object.sequence, FullSequence(3, 0, 0));
+ EXPECT_EQ(object.sequence, Location(3, 0, 0));
EXPECT_EQ(object.payload.AsStringView(), "foobar");
});
int got_read_callback = 0;
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.cc b/quiche/quic/moqt/test_tools/mock_moqt_session.cc
index a9db174..565b1db 100644
--- a/quiche/quic/moqt/test_tools/mock_moqt_session.cc
+++ b/quiche/quic/moqt/test_tools/mock_moqt_session.cc
@@ -62,7 +62,7 @@
visitor_->OnReply(name_, std::nullopt, reason.reason_phrase);
}
- void OnNewObjectAvailable(FullSequence sequence) {
+ void OnNewObjectAvailable(Location sequence) {
std::optional<PublishedObject> object =
publisher_->GetCachedObject(sequence);
if (!object.has_value()) {
@@ -79,8 +79,8 @@
/*end_of_message=*/true);
}
- void OnNewFinAvailable(FullSequence sequence) override {}
- void OnSubgroupAbandoned(FullSequence sequence,
+ void OnNewFinAvailable(Location sequence) override {}
+ void OnSubgroupAbandoned(Location sequence,
webtransport::StreamErrorCode error_code) override {}
void OnGroupAbandoned(uint64_t group_id) override {}
void OnTrackPublisherGone() override { visitor_->OnSubscribeDone(name_); }
@@ -137,7 +137,7 @@
MoqtSubscribeParameters) {
return Subscribe(
name, visitor,
- SubscribeWindow(FullSequence(start_group, start_object)));
+ SubscribeWindow(Location(start_group, start_object)));
});
ON_CALL(*this, SubscribeAbsolute(_, _, _, _, _, _))
.WillByDefault([this](const FullTrackName& name, uint64_t start_group,
@@ -146,8 +146,7 @@
MoqtSubscribeParameters) {
return Subscribe(
name, visitor,
- SubscribeWindow(FullSequence(start_group, start_object),
- end_group));
+ SubscribeWindow(Location(start_group, start_object), end_group));
});
ON_CALL(*this, Unsubscribe)
.WillByDefault([this](const FullTrackName& name) {
@@ -156,7 +155,7 @@
ON_CALL(*this, Fetch)
.WillByDefault(
[this](const FullTrackName& name, FetchResponseCallback callback,
- FullSequence start, uint64_t end_group,
+ Location start, uint64_t end_group,
std::optional<uint64_t> end_object, MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
MoqtSubscribeParameters parameters) {
@@ -207,14 +206,14 @@
if (track_publisher->get()->GetTrackStatus().value_or(
MoqtTrackStatusCode::kStatusNotAvailable) ==
MoqtTrackStatusCode::kNotYetBegun) {
- return Fetch(name, std::move(callback), FullSequence(0, 0), 0, 0,
+ return Fetch(name, std::move(callback), Location(0, 0), 0, 0,
priority, delivery_order, std::move(parameters));
}
- FullSequence largest = track_publisher->get()->GetLargestSequence();
+ Location largest = track_publisher->get()->GetLargestSequence();
uint64_t start_group = largest.group >= num_previous_groups
? largest.group - num_previous_groups + 1
: 0;
- return Fetch(name, std::move(callback), FullSequence(start_group, 0),
+ return Fetch(name, std::move(callback), Location(start_group, 0),
largest.group, largest.object, priority, delivery_order,
std::move(parameters));
});
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.h b/quiche/quic/moqt/test_tools/mock_moqt_session.h
index 74b8195..0deb44d 100644
--- a/quiche/quic/moqt/test_tools/mock_moqt_session.h
+++ b/quiche/quic/moqt/test_tools/mock_moqt_session.h
@@ -58,7 +58,7 @@
MOCK_METHOD(void, Unsubscribe, (const FullTrackName& name), (override));
MOCK_METHOD(bool, Fetch,
(const FullTrackName& name, FetchResponseCallback callback,
- FullSequence start, uint64_t end_group,
+ Location start, uint64_t end_group,
std::optional<uint64_t> end_object, MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
MoqtSubscribeParameters parameters),
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc b/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc
index 1ef2167..f067f31 100644
--- a/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc
+++ b/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc
@@ -54,8 +54,8 @@
OnReply(TrackName(), Eq(std::nullopt), Eq(std::nullopt)));
session_.SubscribeCurrentObject(TrackName(), &visitor,
MoqtSubscribeParameters());
- EXPECT_CALL(visitor, OnObjectFragment(TrackName(), FullSequence(0, 0), _, _,
- "test", _));
+ EXPECT_CALL(visitor,
+ OnObjectFragment(TrackName(), Location(0, 0), _, _, "test", _));
track_->AddObject(quic::test::MemSliceFromString("test"), /*key=*/true);
session_.Unsubscribe(TrackName());
@@ -69,9 +69,9 @@
OnReply(TrackName(), Eq(std::nullopt), Eq(std::nullopt)));
session_.SubscribeAbsolute(TrackName(), 1, 0, 1, &visitor,
MoqtSubscribeParameters());
- EXPECT_CALL(visitor, OnObjectFragment(TrackName(), FullSequence(1, 0), _,
+ EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(1, 0), _,
MoqtObjectStatus::kNormal, "b", _));
- EXPECT_CALL(visitor, OnObjectFragment(TrackName(), FullSequence(1, 1), _,
+ EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(1, 1), _,
MoqtObjectStatus::kEndOfGroup, "", _));
track_->AddObject(quic::test::MemSliceFromString("a"), /*key=*/true);
track_->AddObject(quic::test::MemSliceFromString("b"), /*key=*/true);
@@ -89,7 +89,7 @@
[&](std::unique_ptr<MoqtFetchTask> new_fetch) {
fetch = std::move(new_fetch);
},
- FullSequence(0, 1), 0, 2, 0x80, std::nullopt, MoqtSubscribeParameters());
+ Location(0, 1), 0, 2, 0x80, std::nullopt, MoqtSubscribeParameters());
PublishedObject object;
ASSERT_EQ(fetch->GetNextObject(object), MoqtFetchTask::kSuccess);
EXPECT_EQ(object.payload.AsStringView(), "b");
@@ -106,17 +106,17 @@
testing::StrictMock<MockSubscribeRemoteTrackVisitor> visitor;
EXPECT_CALL(visitor,
- OnReply(TrackName(), Eq(FullSequence(3, 0)), Eq(std::nullopt)));
- EXPECT_CALL(visitor, OnObjectFragment(TrackName(), FullSequence(2, 0), _,
+ OnReply(TrackName(), Eq(Location(3, 0)), Eq(std::nullopt)));
+ EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(2, 0), _,
MoqtObjectStatus::kNormal, "c", _));
- EXPECT_CALL(visitor, OnObjectFragment(TrackName(), FullSequence(2, 1), _,
+ EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(2, 1), _,
MoqtObjectStatus::kEndOfGroup, "", _));
- EXPECT_CALL(visitor, OnObjectFragment(TrackName(), FullSequence(3, 0), _,
+ EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(3, 0), _,
MoqtObjectStatus::kNormal, "d", _));
session_.JoiningFetch(TrackName(), &visitor, 2, MoqtSubscribeParameters());
- EXPECT_CALL(visitor, OnObjectFragment(TrackName(), FullSequence(3, 1), _,
+ EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(3, 1), _,
MoqtObjectStatus::kEndOfGroup, "", _));
- EXPECT_CALL(visitor, OnObjectFragment(TrackName(), FullSequence(4, 0), _,
+ EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(4, 0), _,
MoqtObjectStatus::kNormal, "e", _));
track_->AddObject(quic::test::MemSliceFromString("e"), /*key=*/true);
}
@@ -126,8 +126,8 @@
EXPECT_CALL(visitor,
OnReply(TrackName(), Eq(std::nullopt), Eq(std::nullopt)));
session_.JoiningFetch(TrackName(), &visitor, 0, MoqtSubscribeParameters());
- EXPECT_CALL(visitor, OnObjectFragment(TrackName(), FullSequence(0, 0), _, _,
- "test", _));
+ EXPECT_CALL(visitor,
+ OnObjectFragment(TrackName(), Location(0, 0), _, _, "test", _));
track_->AddObject(quic::test::MemSliceFromString("test"), /*key=*/true);
}
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index afcde1e..a1f4b52 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -103,7 +103,7 @@
subscribe.full_track_name = publisher->GetTrackName();
subscribe.track_alias = track_alias;
subscribe.subscribe_id = subscribe_id;
- subscribe.start = FullSequence(start_group, start_object);
+ subscribe.start = Location(start_group, start_object);
subscribe.subscriber_priority = 0x80;
session->published_subscriptions_.emplace(
subscribe_id, std::make_unique<MoqtSession::PublishedSubscription>(
@@ -113,7 +113,7 @@
}
static bool InSubscriptionWindow(MoqtObjectListener* subscription,
- FullSequence sequence) {
+ Location sequence) {
return static_cast<MoqtSession::PublishedSubscription*>(subscription)
->InWindow(sequence);
}
@@ -175,8 +175,8 @@
session->ValidateSubscribeId(id);
}
- static FullSequence LargestSentForSubscription(MoqtSession* session,
- uint64_t subscribe_id) {
+ static Location LargestSentForSubscription(MoqtSession* session,
+ uint64_t subscribe_id) {
return *session->published_subscriptions_[subscribe_id]->largest_sent();
}
@@ -189,7 +189,7 @@
std::nullopt,
std::nullopt,
FullTrackName{"foo", "bar"},
- FullSequence{0, 0},
+ Location{0, 0},
4,
std::nullopt,
MoqtSubscribeParameters(),
@@ -204,7 +204,7 @@
UpstreamFetch* fetch = static_cast<UpstreamFetch*>(it->second.get());
// Initialize the fetch task
fetch->OnFetchResult(
- FullSequence{4, 10}, absl::OkStatus(),
+ Location{4, 10}, absl::OkStatus(),
[=, session_ptr = session, fetch_id = fetch_message.fetch_id]() {
session_ptr->CancelFetch(fetch_id);
});
@@ -252,7 +252,7 @@
}
static bool SubgroupHasBeenReset(MoqtObjectListener* subscription,
- FullSequence sequence) {
+ Location sequence) {
sequence.object = 0;
return static_cast<MoqtSession::PublishedSubscription*>(subscription)
->reset_subgroups()
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 5bea109..180ee7a 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -553,7 +553,7 @@
/*full_track_name=*/FullTrackName({"foo", "abcd"}),
/*subscriber_priority=*/0x20,
/*group_order=*/MoqtDeliveryOrder::kDescending,
- /*start=*/FullSequence(4, 1),
+ /*start=*/Location(4, 1),
/*end_group=*/std::nullopt,
/*parameters=*/
MoqtSubscribeParameters{
@@ -623,7 +623,7 @@
/*subscribe_id=*/1,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(3),
/*group_order=*/MoqtDeliveryOrder::kDescending,
- /*largest_id=*/FullSequence(12, 20),
+ /*largest_id=*/Location(12, 20),
/*parameters=*/
MoqtSubscribeParameters{
std::nullopt, quic::QuicTimeDelta::FromMilliseconds(10000),
@@ -809,7 +809,7 @@
MoqtSubscribeUpdate subscribe_update_ = {
/*subscribe_id=*/2,
- /*start=*/FullSequence(3, 1),
+ /*start=*/Location(3, 1),
/*end_group=*/4,
/*subscriber_priority=*/0xaa,
/*parameters=*/
@@ -1400,7 +1400,7 @@
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*joining_fetch=*/std::optional<JoiningFetch>(),
/*full_track_name=*/FullTrackName{"foo", "bar"},
- /*start_object=*/FullSequence{1, 2},
+ /*start_object=*/Location{1, 2},
/*end_group=*/5,
/*end_object=*/6,
/*parameters=*/
@@ -1500,7 +1500,7 @@
/*joining_fetch=*/JoiningFetch{2, 2},
/* the next four are ignored for joining fetches*/
/*full_track_name=*/FullTrackName{"foo", "bar"},
- /*start_object=*/FullSequence{1, 2},
+ /*start_object=*/Location{1, 2},
/*end_group=*/5,
/*end_object=*/6,
/*parameters=*/
@@ -1583,7 +1583,7 @@
MoqtFetchOk fetch_ok_ = {
/*subscribe_id =*/1,
/*group_order=*/MoqtDeliveryOrder::kAscending,
- /*start_object=*/FullSequence{5, 4},
+ /*start_object=*/Location{5, 4},
/*parameters=*/
MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt},
};
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc
index 1a4c558..87d17ca 100644
--- a/quiche/quic/moqt/tools/chat_client.cc
+++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -165,7 +165,7 @@
void ChatClient::RemoteTrackVisitor::OnReply(
const FullTrackName& full_track_name,
- std::optional<FullSequence> /*largest_id*/,
+ std::optional<Location> /*largest_id*/,
std::optional<absl::string_view> reason_phrase) {
auto it = client_->other_users_.find(full_track_name);
if (it == client_->other_users_.end()) {
@@ -184,7 +184,7 @@
}
void ChatClient::RemoteTrackVisitor::OnObjectFragment(
- const FullTrackName& full_track_name, FullSequence /*sequence*/,
+ const FullTrackName& full_track_name, Location /*sequence*/,
MoqtPriority /*publisher_priority*/, MoqtObjectStatus /*status*/,
absl::string_view object, bool end_of_message) {
if (!end_of_message) {
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h
index 046bfeb..5507824 100644
--- a/quiche/quic/moqt/tools/chat_client.h
+++ b/quiche/quic/moqt/tools/chat_client.h
@@ -92,13 +92,13 @@
RemoteTrackVisitor(ChatClient* client) : client_(client) {}
void OnReply(const moqt::FullTrackName& full_track_name,
- std::optional<FullSequence> largest_id,
+ std::optional<Location> largest_id,
std::optional<absl::string_view> reason_phrase) override;
void OnCanAckObjects(MoqtObjectAckFunction) override {}
void OnObjectFragment(const moqt::FullTrackName& full_track_name,
- FullSequence sequence,
+ Location sequence,
moqt::MoqtPriority publisher_priority,
moqt::MoqtObjectStatus status,
absl::string_view object,
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc
index cae1264..3fae683 100644
--- a/quiche/quic/moqt/tools/chat_server.cc
+++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -134,7 +134,7 @@
void ChatServer::RemoteTrackVisitor::OnReply(
const moqt::FullTrackName& full_track_name,
- std::optional<FullSequence> /*largest_id*/,
+ std::optional<Location> /*largest_id*/,
std::optional<absl::string_view> reason_phrase) {
std::cout << "Subscription to " << full_track_name.ToString();
if (reason_phrase.has_value()) {
@@ -146,7 +146,7 @@
}
void ChatServer::RemoteTrackVisitor::OnObjectFragment(
- const moqt::FullTrackName& full_track_name, moqt::FullSequence sequence,
+ const moqt::FullTrackName& full_track_name, moqt::Location sequence,
moqt::MoqtPriority /*publisher_priority*/, moqt::MoqtObjectStatus status,
absl::string_view object, bool end_of_message) {
if (!end_of_message) {
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h
index d9cdc70..6e0dd7e 100644
--- a/quiche/quic/moqt/tools/chat_server.h
+++ b/quiche/quic/moqt/tools/chat_server.h
@@ -39,14 +39,15 @@
public:
explicit RemoteTrackVisitor(ChatServer* server);
void OnReply(const moqt::FullTrackName& full_track_name,
- std::optional<FullSequence> largest_id,
+ std::optional<Location> largest_id,
std::optional<absl::string_view> reason_phrase) override;
void OnCanAckObjects(MoqtObjectAckFunction) override {}
- void OnObjectFragment(
- const moqt::FullTrackName& full_track_name, FullSequence sequence,
- moqt::MoqtPriority /*publisher_priority*/,
- moqt::MoqtObjectStatus /*status*/,
- absl::string_view object, bool end_of_message) override;
+ void OnObjectFragment(const moqt::FullTrackName& full_track_name,
+ Location sequence,
+ moqt::MoqtPriority /*publisher_priority*/,
+ moqt::MoqtObjectStatus /*status*/,
+ absl::string_view object,
+ bool end_of_message) override;
void OnSubscribeDone(FullTrackName /*full_track_name*/) override {}
private:
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
index 2bb5616..85de291 100644
--- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -169,7 +169,7 @@
void OnReply(
const FullTrackName& full_track_name,
- std::optional<FullSequence> /*largest_id*/,
+ std::optional<Location> /*largest_id*/,
std::optional<absl::string_view> error_reason_phrase) override {
if (error_reason_phrase.has_value()) {
QUICHE_LOG(ERROR) << "Failed to subscribe to the peer track "
@@ -180,10 +180,9 @@
void OnCanAckObjects(MoqtObjectAckFunction) override {}
void OnObjectFragment(const FullTrackName& full_track_name,
- FullSequence sequence,
+ Location sequence,
MoqtPriority /*publisher_priority*/,
- MoqtObjectStatus /*status*/,
- absl::string_view object,
+ MoqtObjectStatus /*status*/, absl::string_view object,
bool /*end_of_message*/) override {
std::string file_name = absl::StrCat(sequence.group, "-", sequence.object,
".", full_track_name.tuple().back());
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h
index 6cb7007..9d1f1e7 100644
--- a/quiche/quic/moqt/tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -64,23 +64,22 @@
const FullTrackName& GetTrackName() const override { return track_name_; }
MOCK_METHOD(std::optional<PublishedObject>, GetCachedObject,
- (FullSequence sequence), (const, override));
- MOCK_METHOD(std::vector<FullSequence>, GetCachedObjectsInRange,
- (FullSequence start, FullSequence end), (const, override));
+ (Location sequence), (const, override));
+ MOCK_METHOD(std::vector<Location>, GetCachedObjectsInRange,
+ (Location start, Location end), (const, override));
MOCK_METHOD(void, AddObjectListener, (MoqtObjectListener * listener),
(override));
MOCK_METHOD(void, RemoveObjectListener, (MoqtObjectListener * listener),
(override));
MOCK_METHOD(absl::StatusOr<MoqtTrackStatusCode>, GetTrackStatus, (),
(const, override));
- MOCK_METHOD(FullSequence, GetLargestSequence, (), (const, override));
+ MOCK_METHOD(Location, GetLargestSequence, (), (const, override));
MOCK_METHOD(MoqtForwardingPreference, GetForwardingPreference, (),
(const, override));
MOCK_METHOD(MoqtPriority, GetPublisherPriority, (), (const, override));
MOCK_METHOD(MoqtDeliveryOrder, GetDeliveryOrder, (), (const, override));
MOCK_METHOD(std::unique_ptr<MoqtFetchTask>, Fetch,
- (FullSequence, uint64_t, std::optional<uint64_t>,
- MoqtDeliveryOrder),
+ (Location, uint64_t, std::optional<uint64_t>, MoqtDeliveryOrder),
(override));
private:
@@ -91,13 +90,13 @@
public:
MOCK_METHOD(void, OnReply,
(const FullTrackName& full_track_name,
- std::optional<FullSequence> largest_id,
+ std::optional<Location> largest_id,
std::optional<absl::string_view> error_reason_phrase),
(override));
MOCK_METHOD(void, OnCanAckObjects, (MoqtObjectAckFunction ack_function),
(override));
MOCK_METHOD(void, OnObjectFragment,
- (const FullTrackName& full_track_name, FullSequence sequence,
+ (const FullTrackName& full_track_name, Location sequence,
MoqtPriority publisher_priority, MoqtObjectStatus status,
absl::string_view object, bool end_of_message),
(override));
@@ -119,7 +118,7 @@
MOCK_METHOD(MoqtFetchTask::GetNextObjectResult, GetNextObject,
(PublishedObject & output), (override));
MOCK_METHOD(absl::Status, GetStatus, (), (override));
- MOCK_METHOD(FullSequence, GetLargestId, (), (const, override));
+ MOCK_METHOD(Location, GetLargestId, (), (const, override));
void SetObjectAvailableCallback(ObjectsAvailableCallback callback) override {
objects_available_callback_ = std::move(callback);
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
index 523cc65..59f8ee9 100644
--- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -289,7 +289,7 @@
: clock_(clock), deadline_(deadline) {}
void OnReply(const FullTrackName& full_track_name,
- std::optional<FullSequence> /*largest_id*/,
+ std::optional<Location> /*largest_id*/,
std::optional<absl::string_view> error_reason_phrase) override {
QUICHE_CHECK(full_track_name == TrackName());
QUICHE_CHECK(!error_reason_phrase.has_value()) << *error_reason_phrase;
@@ -299,8 +299,7 @@
object_ack_function_ = std::move(ack_function);
}
- void OnObjectFragment(const FullTrackName& full_track_name,
- FullSequence sequence,
+ void OnObjectFragment(const FullTrackName& full_track_name, Location sequence,
MoqtPriority /*publisher_priority*/,
MoqtObjectStatus status, absl::string_view object,
bool end_of_message) override {
@@ -318,7 +317,7 @@
void OnSubscribeDone(FullTrackName /*full_track_name*/) override {}
- void OnFullObject(FullSequence sequence, absl::string_view payload) {
+ void OnFullObject(Location sequence, absl::string_view payload) {
QUICHE_CHECK_GE(payload.size(), 8u);
quiche::QuicheDataReader reader(payload);
uint64_t time_us;
@@ -354,7 +353,7 @@
private:
const QuicClock* clock_ = nullptr;
// TODO: figure out when partial objects should be discarded.
- absl::flat_hash_map<FullSequence, std::string> partial_objects_;
+ absl::flat_hash_map<Location, std::string> partial_objects_;
MoqtObjectAckFunction object_ack_function_ = nullptr;
size_t full_objects_received_ = 0;