Eliminate LatestGroup filter from MoQT Subscribe. This is the last bit to match draft-10.
Simplified the Subscribe message struct to delete many lines of code.
PiperOrigin-RevId: 741261806
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index e086460..b59bc5f 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -440,7 +440,6 @@
return quiche::QuicheBuffer();
}
switch (filter_type) {
- case MoqtFilterType::kLatestGroup:
case MoqtFilterType::kLatestObject:
return SerializeControlMessage(
MoqtMessageType::kSubscribe, WireVarInt62(message.subscribe_id),
@@ -456,8 +455,8 @@
WireFullTrackName(message.full_track_name, true),
WireUint8(message.subscriber_priority),
WireDeliveryOrder(message.group_order), WireVarInt62(filter_type),
- WireVarInt62(*message.start_group),
- WireVarInt62(*message.start_object),
+ WireVarInt62(message.start->group),
+ WireVarInt62(message.start->object),
WireSubscribeParameterList(message.parameters));
case MoqtFilterType::kAbsoluteRange:
return SerializeControlMessage(
@@ -466,8 +465,8 @@
WireFullTrackName(message.full_track_name, true),
WireUint8(message.subscriber_priority),
WireDeliveryOrder(message.group_order), WireVarInt62(filter_type),
- WireVarInt62(*message.start_group),
- WireVarInt62(*message.start_object), WireVarInt62(*message.end_group),
+ WireVarInt62(message.start->group),
+ WireVarInt62(message.start->object), WireVarInt62(*message.end_group),
WireSubscribeParameterList(message.parameters));
default:
QUICHE_BUG(MoqtFramer_end_group_missing) << "Subscribe framing error.";
@@ -530,7 +529,7 @@
message.end_group.has_value() ? *message.end_group + 1 : 0;
return SerializeControlMessage(
MoqtMessageType::kSubscribeUpdate, WireVarInt62(message.subscribe_id),
- WireVarInt62(message.start_group), WireVarInt62(message.start_object),
+ WireVarInt62(message.start.group), WireVarInt62(message.start.object),
WireVarInt62(end_group), WireUint8(message.subscriber_priority),
WireSubscribeParameterList(message.parameters));
}
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index 7a347f5..05cc3dd 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -9,6 +9,7 @@
#include <memory>
#include <optional>
#include <string>
+#include <utility>
#include <vector>
#include "absl/strings/str_cat.h"
@@ -401,38 +402,35 @@
}
TEST_F(MoqtFramerSimpleTest, AllSubscribeInputs) {
- for (std::optional<uint64_t> start_group :
- {std::optional<uint64_t>(), std::optional<uint64_t>(4)}) {
- for (std::optional<uint64_t> start_object :
- {std::optional<uint64_t>(), std::optional<uint64_t>(0)}) {
- for (std::optional<uint64_t> end_group :
- {std::optional<uint64_t>(), std::optional<uint64_t>(7)}) {
- MoqtSubscribe subscribe = {
- /*subscribe_id=*/3,
- /*track_alias=*/4,
- /*full_track_name=*/FullTrackName({"foo", "abcd"}),
- /*subscriber_priority=*/0x20,
- /*group_order=*/std::nullopt,
- start_group,
- start_object,
- end_group,
- MoqtSubscribeParameters{"bar", std::nullopt, std::nullopt,
- std::nullopt},
- };
- quiche::QuicheBuffer buffer;
- MoqtFilterType expected_filter_type = GetFilterType(subscribe);
- if (expected_filter_type == MoqtFilterType::kNone) {
- EXPECT_QUIC_BUG(buffer = framer_.SerializeSubscribe(subscribe),
- "Invalid object range");
- EXPECT_EQ(buffer.size(), 0);
- continue;
- }
- buffer = framer_.SerializeSubscribe(subscribe);
- // Go to the filter type.
- const uint8_t* read = BufferAtOffset(buffer, 16);
- EXPECT_EQ(static_cast<MoqtFilterType>(*read), expected_filter_type);
- EXPECT_GT(buffer.size(), 0);
+ for (std::optional<FullSequence> start :
+ {std::optional<FullSequence>(),
+ std::optional<FullSequence>(std::in_place, 4, 0)}) {
+ for (std::optional<uint64_t> end_group :
+ {std::optional<uint64_t>(), std::optional<uint64_t>(7)}) {
+ MoqtSubscribe subscribe = {
+ /*subscribe_id=*/3,
+ /*track_alias=*/4,
+ /*full_track_name=*/FullTrackName({"foo", "abcd"}),
+ /*subscriber_priority=*/0x20,
+ /*group_order=*/std::nullopt,
+ start,
+ end_group,
+ MoqtSubscribeParameters{"bar", std::nullopt, std::nullopt,
+ std::nullopt},
+ };
+ quiche::QuicheBuffer buffer;
+ MoqtFilterType expected_filter_type = GetFilterType(subscribe);
+ if (expected_filter_type == MoqtFilterType::kNone) {
+ EXPECT_QUIC_BUG(buffer = framer_.SerializeSubscribe(subscribe),
+ "Invalid object range");
+ EXPECT_EQ(buffer.size(), 0);
+ continue;
}
+ buffer = framer_.SerializeSubscribe(subscribe);
+ // Go to the filter type.
+ const uint8_t* read = BufferAtOffset(buffer, 16);
+ EXPECT_EQ(static_cast<MoqtFilterType>(*read), expected_filter_type);
+ EXPECT_GT(buffer.size(), 0);
}
}
}
@@ -444,9 +442,8 @@
/*full_track_name=*/FullTrackName({"foo", "abcd"}),
/*subscriber_priority=*/0x20,
/*group_order=*/std::nullopt,
- /*start_group=*/std::optional<uint64_t>(4),
- /*start_object=*/std::optional<uint64_t>(3),
- /*end_group=*/std::optional<uint64_t>(3),
+ /*start=*/FullSequence(4, 3),
+ /*end_group=*/3,
MoqtSubscribeParameters{"bar", std::nullopt, std::nullopt, std::nullopt},
};
quiche::QuicheBuffer buffer;
@@ -479,29 +476,10 @@
EXPECT_EQ(buffer.size(), 0);
}
-TEST_F(MoqtFramerSimpleTest, SubscribeLatestGroupNonzeroObject) {
- MoqtSubscribe subscribe = {
- /*subscribe_id=*/3,
- /*track_alias=*/4,
- /*full_track_name=*/FullTrackName({"foo", "abcd"}),
- /*subscriber_priority=*/0x20,
- /*group_order=*/std::nullopt,
- /*start_group=*/std::nullopt,
- /*start_object=*/std::optional<uint64_t>(3),
- /*end_group=*/std::nullopt,
- MoqtSubscribeParameters{"bar", std::nullopt, std::nullopt, std::nullopt},
- };
- quiche::QuicheBuffer buffer;
- EXPECT_QUIC_BUG(buffer = framer_.SerializeSubscribe(subscribe),
- "Invalid object range");
- EXPECT_EQ(buffer.size(), 0);
-}
-
TEST_F(MoqtFramerSimpleTest, SubscribeUpdateEndGroupOnly) {
MoqtSubscribeUpdate subscribe_update = {
/*subscribe_id=*/3,
- /*start_group=*/4,
- /*start_object=*/3,
+ /*start=*/FullSequence(4, 3),
/*end_group=*/4,
/*subscriber_priority=*/0xaa,
MoqtSubscribeParameters{std::nullopt, std::nullopt, std::nullopt,
@@ -517,8 +495,7 @@
TEST_F(MoqtFramerSimpleTest, SubscribeUpdateIncrementsEnd) {
MoqtSubscribeUpdate subscribe_update = {
/*subscribe_id=*/3,
- /*start_group=*/4,
- /*start_object=*/3,
+ /*start=*/FullSequence(4, 3),
/*end_group=*/4,
/*subscriber_priority=*/0xaa,
MoqtSubscribeParameters{std::nullopt, std::nullopt, std::nullopt,
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index a85e212..e4e973b 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -237,8 +237,8 @@
FullTrackName track_name = track_namespace;
track_name.AddElement("/catalog");
EXPECT_FALSE(error.has_value());
- server_->session()->SubscribeCurrentGroup(track_name, &server_visitor,
- MoqtSubscribeParameters());
+ server_->session()->SubscribeCurrentObject(track_name, &server_visitor,
+ MoqtSubscribeParameters());
});
EXPECT_CALL(server_visitor, OnReply(_, _, _)).WillOnce([&]() {
matches = true;
@@ -318,36 +318,51 @@
auto queue = std::make_shared<MoqtOutgoingQueue>(
FullTrackName{"test", name}, forwarding_preference);
publisher.Add(queue);
+
+ // These will not be delivered.
queue->AddObject(MemSliceFromString("object 1"), /*key=*/true);
queue->AddObject(MemSliceFromString("object 2"), /*key=*/false);
queue->AddObject(MemSliceFromString("object 3"), /*key=*/false);
- queue->AddObject(MemSliceFromString("object 4"), /*key=*/true);
- queue->AddObject(MemSliceFromString("object 5"), /*key=*/false);
+ client_->session()->SubscribeCurrentObject(FullTrackName("test", name),
+ &client_visitor,
+ MoqtSubscribeParameters());
+ std::optional<FullSequence> largest_id;
+ EXPECT_CALL(client_visitor, OnReply)
+ .WillOnce([&](const FullTrackName& /*name*/,
+ std::optional<FullSequence> id,
+ std::optional<absl::string_view> /*reason*/) {
+ largest_id = id;
+ });
+ bool success = test_harness_.RunUntilWithDefaultTimeout([&]() {
+ return largest_id.has_value() && *largest_id == FullSequence(0, 2);
+ });
+ EXPECT_TRUE(success);
- client_->session()->SubscribeCurrentGroup(FullTrackName("test", name),
- &client_visitor,
- MoqtSubscribeParameters());
int received = 0;
- EXPECT_CALL(client_visitor, OnReply);
+ EXPECT_CALL(client_visitor,
+ OnObjectFragment(_, FullSequence{0, 3}, _,
+ MoqtObjectStatus::kEndOfGroup, "", true))
+ .WillOnce([&] { ++received; });
EXPECT_CALL(client_visitor,
OnObjectFragment(_, FullSequence{1, 0}, _,
MoqtObjectStatus::kNormal, "object 4", true))
.WillOnce([&] { ++received; });
+ queue->AddObject(MemSliceFromString("object 4"), /*key=*/true);
EXPECT_CALL(client_visitor,
OnObjectFragment(_, FullSequence{1, 1}, _,
MoqtObjectStatus::kNormal, "object 5", true))
.WillOnce([&] { ++received; });
- bool success = test_harness_.RunUntilWithDefaultTimeout(
- [&]() { return received >= 2; });
+ queue->AddObject(MemSliceFromString("object 5"), /*key=*/false);
+
+ success = test_harness_.RunUntilWithDefaultTimeout(
+ [&]() { return received >= 3; });
EXPECT_TRUE(success);
- queue->AddObject(MemSliceFromString("object 6"), /*key=*/false);
- queue->AddObject(MemSliceFromString("object 7"), /*key=*/true);
- queue->AddObject(MemSliceFromString("object 8"), /*key=*/false);
EXPECT_CALL(client_visitor,
OnObjectFragment(_, FullSequence{1, 2}, _,
MoqtObjectStatus::kNormal, "object 6", true))
.WillOnce([&] { ++received; });
+ queue->AddObject(MemSliceFromString("object 6"), /*key=*/false);
EXPECT_CALL(client_visitor,
OnObjectFragment(_, FullSequence{1, 3}, _,
MoqtObjectStatus::kEndOfGroup, "", true))
@@ -356,12 +371,15 @@
OnObjectFragment(_, FullSequence{2, 0}, _,
MoqtObjectStatus::kNormal, "object 7", true))
.WillOnce([&] { ++received; });
+ queue->AddObject(MemSliceFromString("object 7"), /*key=*/true);
EXPECT_CALL(client_visitor,
OnObjectFragment(_, FullSequence{2, 1}, _,
MoqtObjectStatus::kNormal, "object 8", true))
.WillOnce([&] { ++received; });
+ queue->AddObject(MemSliceFromString("object 8"), /*key=*/false);
+
success = test_harness_.RunUntilWithDefaultTimeout(
- [&]() { return received >= 6; });
+ [&]() { return received >= 7; });
EXPECT_TRUE(success);
}
}
@@ -509,8 +527,8 @@
});
EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason))
.WillOnce([&]() { received_ok = true; });
- client_->session()->SubscribeCurrentGroup(full_track_name, &client_visitor,
- MoqtSubscribeParameters());
+ client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor,
+ MoqtSubscribeParameters());
bool success =
test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
EXPECT_TRUE(success);
@@ -557,7 +575,7 @@
EXPECT_TRUE(success);
// Reject this subscribe because there already is one.
- EXPECT_FALSE(client_->session()->SubscribeCurrentGroup(
+ EXPECT_FALSE(client_->session()->SubscribeCurrentObject(
full_track_name, &client_visitor, MoqtSubscribeParameters()));
queue->RemoveAllSubscriptions(); // Induce a SUBSCRIBE_DONE.
bool subscribe_done = false;
@@ -569,7 +587,7 @@
EXPECT_TRUE(success);
// Subscription is deleted; the client session should not immediately reject
// a new attempt.
- EXPECT_TRUE(client_->session()->SubscribeCurrentGroup(
+ EXPECT_TRUE(client_->session()->SubscribeCurrentObject(
full_track_name, &client_visitor, MoqtSubscribeParameters()));
}
@@ -697,8 +715,8 @@
MoqtSubscribeParameters parameters;
// Set delivery timeout to ~ 1 RTT: any loss is fatal.
parameters.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(100);
- client_->session()->SubscribeCurrentGroup(full_track_name, &client_visitor,
- parameters);
+ client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor,
+ parameters);
bool success =
test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; });
EXPECT_TRUE(success);
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.cc b/quiche/quic/moqt/moqt_live_relay_queue.cc
index 525f185..69885e6 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue.cc
+++ b/quiche/quic/moqt/moqt_live_relay_queue.cc
@@ -241,7 +241,7 @@
std::vector<FullSequence> MoqtLiveRelayQueue::GetCachedObjectsInRange(
FullSequence start, FullSequence end) const {
std::vector<FullSequence> sequences;
- SubscribeWindow window(start, end);
+ SubscribeWindow window(start, end.group, end.object);
for (auto& group_it : queue_) {
if (group_it.first < start.group) {
continue;
diff --git a/quiche/quic/moqt/moqt_live_relay_queue_test.cc b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
index 02bd47e..d313099 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue_test.cc
+++ b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
@@ -64,7 +64,7 @@
}
}
- void CallSubscribeForPast(const SubscribeWindow& window) {
+ void GetObjectsFromPast(const SubscribeWindow& window) {
std::vector<FullSequence> objects =
GetCachedObjectsInRange(FullSequence(0, 0), GetLargestSequence());
for (FullSequence object : objects) {
@@ -129,7 +129,7 @@
EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a"));
EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b"));
EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c"));
- queue.CallSubscribeForPast(SubscribeWindow(0, 0));
+ queue.GetObjectsFromPast(SubscribeWindow());
}
TEST(MoqtLiveRelayQueue, SingleGroupPastSubscribeFromMidGroup) {
@@ -146,7 +146,7 @@
EXPECT_TRUE(queue.AddObject(FullSequence{0, 0}, "a"));
EXPECT_TRUE(queue.AddObject(FullSequence{0, 1}, "b"));
EXPECT_TRUE(queue.AddObject(FullSequence{0, 2}, "c"));
- queue.CallSubscribeForPast(SubscribeWindow(0, 1));
+ queue.GetObjectsFromPast(SubscribeWindow(FullSequence(0, 1)));
}
TEST(MoqtLiveRelayQueue, TwoGroups) {
@@ -198,7 +198,7 @@
EXPECT_TRUE(queue.AddObject(FullSequence{1, 0}, "d"));
EXPECT_TRUE(queue.AddObject(FullSequence{1, 1}, "e"));
EXPECT_TRUE(queue.AddObject(FullSequence{1, 2}, "f"));
- queue.CallSubscribeForPast(SubscribeWindow(0, 1));
+ queue.GetObjectsFromPast(SubscribeWindow(FullSequence(0, 1)));
}
TEST(MoqtLiveRelayQueue, FiveGroups) {
@@ -292,7 +292,7 @@
queue.AddObject(FullSequence{3, 2}, MoqtObjectStatus::kEndOfGroup));
EXPECT_TRUE(queue.AddObject(FullSequence{4, 0}, "i"));
EXPECT_TRUE(queue.AddObject(FullSequence{4, 1}, "j"));
- queue.CallSubscribeForPast(SubscribeWindow(0, 0));
+ queue.GetObjectsFromPast(SubscribeWindow());
}
TEST(MoqtLiveRelayQueue, FiveGroupsPastSubscribeFromMidGroup) {
@@ -447,7 +447,7 @@
EXPECT_TRUE(queue.AddObject(FullSequence{0, 2, 7}, "f"));
EXPECT_TRUE(queue.AddFin(FullSequence{0, 1, 5}));
EXPECT_TRUE(queue.AddFin(FullSequence{0, 2, 7}));
- queue.CallSubscribeForPast(SubscribeWindow(0, 0));
+ queue.GetObjectsFromPast(SubscribeWindow());
}
TEST(MoqtLiveRelayQueue, EndOfSubgroup) {
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index 40045c6..f2e9f7e 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -30,22 +30,19 @@
}
MoqtFilterType GetFilterType(const MoqtSubscribe& message) {
- if (message.start_object.has_value()) {
- if (message.start_group.has_value()) {
- if (message.end_group.has_value()) {
- return (*message.end_group >= *message.start_group)
- ? MoqtFilterType::kAbsoluteRange
- : MoqtFilterType::kNone;
+ if (message.start.has_value()) {
+ if (message.end_group.has_value()) {
+ if (*message.end_group < message.start->group) {
+ return MoqtFilterType::kNone;
}
- return MoqtFilterType::kAbsoluteStart;
+ return MoqtFilterType::kAbsoluteRange;
}
- return *message.start_object == 0 ? MoqtFilterType::kLatestGroup
- : MoqtFilterType::kNone;
+ return MoqtFilterType::kAbsoluteStart;
}
- if (!message.start_group.has_value() && !message.end_group.has_value()) {
- return MoqtFilterType::kLatestObject;
+ if (message.end_group.has_value()) {
+ return MoqtFilterType::kNone; // End group without start is invalid.
}
- return MoqtFilterType::kNone;
+ return MoqtFilterType::kLatestObject;
}
std::string MoqtMessageTypeToString(const MoqtMessageType message_type) {
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 070e415..084b96f 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -355,7 +355,6 @@
enum class QUICHE_EXPORT MoqtFilterType : uint64_t {
kNone = 0x0,
- kLatestGroup = 0x1,
kLatestObject = 0x2,
kAbsoluteStart = 0x3,
kAbsoluteRange = 0x4,
@@ -388,14 +387,11 @@
std::optional<MoqtDeliveryOrder> group_order;
// The combinations of these that have values indicate the filter type.
- // SG: Start Group; SO: Start Object; EG: End Group;
// (none): KLatestObject
- // SO: kLatestGroup (must be zero)
- // SG, SO: kAbsoluteStart
- // SG, SO, EG: kAbsoluteRange (request whole last group)
+ // start: kAbsoluteStart
+ // start, end_group: kAbsoluteRange (request whole last group)
// All other combinations are invalid.
- std::optional<uint64_t> start_group;
- std::optional<uint64_t> start_object;
+ std::optional<FullSequence> start;
std::optional<uint64_t> end_group;
// If the mode is kNone, the these are std::nullopt.
@@ -446,8 +442,7 @@
struct QUICHE_EXPORT MoqtSubscribeUpdate {
uint64_t subscribe_id;
- uint64_t start_group;
- uint64_t start_object;
+ FullSequence start;
std::optional<uint64_t> end_group;
MoqtPriority subscriber_priority;
MoqtSubscribeParameters parameters;
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc
index 9246a70..2c05a70 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -89,7 +89,7 @@
std::vector<FullSequence> MoqtOutgoingQueue::GetCachedObjectsInRange(
FullSequence start, FullSequence end) const {
std::vector<FullSequence> sequences;
- SubscribeWindow window(start, end);
+ SubscribeWindow window(start, end.group, end.object);
for (const Group& group : queue_) {
for (const CachedObject& object : group) {
if (window.InWindow(object.sequence)) {
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
index 81a66dd..92d0d2e 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -59,7 +59,7 @@
}
}
- void CallSubscribeForPast(const SubscribeWindow& window) {
+ void GetObjectsFromPast(const SubscribeWindow& window) {
std::vector<FullSequence> objects =
GetCachedObjectsInRange(FullSequence(0, 0), GetLargestSequence());
for (FullSequence object : objects) {
@@ -146,7 +146,7 @@
queue.AddObject(MemSliceFromString("a"), true);
queue.AddObject(MemSliceFromString("b"), false);
queue.AddObject(MemSliceFromString("c"), false);
- queue.CallSubscribeForPast(SubscribeWindow(0, 0));
+ queue.GetObjectsFromPast(SubscribeWindow(FullSequence(0, 0)));
}
TEST(MoqtOutgoingQueue, SingleGroupPastSubscribeFromMidGroup) {
@@ -163,7 +163,7 @@
queue.AddObject(MemSliceFromString("a"), true);
queue.AddObject(MemSliceFromString("b"), false);
queue.AddObject(MemSliceFromString("c"), false);
- queue.CallSubscribeForPast(SubscribeWindow(0, 1));
+ queue.GetObjectsFromPast(SubscribeWindow(FullSequence(0, 1)));
}
TEST(MoqtOutgoingQueue, TwoGroups) {
@@ -211,7 +211,7 @@
queue.AddObject(MemSliceFromString("d"), true);
queue.AddObject(MemSliceFromString("e"), false);
queue.AddObject(MemSliceFromString("f"), false);
- queue.CallSubscribeForPast(SubscribeWindow(0, 1));
+ queue.GetObjectsFromPast(SubscribeWindow(FullSequence(0, 1)));
}
TEST(MoqtOutgoingQueue, FiveGroups) {
@@ -284,7 +284,7 @@
queue.AddObject(MemSliceFromString("h"), false);
queue.AddObject(MemSliceFromString("i"), true);
queue.AddObject(MemSliceFromString("j"), false);
- queue.CallSubscribeForPast(SubscribeWindow(0, 0));
+ queue.GetObjectsFromPast(SubscribeWindow(FullSequence(0, 0)));
}
TEST(MoqtOutgoingQueue, Fetch) {
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 6a79a96..71fd8d5 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -379,9 +379,6 @@
}
MoqtFilterType filter_type = static_cast<MoqtFilterType>(filter);
switch (filter_type) {
- case MoqtFilterType::kLatestGroup:
- subscribe_request.start_object = 0;
- break;
case MoqtFilterType::kLatestObject:
break;
case MoqtFilterType::kAbsoluteStart:
@@ -389,8 +386,7 @@
if (!reader.ReadVarInt62(&group) || !reader.ReadVarInt62(&object)) {
return 0;
}
- subscribe_request.start_group = group;
- subscribe_request.start_object = object;
+ subscribe_request.start = FullSequence(group, object);
if (filter_type == MoqtFilterType::kAbsoluteStart) {
break;
}
@@ -398,7 +394,7 @@
return 0;
}
subscribe_request.end_group = group;
- if (subscribe_request.end_group < subscribe_request.start_group) {
+ if (*subscribe_request.end_group < subscribe_request.start->group) {
ParseError("End group is less than start group");
return 0;
}
@@ -491,20 +487,20 @@
size_t MoqtControlParser::ProcessSubscribeUpdate(quic::QuicDataReader& reader) {
MoqtSubscribeUpdate subscribe_update;
- uint64_t end_group;
+ uint64_t start_group, start_object, end_group;
if (!reader.ReadVarInt62(&subscribe_update.subscribe_id) ||
- !reader.ReadVarInt62(&subscribe_update.start_group) ||
- !reader.ReadVarInt62(&subscribe_update.start_object) ||
- !reader.ReadVarInt62(&end_group) ||
+ !reader.ReadVarInt62(&start_group) ||
+ !reader.ReadVarInt62(&start_object) || !reader.ReadVarInt62(&end_group) ||
!reader.ReadUInt8(&subscribe_update.subscriber_priority)) {
return 0;
}
if (!ReadSubscribeParameters(reader, subscribe_update.parameters)) {
return 0;
}
+ subscribe_update.start = FullSequence(start_group, start_object);
if (end_group > 0) {
subscribe_update.end_group = end_group - 1;
- if (subscribe_update.end_group < subscribe_update.start_group) {
+ if (subscribe_update.end_group < start_group) {
ParseError("End group is less than start group");
return 0;
}
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index cbca5fc..bec4458 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -887,27 +887,6 @@
EXPECT_EQ(*visitor_.parsing_error_, "Unknown message type");
}
-TEST_F(MoqtMessageSpecificTest, LatestGroup) {
- MoqtControlParser parser(kRawQuic, visitor_);
- char subscribe[] = {
- 0x03, 0x15, 0x01, 0x02, // id and alias
- 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
- 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x02, // priority = 0x20 descending
- 0x01, // filter_type = kLatestGroup
- 0x01, // 1 parameter
- 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
- };
- parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
- EXPECT_EQ(visitor_.messages_received_, 1);
- ASSERT_TRUE(visitor_.last_message_.has_value());
- MoqtSubscribe message =
- std::get<MoqtSubscribe>(visitor_.last_message_.value());
- EXPECT_FALSE(message.start_group.has_value());
- EXPECT_EQ(message.start_object, 0);
- EXPECT_FALSE(message.end_group.has_value());
-}
-
TEST_F(MoqtMessageSpecificTest, LatestObject) {
MoqtControlParser parser(kRawQuic, visitor_);
char subscribe[] = {
@@ -924,8 +903,7 @@
EXPECT_FALSE(visitor_.parsing_error_.has_value());
MoqtSubscribe message =
std::get<MoqtSubscribe>(visitor_.last_message_.value());
- EXPECT_FALSE(message.start_group.has_value());
- EXPECT_FALSE(message.start_object.has_value());
+ EXPECT_FALSE(message.start.has_value());
EXPECT_FALSE(message.end_group.has_value());
}
@@ -963,8 +941,8 @@
EXPECT_FALSE(visitor_.parsing_error_.has_value());
MoqtSubscribe message =
std::get<MoqtSubscribe>(visitor_.last_message_.value());
- EXPECT_EQ(message.start_group.value(), 4);
- EXPECT_EQ(message.start_object.value(), 1);
+ EXPECT_TRUE(message.start.has_value() && message.start->group == 4);
+ EXPECT_TRUE(message.start.has_value() && message.start->object == 1);
EXPECT_FALSE(message.end_group.has_value());
}
@@ -987,8 +965,8 @@
EXPECT_FALSE(visitor_.parsing_error_.has_value());
MoqtSubscribe message =
std::get<MoqtSubscribe>(visitor_.last_message_.value());
- EXPECT_EQ(message.start_group.value(), 4);
- EXPECT_EQ(message.start_object.value(), 1);
+ EXPECT_TRUE(message.start.has_value() && message.start->group == 4);
+ EXPECT_TRUE(message.start.has_value() && message.start->object == 1);
EXPECT_EQ(message.end_group.value(), 7);
}
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 9acedb2..48dd8f0 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -69,9 +69,8 @@
}
SubscribeWindow SubscribeMessageToWindow(const MoqtSubscribe& subscribe) {
- return SubscribeWindow(subscribe.start_group.value_or(0),
- subscribe.start_object.value_or(0),
- subscribe.end_group.value_or(UINT64_MAX), UINT64_MAX);
+ return SubscribeWindow(subscribe.start.value_or(FullSequence(0, 0)),
+ subscribe.end_group);
}
class DefaultPublisher : public MoqtPublisher {
@@ -329,8 +328,7 @@
message.full_track_name = name;
message.subscriber_priority = kDefaultSubscriberPriority;
message.group_order = std::nullopt;
- message.start_group = start_group;
- message.start_object = start_object;
+ message.start = FullSequence(start_group, start_object);
message.end_group = std::nullopt;
message.parameters = std::move(parameters);
return Subscribe(message, visitor);
@@ -349,8 +347,7 @@
message.full_track_name = name;
message.subscriber_priority = kDefaultSubscriberPriority;
message.group_order = std::nullopt;
- message.start_group = start_group;
- message.start_object = start_object;
+ message.start = FullSequence(start_group, start_object);
message.end_group = end_group;
message.parameters = std::move(parameters);
return Subscribe(message, visitor);
@@ -363,23 +360,7 @@
message.full_track_name = name;
message.subscriber_priority = kDefaultSubscriberPriority;
message.group_order = std::nullopt;
- message.start_group = std::nullopt;
- message.start_object = std::nullopt;
- message.end_group = std::nullopt;
- message.parameters = std::move(parameters);
- return Subscribe(message, visitor);
-}
-
-bool MoqtSession::SubscribeCurrentGroup(const FullTrackName& name,
- SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters) {
- MoqtSubscribe message;
- message.full_track_name = name;
- message.subscriber_priority = kDefaultSubscriberPriority;
- message.group_order = std::nullopt;
- // First object of current group.
- message.start_group = std::nullopt;
- message.start_object = 0;
+ message.start = std::nullopt;
message.end_group = std::nullopt;
message.parameters = std::move(parameters);
return Subscribe(message, visitor);
@@ -476,8 +457,7 @@
subscribe.subscriber_priority = priority;
subscribe.group_order = delivery_order;
// Must be "Current Object" filter.
- subscribe.start_group = std::nullopt;
- subscribe.start_object = std::nullopt;
+ subscribe.start = std::nullopt;
subscribe.end_group = std::nullopt;
subscribe.parameters = parameters;
if (!Subscribe(subscribe, visitor, std::nullopt)) {
@@ -1032,8 +1012,9 @@
SubscribeRemoteTrack* subscribe = static_cast<SubscribeRemoteTrack*>(track);
subscribe->OnObjectOrOk();
// TODO(martinduke): Handle expires field.
- // TODO(martinduke): Resize the window based on largest_id.
- // TODO(martinduke): Handle delivery_timeout parameter.
+ if (message.largest_id.has_value()) {
+ subscribe->TruncateStart(message.largest_id->next());
+ }
if (subscribe->visitor() != nullptr) {
subscribe->visitor()->OnReply(track->full_track_name(), message.largest_id,
std::nullopt);
@@ -1116,12 +1097,8 @@
if (it == session_->published_subscriptions_.end()) {
return;
}
- FullSequence start(message.start_group, message.start_object);
- std::optional<FullSequence> end;
- if (message.end_group.has_value()) {
- end = FullSequence(*message.end_group, UINT64_MAX);
- }
- it->second->Update(start, end, message.subscriber_priority);
+ it->second->Update(message.start, message.end_group,
+ message.subscriber_priority);
if (message.parameters.delivery_timeout.has_value()) {
it->second->set_delivery_timeout(*message.parameters.delivery_timeout);
}
@@ -1753,9 +1730,12 @@
}
void MoqtSession::PublishedSubscription::Update(
- FullSequence start, std::optional<FullSequence> end,
+ FullSequence start, std::optional<uint64_t> end_group,
MoqtPriority subscriber_priority) {
- window_.UpdateStartEnd(start, end);
+ window_.TruncateStart(start);
+ if (end_group.has_value()) {
+ window_.TruncateEnd(*end_group);
+ }
subscriber_priority_ = subscriber_priority;
// TODO: update priority of all data streams that are currently open.
@@ -1785,7 +1765,7 @@
ControlStream* stream = session_->GetControlStream();
if (PublisherHasData(*track_publisher_)) {
largest_id = track_publisher_->GetLargestSequence();
- if (window_.end().has_value() && *window_.end() < *largest_id) {
+ if (window_.end() < *largest_id) {
stream->SendSubscribeError(subscription_id_,
SubscribeErrorCode::kInvalidRange,
"SUBSCRIBE ends in past group", track_alias_);
@@ -1793,11 +1773,9 @@
// No class access below this line!
return;
}
- if (filter_type_ == MoqtFilterType::kLatestGroup) {
- window_.UpdateStartEnd(FullSequence{largest_id->group, 0}, window_.end());
- } else {
- window_.UpdateStartEnd(largest_id->next(), window_.end());
- }
+ if (!window_.TruncateStart(largest_id->next())) {
+ QUICHE_NOTREACHED();
+ };
}
MoqtSubscribeOk subscribe_ok;
@@ -1808,9 +1786,6 @@
// publisher.
stream->SendOrBufferMessage(
session_->framer_.SerializeSubscribeOk(subscribe_ok));
- if (largest_id.has_value()) {
- Backfill();
- }
}
void MoqtSession::PublishedSubscription::OnSubscribeRejected(
@@ -1950,28 +1925,6 @@
});
}
-void MoqtSession::PublishedSubscription::Backfill() {
- const FullSequence start = window_.start();
- const FullSequence end = track_publisher_->GetLargestSequence();
- const MoqtForwardingPreference preference =
- track_publisher_->GetForwardingPreference();
-
- absl::flat_hash_set<ReducedSequenceIndex> already_opened;
- std::vector<FullSequence> objects =
- track_publisher_->GetCachedObjectsInRange(start, end);
- QUICHE_DCHECK(absl::c_is_sorted(objects));
- for (FullSequence sequence : objects) {
- auto [it, was_missing] =
- already_opened.insert(ReducedSequenceIndex(sequence, preference));
- if (!was_missing) {
- // For every stream mapping unit present, we only need to notify of the
- // earliest object on it, since the stream itself will pull the rest.
- continue;
- }
- OnNewObjectAvailable(sequence);
- }
-}
-
std::vector<webtransport::StreamId>
MoqtSession::PublishedSubscription::GetAllStreams() const {
if (!lazily_initialized_stream_map_.has_value()) {
@@ -2171,7 +2124,7 @@
<< "Datagram Track requesting SendObjects";
return;
}
- next_object_.object = object->sequence.object + 1;
+ next_object_ = object->sequence.next();
if (session_->WriteObjectToStream(
stream_, subscription.track_alias(), *object,
MoqtDataStreamType::kStreamHeaderSubgroup, !stream_header_written_,
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 0d0bdfd..465d293 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -125,9 +125,6 @@
bool SubscribeCurrentObject(const FullTrackName& name,
SubscribeRemoteTrack::Visitor* visitor,
MoqtSubscribeParameters parameters) override;
- bool SubscribeCurrentGroup(const FullTrackName& name,
- SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters) override;
// Returns false if the subscription is not found. The session immediately
// destroys all subscription state.
void Unsubscribe(const FullTrackName& name);
@@ -371,14 +368,8 @@
message.group_id, message.object_id, message.delta_from_deadline);
}
- // Creates streams for all objects that are currently in the track's object
- // cache and match the subscription window. This is in some sense similar
- // to a fetch (since all of the objects are in the past), but is
- // conceptually simpler, as backpressure is less of a concern.
- void Backfill();
-
// Updates the window and other properties of the subscription in question.
- void Update(FullSequence start, std::optional<FullSequence> end,
+ void Update(FullSequence start, std::optional<uint64_t> end,
MoqtPriority subscriber_priority);
// Checks if the specified sequence is within the window of this
// subscription.
diff --git a/quiche/quic/moqt/moqt_session_interface.h b/quiche/quic/moqt/moqt_session_interface.h
index a814bae..08080f1 100644
--- a/quiche/quic/moqt/moqt_session_interface.h
+++ b/quiche/quic/moqt/moqt_session_interface.h
@@ -61,10 +61,6 @@
virtual bool SubscribeCurrentObject(const FullTrackName& name,
SubscribeRemoteTrack::Visitor* visitor,
MoqtSubscribeParameters parameters) = 0;
- // TODO(vasilvv): remove.
- [[deprecated]] virtual bool SubscribeCurrentGroup(
- const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters) = 0;
// Sends an UNSUBSCRIBE message and removes all of the state related to the
// subscription. Returns false if the subscription is not found.
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 8242ee1..294267a 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -64,8 +64,7 @@
kDefaultTrackName(),
/*subscriber_priority=*/0x80,
/*group_order=*/std::nullopt,
- /*start_group=*/0,
- /*start_object=*/0,
+ /*start=*/FullSequence(0, 0),
/*end_group=*/std::nullopt,
/*parameters=*/MoqtSubscribeParameters(),
};
@@ -534,7 +533,7 @@
ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
request.subscribe_id = 2;
- request.start_group = 12;
+ request.start = FullSequence(12, 0);
EXPECT_CALL(mock_session_,
CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
"Duplicate subscribe for track"))
@@ -558,7 +557,7 @@
// Subscribe again, succeeds.
request.subscribe_id = 2;
- request.start_group = 12;
+ request.start = FullSequence(12, 0);
ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
}
@@ -604,20 +603,20 @@
.WillRepeatedly(Return(&mock_stream_));
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
- EXPECT_TRUE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor,
- MoqtSubscribeParameters()));
+ EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
EXPECT_CALL(
mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribesBlocked), _))
.Times(1);
- EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo2", "bar2"),
- &remote_track_visitor,
- MoqtSubscribeParameters()));
+ EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo2", "bar2"),
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
// Second time does not send SUBSCRIBES_BLOCKED.
- EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo2", "bar2"),
- &remote_track_visitor,
- MoqtSubscribeParameters()));
+ EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo2", "bar2"),
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
}
TEST_F(MoqtSessionTest, SubscribeDuplicateTrackName) {
@@ -628,12 +627,12 @@
.WillRepeatedly(Return(&mock_stream_));
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
- EXPECT_TRUE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor,
- MoqtSubscribeParameters()));
- EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
+ EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
&remote_track_visitor,
MoqtSubscribeParameters()));
+ EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
}
TEST_F(MoqtSessionTest, SubscribeWithOk) {
@@ -643,9 +642,9 @@
EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream_));
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
- session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor,
- MoqtSubscribeParameters());
+ session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
+ &remote_track_visitor,
+ MoqtSubscribeParameters());
MoqtSubscribeOk ok = {
/*subscribe_id=*/0,
@@ -672,9 +671,9 @@
EXPECT_CALL(
mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribesBlocked), _));
- EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor,
- MoqtSubscribeParameters()));
+ EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
MoqtMaxSubscribeId max_subscribe_id = {
/*max_subscribe_id=*/kDefaultInitialMaxSubscribeId + 1,
};
@@ -682,9 +681,9 @@
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
- EXPECT_TRUE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor,
- MoqtSubscribeParameters()));
+ EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
}
TEST_F(MoqtSessionTest, LowerMaxSubscribeIdIsAnError) {
@@ -722,9 +721,9 @@
EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream_));
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
- session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor,
- MoqtSubscribeParameters());
+ session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
+ &remote_track_visitor,
+ MoqtSubscribeParameters());
MoqtSubscribeError error = {
/*subscribe_id=*/0,
@@ -1768,7 +1767,7 @@
MockSubscribeRemoteTrackVisitor visitor_;
std::string payload = "deadbeef";
MoqtSubscribe subscribe = DefaultSubscribe();
- subscribe.start_group = 1;
+ subscribe.start = FullSequence(1, 0);
MoqtSessionPeer::CreateRemoteTrack(&session_, subscribe, &visitor_);
MoqtObject object = {
/*track_alias=*/2,
@@ -1791,7 +1790,7 @@
MockSubscribeRemoteTrackVisitor visitor_;
std::string payload = "deadbeef";
MoqtSubscribe subscribe = DefaultSubscribe();
- subscribe.start_group = 1;
+ subscribe.start = FullSequence(1, 0);
MoqtSessionPeer::CreateRemoteTrack(&session_, subscribe, &visitor_);
char datagram[] = {0x01, 0x02, 0x00, 0x00, 0x80, 0x00, 0x08, 0x64,
0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66};
@@ -2261,8 +2260,7 @@
TEST_F(MoqtSessionTest, IncomingJoiningFetch) {
MoqtSubscribe subscribe = DefaultSubscribe();
// Give it the latest object filter.
- subscribe.start_group = std::nullopt;
- subscribe.start_object = std::nullopt;
+ subscribe.start = std::nullopt;
subscribe.end_group = std::nullopt;
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
@@ -2333,8 +2331,7 @@
/*full_track_name=*/FullTrackName("foo", "bar"),
/*subscriber_priority=*/0x80,
/*group_order=*/MoqtDeliveryOrder::kAscending,
- /*start_group=*/std::nullopt,
- /*start_object=*/std::nullopt,
+ /*start=*/std::nullopt,
/*end_group=*/std::nullopt,
};
MoqtFetch expected_fetch = {
@@ -2962,9 +2959,9 @@
// New requests not allowed.
EXPECT_CALL(mock_stream_, Writev).Times(0);
MockSubscribeRemoteTrackVisitor remote_track_visitor;
- EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor,
- MoqtSubscribeParameters()));
+ EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
EXPECT_FALSE(session_.SubscribeAnnounces(
FullTrackName{"foo"}, +[](FullTrackName /*track_namespace*/,
std::optional<SubscribeErrorCode> /*error*/,
@@ -3019,9 +3016,9 @@
// Block all outgoing SUBSCRIBE, ANNOUNCE, GOAWAY,etc.
EXPECT_CALL(mock_stream_, Writev).Times(0);
MockSubscribeRemoteTrackVisitor remote_track_visitor;
- EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
- &remote_track_visitor,
- MoqtSubscribeParameters()));
+ EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"),
+ &remote_track_visitor,
+ MoqtSubscribeParameters()));
EXPECT_FALSE(session_.SubscribeAnnounces(
FullTrackName{"foo"}, +[](FullTrackName /*track_namespace*/,
std::optional<SubscribeErrorCode> /*error*/,
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.cc b/quiche/quic/moqt/moqt_subscribe_windows.cc
index 83a776c..36d8518 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows.cc
@@ -15,13 +15,6 @@
namespace moqt {
-bool SubscribeWindow::InWindow(const FullSequence& seq) const {
- if (seq < start_) {
- return false;
- }
- return (!end_.has_value() || seq <= *end_);
-}
-
ReducedSequenceIndex::ReducedSequenceIndex(
FullSequence sequence, MoqtForwardingPreference preference) {
switch (preference) {
@@ -77,17 +70,27 @@
group_it->second.erase(subgroup_it);
}
-bool SubscribeWindow::UpdateStartEnd(FullSequence start,
- std::optional<FullSequence> end) {
- // Can't make the subscription window bigger.
- if (!InWindow(start)) {
- return false;
- }
- if (end_.has_value() && (!end.has_value() || *end_ < *end)) {
+bool SubscribeWindow::TruncateStart(FullSequence start) {
+ if (start < start_) {
return false;
}
start_ = start;
- end_ = end;
+ return true;
+}
+
+bool SubscribeWindow::TruncateEnd(uint64_t end_group) {
+ if (end_group > end_.group) {
+ return false;
+ }
+ end_ = FullSequence(end_group, UINT64_MAX);
+ return true;
+}
+
+bool SubscribeWindow::TruncateEnd(FullSequence largest_id) {
+ if (largest_id > end_) {
+ return false;
+ }
+ end_ = largest_id;
return true;
}
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h
index d6f8ea4..5ad8910 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.h
+++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -20,33 +20,39 @@
// can be valid.
class QUICHE_EXPORT SubscribeWindow {
public:
- // Creates a half-open window. |next_object| is the expected sequence number
- // of the next published object on the track.
- SubscribeWindow(uint64_t start_group, uint64_t start_object)
- : SubscribeWindow(FullSequence(start_group, start_object), std::nullopt) {
+ // Creates a half-open window for SUBSCRIBES.
+ SubscribeWindow() = default;
+ SubscribeWindow(FullSequence start) : start_(start) {}
+
+ // Creates a closed window for SUBSCRIBE or FETCH with no end object;
+ SubscribeWindow(FullSequence start, std::optional<uint64_t> end_group)
+ : start_(start),
+ end_(FullSequence(end_group.value_or(UINT64_MAX), UINT64_MAX)) {}
+ // For FETCH with end object
+ SubscribeWindow(FullSequence start, uint64_t end_group,
+ std::optional<uint64_t> end_object)
+ : start_(start),
+ end_(FullSequence(end_group, end_object.value_or(UINT64_MAX))) {}
+
+ bool InWindow(const FullSequence& seq) const {
+ return start_ <= seq && seq <= end_;
}
-
- // Creates a closed window.
- SubscribeWindow(uint64_t start_group, uint64_t start_object,
- uint64_t end_group, uint64_t end_object)
- : SubscribeWindow(FullSequence(start_group, start_object),
- FullSequence(end_group, end_object)) {}
-
- SubscribeWindow(FullSequence start, std::optional<FullSequence> end)
- : start_(start), end_(end) {}
-
- bool InWindow(const FullSequence& seq) const;
- const std::optional<FullSequence>& end() const { return end_; }
FullSequence start() const { return start_; }
+ FullSequence end() const { return end_; }
// Updates the subscription window. Returns true if the update is valid (in
// MoQT, subscription windows are only allowed to shrink, not to expand).
- bool UpdateStartEnd(FullSequence start, std::optional<FullSequence> end);
+ // Called only as a result of SUBSCRIBE_OK (largest_id) or SUBSCRIBE_UPDATE.
+ bool TruncateStart(FullSequence start);
+ // Called only as a result of SUBSCRIBE_UPDATE.
+ bool TruncateEnd(uint64_t end_group);
+ // Called only as a result of FETCH_OK (largest_id)
+ bool TruncateEnd(FullSequence largest_id);
private:
// The subgroups in these sequences have no meaning.
- FullSequence start_;
- std::optional<FullSequence> end_;
+ FullSequence start_ = FullSequence();
+ FullSequence end_ = FullSequence(UINT64_MAX, UINT64_MAX);
};
// ReducedSequenceIndex represents an index object such that if two sequence
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
index 7b6962b..d9b4160 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows_test.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
@@ -22,14 +22,13 @@
const uint64_t subscribe_id_ = 2;
const FullSequence start_{4, 0};
- const FullSequence end_{5, 5};
+ const uint64_t end_ = 5;
};
TEST_F(SubscribeWindowTest, Queries) {
SubscribeWindow window(start_, end_);
EXPECT_TRUE(window.InWindow(FullSequence(4, 0)));
- EXPECT_TRUE(window.InWindow(FullSequence(5, 5)));
- EXPECT_FALSE(window.InWindow(FullSequence(5, 6)));
+ EXPECT_TRUE(window.InWindow(FullSequence(5, UINT64_MAX)));
EXPECT_FALSE(window.InWindow(FullSequence(6, 0)));
EXPECT_FALSE(window.InWindow(FullSequence(3, 12)));
}
@@ -49,20 +48,17 @@
TEST_F(SubscribeWindowTest, UpdateStartEnd) {
SubscribeWindow window(start_, end_);
- EXPECT_TRUE(window.UpdateStartEnd(start_.next(),
- FullSequence(end_.group, end_.object - 1)));
- EXPECT_FALSE(window.InWindow(FullSequence(start_.group, start_.object)));
- EXPECT_FALSE(window.InWindow(FullSequence(end_.group, end_.object)));
- EXPECT_FALSE(
- window.UpdateStartEnd(start_, FullSequence(end_.group, end_.object - 1)));
- EXPECT_FALSE(window.UpdateStartEnd(start_.next(), end_));
-}
-
-TEST_F(SubscribeWindowTest, UpdateStartEndOpenEnded) {
- SubscribeWindow window(start_, std::nullopt);
- EXPECT_TRUE(window.UpdateStartEnd(start_, end_));
- EXPECT_FALSE(window.InWindow(end_.next()));
- EXPECT_FALSE(window.UpdateStartEnd(start_, std::nullopt));
+ EXPECT_TRUE(window.TruncateStart(start_.next()));
+ EXPECT_TRUE(window.TruncateEnd(end_ - 1));
+ EXPECT_FALSE(window.InWindow(start_));
+ EXPECT_FALSE(window.InWindow(FullSequence(end_, 0)));
+ // Widens start_ again.
+ EXPECT_FALSE(window.TruncateStart(start_));
+ // Widens end_ again.
+ EXPECT_FALSE(window.TruncateEnd(end_));
+ EXPECT_TRUE(window.TruncateEnd(FullSequence(end_ - 1, 10)));
+ EXPECT_TRUE(window.InWindow(FullSequence(end_ - 1, 10)));
+ EXPECT_FALSE(window.InWindow(FullSequence(end_ - 1, 11)));
}
} // namespace test
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc
index 4e131a4..516ac4a 100644
--- a/quiche/quic/moqt/moqt_track.cc
+++ b/quiche/quic/moqt/moqt_track.cc
@@ -125,6 +125,7 @@
auto task = std::make_unique<UpstreamFetchTask>(largest_id, status,
std::move(callback));
task_ = task->weak_ptr();
+ window().TruncateEnd(largest_id);
std::move(ok_callback_)(std::move(task));
if (can_read_callback_) {
task_.GetIfAvailable()->set_can_read_callback(
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index 7656e3e..9905b18 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -69,12 +69,13 @@
return window_.InWindow(sequence);
}
- void ChangeWindow(SubscribeWindow& window) { window_ = window; }
-
quiche::QuicheWeakPtr<RemoteTrack> weak_ptr() {
return weak_ptr_factory_.Create();
}
+ protected:
+ SubscribeWindow& window() { return window_; };
+
private:
const FullTrackName full_track_name_;
const uint64_t subscribe_id_;
@@ -118,10 +119,8 @@
const MoqtSubscribe& subscribe, Visitor* visitor,
quic::QuicTimeDelta delivery_timeout = quic::QuicTimeDelta::Infinite())
: RemoteTrack(subscribe.full_track_name, subscribe.subscribe_id,
- SubscribeWindow(subscribe.start_group.value_or(0),
- subscribe.start_object.value_or(0),
- subscribe.end_group.value_or(UINT64_MAX),
- UINT64_MAX)),
+ SubscribeWindow(subscribe.start.value_or(FullSequence()),
+ subscribe.end_group)),
track_alias_(subscribe.track_alias),
visitor_(visitor),
delivery_timeout_(delivery_timeout),
@@ -153,6 +152,14 @@
return (is_datagram_ == is_datagram);
}
}
+ // Called on SUBSCRIBE_OK or SUBSCRIBE_UPDATE.
+ bool TruncateStart(FullSequence start) {
+ return window().TruncateStart(start);
+ }
+ // Called on SUBSCRIBE_UPDATE.
+ bool TruncateEnd(uint64_t end_group) {
+ return window().TruncateEnd(end_group);
+ }
void OnStreamOpened();
void OnStreamClosed();
void OnSubscribeDone(uint64_t stream_count, const quic::QuicClock* clock,
@@ -215,14 +222,11 @@
class UpstreamFetch : public RemoteTrack {
public:
UpstreamFetch(const MoqtFetch& fetch, FetchResponseCallback callback)
- : RemoteTrack(
- fetch.full_track_name, fetch.fetch_id,
- fetch.joining_fetch.has_value()
- ? SubscribeWindow(0, 0)
- : SubscribeWindow(
- fetch.start_object,
- FullSequence(fetch.end_group,
- fetch.end_object.value_or(UINT64_MAX)))),
+ : RemoteTrack(fetch.full_track_name, fetch.fetch_id,
+ fetch.joining_fetch.has_value()
+ ? SubscribeWindow(FullSequence(0, 0))
+ : SubscribeWindow(fetch.start_object, fetch.end_group,
+ fetch.end_object)),
ok_callback_(std::move(callback)) {
// Immediately set the data stream type.
CheckDataStreamType(MoqtDataStreamType::kStreamHeaderFetch);
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 407f1e6..1e5c053 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -54,8 +54,7 @@
/*full_track_name=*/FullTrackName("foo", "bar"),
/*subscriber_priority=*/128,
/*group_order=*/std::nullopt,
- /*ranges=*/2,
- 0,
+ /*start=*/FullSequence(2, 0),
std::nullopt,
MoqtSubscribeParameters(),
};
@@ -86,9 +85,10 @@
TEST_F(SubscribeRemoteTrackTest, Windows) {
EXPECT_TRUE(track_.InWindow(FullSequence(2, 0)));
- SubscribeWindow new_window(2, 1);
- track_.ChangeWindow(new_window);
+ track_.TruncateStart(FullSequence(2, 1));
EXPECT_FALSE(track_.InWindow(FullSequence(2, 0)));
+ track_.TruncateEnd(2);
+ EXPECT_FALSE(track_.InWindow(FullSequence(3, 0)));
}
class UpstreamFetchTest : public quic::test::QuicTest {
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.cc b/quiche/quic/moqt/test_tools/mock_moqt_session.cc
index cfd5b47..a9db174 100644
--- a/quiche/quic/moqt/test_tools/mock_moqt_session.cc
+++ b/quiche/quic/moqt/test_tools/mock_moqt_session.cc
@@ -128,24 +128,26 @@
.WillByDefault([this](const FullTrackName& name,
SubscribeRemoteTrack::Visitor* visitor,
MoqtSubscribeParameters) {
- return Subscribe(name, visitor, SubscribeWindow(0, 0));
+ return Subscribe(name, visitor, SubscribeWindow());
});
ON_CALL(*this, SubscribeAbsolute(_, _, _, _, _))
.WillByDefault([this](const FullTrackName& name, uint64_t start_group,
uint64_t start_object,
SubscribeRemoteTrack::Visitor* visitor,
MoqtSubscribeParameters) {
- return Subscribe(name, visitor,
- SubscribeWindow(start_group, start_object));
+ return Subscribe(
+ name, visitor,
+ SubscribeWindow(FullSequence(start_group, start_object)));
});
ON_CALL(*this, SubscribeAbsolute(_, _, _, _, _, _))
.WillByDefault([this](const FullTrackName& name, uint64_t start_group,
uint64_t start_object, uint64_t end_group,
SubscribeRemoteTrack::Visitor* visitor,
MoqtSubscribeParameters) {
- return Subscribe(name, visitor,
- SubscribeWindow(start_group, start_object, end_group,
- UINT64_MAX));
+ return Subscribe(
+ name, visitor,
+ SubscribeWindow(FullSequence(start_group, start_object),
+ end_group));
});
ON_CALL(*this, Unsubscribe)
.WillByDefault([this](const FullTrackName& name) {
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.h b/quiche/quic/moqt/test_tools/mock_moqt_session.h
index d6eddb8..74b8195 100644
--- a/quiche/quic/moqt/test_tools/mock_moqt_session.h
+++ b/quiche/quic/moqt/test_tools/mock_moqt_session.h
@@ -78,12 +78,6 @@
MoqtSubscribeParameters parameters),
(override));
- [[deprecated]] bool SubscribeCurrentGroup(
- const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor,
- MoqtSubscribeParameters parameters) override {
- QUICHE_LOG(FATAL) << "Don't call this";
- }
-
private:
class LoopbackObjectListener;
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index 4368ed4..afcde1e 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -103,8 +103,7 @@
subscribe.full_track_name = publisher->GetTrackName();
subscribe.track_alias = track_alias;
subscribe.subscribe_id = subscribe_id;
- subscribe.start_group = start_group;
- subscribe.start_object = start_object;
+ subscribe.start = FullSequence(start_group, start_object);
subscribe.subscriber_priority = 0x80;
session->published_subscriptions_.emplace(
subscribe_id, std::make_unique<MoqtSession::PublishedSubscription>(
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index dd0a326..5bea109 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -507,12 +507,8 @@
QUIC_LOG(INFO) << "SUBSCRIBE group order mismatch";
return false;
}
- if (cast.start_group != subscribe_.start_group) {
- QUIC_LOG(INFO) << "SUBSCRIBE start group mismatch";
- return false;
- }
- if (cast.start_object != subscribe_.start_object) {
- QUIC_LOG(INFO) << "SUBSCRIBE start object mismatch";
+ if (cast.start != subscribe_.start) {
+ QUIC_LOG(INFO) << "SUBSCRIBE start mismatch";
return false;
}
if (cast.end_group != subscribe_.end_group) {
@@ -557,8 +553,7 @@
/*full_track_name=*/FullTrackName({"foo", "abcd"}),
/*subscriber_priority=*/0x20,
/*group_order=*/MoqtDeliveryOrder::kDescending,
- /*start_group=*/4,
- /*start_object=*/1,
+ /*start=*/FullSequence(4, 1),
/*end_group=*/std::nullopt,
/*parameters=*/
MoqtSubscribeParameters{
@@ -778,11 +773,7 @@
QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE subscribe ID mismatch";
return false;
}
- if (cast.start_group != subscribe_update_.start_group) {
- QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE start group mismatch";
- return false;
- }
- if (cast.start_object != subscribe_update_.start_object) {
+ if (cast.start != subscribe_update_.start) {
QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE start group mismatch";
return false;
}
@@ -818,8 +809,7 @@
MoqtSubscribeUpdate subscribe_update_ = {
/*subscribe_id=*/2,
- /*start_group=*/3,
- /*start_object=*/1,
+ /*start=*/FullSequence(3, 1),
/*end_group=*/4,
/*subscriber_priority=*/0xaa,
/*parameters=*/
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc
index 4758091..1a4c558 100644
--- a/quiche/quic/moqt/tools/chat_client.cc
+++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -69,7 +69,7 @@
"do not subscribe\n";
return std::nullopt;
}
- if (session_->SubscribeCurrentGroup(
+ if (session_->SubscribeCurrentObject(
*track_name, &remote_track_visitor_,
MoqtSubscribeParameters{std::string(GetUsername(my_track_name_)),
std::nullopt, std::nullopt, std::nullopt})) {
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc
index 8d7a3c6..cae1264 100644
--- a/quiche/quic/moqt/tools/chat_server.cc
+++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -49,8 +49,8 @@
return std::nullopt;
}
std::cout << "Received ANNOUNCE for " << track_namespace.ToString() << "\n";
- session_->SubscribeCurrentGroup(*track_name_, server_->remote_track_visitor(),
- MoqtSubscribeParameters());
+ session_->SubscribeCurrentObject(
+ *track_name_, server_->remote_track_visitor(), MoqtSubscribeParameters());
server_->AddUser(*track_name_);
return std::nullopt;
}
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
index e4d01cd..2bb5616 100644
--- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -154,8 +154,8 @@
for (absl::string_view track : tracks_to_subscribe) {
FullTrackName full_track_name = track_namespace;
full_track_name.AddElement(track);
- session_->SubscribeCurrentGroup(full_track_name, &it->second,
- MoqtSubscribeParameters());
+ session_->JoiningFetch(full_track_name, &it->second, 0,
+ MoqtSubscribeParameters());
}
return std::nullopt;
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
index 988ddee..257c4a5 100644
--- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -461,8 +461,8 @@
if (!parameters_.delivery_timeout.IsInfinite()) {
subscription_parameters.delivery_timeout = parameters_.delivery_timeout;
}
- server_session()->SubscribeCurrentGroup(TrackName(), &receiver_,
- subscription_parameters);
+ server_session()->JoiningFetch(TrackName(), &receiver_, 0,
+ subscription_parameters);
simulator_.RunFor(parameters_.duration);
// At the end, we wait for eight RTTs until the connection settles down.