Make request_id even for client-initiated requests and odd for server-initiated requests.
Implement new MoQT SUBSCRIBE format.
PiperOrigin-RevId: 756496625
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index d2f57fc..21588c9 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -193,6 +193,8 @@
return WireUint8(0xff);
}
+WireUint8 WireBoolean(bool value) { return WireUint8(value ? 0x01 : 0x00); }
+
uint64_t SignedVarintSerializedForm(int64_t value) {
if (value < 0) {
return ((-value) << 1) | 0x01;
@@ -416,11 +418,6 @@
quiche::QuicheBuffer MoqtFramer::SerializeSubscribe(
const MoqtSubscribe& message) {
- MoqtFilterType filter_type = GetFilterType(message);
- if (filter_type == MoqtFilterType::kNone) {
- QUICHE_BUG(MoqtFramer_invalid_subscribe) << "Invalid object range";
- return quiche::QuicheBuffer();
- }
KeyValuePairList parameters;
VersionSpecificParametersToKeyValuePairList(message.parameters, parameters);
if (!ValidateVersionSpecificParameters(parameters,
@@ -429,33 +426,44 @@
<< "Serializing invalid MoQT parameters";
return quiche::QuicheBuffer();
}
- switch (filter_type) {
+ switch (message.filter_type) {
+ case MoqtFilterType::kNextGroupStart:
case MoqtFilterType::kLatestObject:
return SerializeControlMessage(
- MoqtMessageType::kSubscribe, WireVarInt62(message.subscribe_id),
+ MoqtMessageType::kSubscribe, WireVarInt62(message.request_id),
WireVarInt62(message.track_alias),
WireFullTrackName(message.full_track_name, true),
WireUint8(message.subscriber_priority),
- WireDeliveryOrder(message.group_order), WireVarInt62(filter_type),
- WireKeyValuePairList(parameters));
+ WireDeliveryOrder(message.group_order), WireBoolean(message.forward),
+ WireVarInt62(message.filter_type), WireKeyValuePairList(parameters));
case MoqtFilterType::kAbsoluteStart:
+ if (!message.start.has_value()) {
+ return quiche::QuicheBuffer();
+ };
return SerializeControlMessage(
- MoqtMessageType::kSubscribe, WireVarInt62(message.subscribe_id),
+ MoqtMessageType::kSubscribe, WireVarInt62(message.request_id),
WireVarInt62(message.track_alias),
WireFullTrackName(message.full_track_name, true),
WireUint8(message.subscriber_priority),
- WireDeliveryOrder(message.group_order), WireVarInt62(filter_type),
- WireVarInt62(message.start->group),
+ WireDeliveryOrder(message.group_order), WireBoolean(message.forward),
+ WireVarInt62(message.filter_type), WireVarInt62(message.start->group),
WireVarInt62(message.start->object),
WireKeyValuePairList(parameters));
case MoqtFilterType::kAbsoluteRange:
+ if (!message.start.has_value() || !message.end_group.has_value()) {
+ return quiche::QuicheBuffer();
+ }
+ if (*message.end_group < message.start->group) {
+ QUICHE_BUG(MoqtFramer_invalid_end_group) << "Invalid object range";
+ return quiche::QuicheBuffer();
+ }
return SerializeControlMessage(
- MoqtMessageType::kSubscribe, WireVarInt62(message.subscribe_id),
+ MoqtMessageType::kSubscribe, WireVarInt62(message.request_id),
WireVarInt62(message.track_alias),
WireFullTrackName(message.full_track_name, true),
WireUint8(message.subscriber_priority),
- WireDeliveryOrder(message.group_order), WireVarInt62(filter_type),
- WireVarInt62(message.start->group),
+ WireDeliveryOrder(message.group_order), WireBoolean(message.forward),
+ WireVarInt62(message.filter_type), WireVarInt62(message.start->group),
WireVarInt62(message.start->object), WireVarInt62(*message.end_group),
WireKeyValuePairList(parameters));
default:
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index e772bc8..16939e3 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -402,35 +402,24 @@
}
TEST_F(MoqtFramerSimpleTest, AllSubscribeInputs) {
- for (std::optional<Location> start :
- {std::optional<Location>(),
- std::optional<Location>(std::in_place, 4, 0)}) {
- for (std::optional<uint64_t> end_group :
- {std::optional<uint64_t>(), std::optional<uint64_t>(7)}) {
- MoqtSubscribe subscribe = {
- /*subscribe_id=*/3,
- /*track_alias=*/4,
- /*full_track_name=*/FullTrackName({"foo", "abcd"}),
- /*subscriber_priority=*/0x20,
- /*group_order=*/std::nullopt,
- start,
- end_group,
- VersionSpecificParameters(AuthTokenType::kOutOfBand, "bar"),
- };
- quiche::QuicheBuffer buffer;
- MoqtFilterType expected_filter_type = GetFilterType(subscribe);
- if (expected_filter_type == MoqtFilterType::kNone) {
- EXPECT_QUIC_BUG(buffer = framer_.SerializeSubscribe(subscribe),
- "Invalid object range");
- EXPECT_EQ(buffer.size(), 0);
- continue;
- }
- buffer = framer_.SerializeSubscribe(subscribe);
- // Go to the filter type.
- const uint8_t* read = BufferAtOffset(buffer, 17);
- EXPECT_EQ(static_cast<MoqtFilterType>(*read), expected_filter_type);
- EXPECT_GT(buffer.size(), 0);
- }
+ for (auto filter :
+ {MoqtFilterType::kNextGroupStart, MoqtFilterType::kLatestObject,
+ MoqtFilterType::kAbsoluteStart, MoqtFilterType::kAbsoluteRange}) {
+ MoqtSubscribe subscribe = {
+ /*request_id=*/3,
+ /*track_alias=*/4,
+ /*full_track_name=*/FullTrackName({"foo", "abcd"}),
+ /*subscriber_priority=*/0x20,
+ /*group_order=*/std::nullopt,
+ /*forward=*/true,
+ /*filter_type=*/filter,
+ /*start=*/std::make_optional<Location>(4, 1),
+ /*end_group=*/std::make_optional<uint64_t>(6ULL),
+ VersionSpecificParameters(AuthTokenType::kOutOfBand, "bar"),
+ };
+ quiche::QuicheBuffer buffer;
+ buffer = framer_.SerializeSubscribe(subscribe);
+ EXPECT_GT(buffer.size(), 0);
}
}
@@ -441,13 +430,15 @@
/*full_track_name=*/FullTrackName({"foo", "abcd"}),
/*subscriber_priority=*/0x20,
/*group_order=*/std::nullopt,
- /*start=*/Location(4, 3),
- /*end_group=*/3,
+ /*forward=*/true,
+ /*filter_type=*/MoqtFilterType::kAbsoluteRange,
+ /*start=*/std::make_optional<Location>(4, 3),
+ /*end_group=*/std::make_optional<uint64_t>(3ULL),
VersionSpecificParameters(AuthTokenType::kOutOfBand, "bar"),
};
quiche::QuicheBuffer buffer;
- EXPECT_QUIC_BUG(buffer = framer_.SerializeSubscribe(subscribe),
- "Invalid object range");
+ EXPECT_QUICHE_BUG(buffer = framer_.SerializeSubscribe(subscribe),
+ "Invalid object range");
EXPECT_EQ(buffer.size(), 0);
}
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index 904954c..7e6f480 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -111,22 +111,6 @@
}
}
-MoqtFilterType GetFilterType(const MoqtSubscribe& message) {
- if (message.start.has_value()) {
- if (message.end_group.has_value()) {
- if (*message.end_group < message.start->group) {
- return MoqtFilterType::kNone;
- }
- return MoqtFilterType::kAbsoluteRange;
- }
- return MoqtFilterType::kAbsoluteStart;
- }
- if (message.end_group.has_value()) {
- return MoqtFilterType::kNone; // End group without start is invalid.
- }
- return MoqtFilterType::kLatestObject;
-}
-
MoqtError ValidateSetupParameters(const KeyValuePairList& parameters,
bool webtrans,
quic::Perspective perspective) {
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index ac22147..9ba07f8 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -518,33 +518,25 @@
enum class QUICHE_EXPORT MoqtFilterType : uint64_t {
kNone = 0x0,
+ kNextGroupStart = 0x1,
kLatestObject = 0x2,
kAbsoluteStart = 0x3,
kAbsoluteRange = 0x4,
};
struct QUICHE_EXPORT MoqtSubscribe {
- uint64_t subscribe_id;
+ uint64_t request_id;
uint64_t track_alias;
FullTrackName full_track_name;
MoqtPriority subscriber_priority;
std::optional<MoqtDeliveryOrder> group_order;
-
- // The combinations of these that have values indicate the filter type.
- // (none): KLatestObject
- // start: kAbsoluteStart
- // start, end_group: kAbsoluteRange (request whole last group)
- // All other combinations are invalid.
+ bool forward;
+ MoqtFilterType filter_type;
std::optional<Location> start;
std::optional<uint64_t> end_group;
- // If the mode is kNone, the these are std::nullopt.
VersionSpecificParameters parameters;
};
-// Deduce the filter type from the combination of group and object IDs. Returns
-// kNone if the state of the subscribe is invalid.
-MoqtFilterType GetFilterType(const MoqtSubscribe& message);
-
struct QUICHE_EXPORT MoqtSubscribeOk {
uint64_t subscribe_id;
// The message uses ms, but expires is in us.
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 5dd2508..e9a093b 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -402,23 +402,30 @@
size_t MoqtControlParser::ProcessSubscribe(quic::QuicDataReader& reader) {
MoqtSubscribe subscribe;
uint64_t filter, group, object;
- uint8_t group_order;
+ uint8_t group_order, forward;
absl::string_view track_name;
- if (!reader.ReadVarInt62(&subscribe.subscribe_id) ||
+ if (!reader.ReadVarInt62(&subscribe.request_id) ||
!reader.ReadVarInt62(&subscribe.track_alias) ||
!ReadTrackNamespace(reader, subscribe.full_track_name) ||
!reader.ReadStringPieceVarInt62(&track_name) ||
!reader.ReadUInt8(&subscribe.subscriber_priority) ||
- !reader.ReadUInt8(&group_order) || !reader.ReadVarInt62(&filter)) {
+ !reader.ReadUInt8(&group_order) || !reader.ReadUInt8(&forward) ||
+ !reader.ReadVarInt62(&filter)) {
return 0;
}
subscribe.full_track_name.AddElement(track_name);
if (!ParseDeliveryOrder(group_order, subscribe.group_order)) {
- ParseError("Invalid group order value in SUBSCRIBE message");
+ ParseError("Invalid group order value in SUBSCRIBE");
return 0;
}
- MoqtFilterType filter_type = static_cast<MoqtFilterType>(filter);
- switch (filter_type) {
+ if (forward > 1) {
+ ParseError("Invalid forward value in SUBSCRIBE");
+ return 0;
+ }
+ subscribe.forward = (forward == 1);
+ subscribe.filter_type = static_cast<MoqtFilterType>(filter);
+ switch (subscribe.filter_type) {
+ case MoqtFilterType::kNextGroupStart:
case MoqtFilterType::kLatestObject:
break;
case MoqtFilterType::kAbsoluteStart:
@@ -427,7 +434,7 @@
return 0;
}
subscribe.start = Location(group, object);
- if (filter_type == MoqtFilterType::kAbsoluteStart) {
+ if (subscribe.filter_type == MoqtFilterType::kAbsoluteStart) {
break;
}
if (!reader.ReadVarInt62(&group)) {
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index 7e4ba28..4779c89 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -648,8 +648,8 @@
0x20, 0x00, 0x0d, 0x02, 0x01, 0x02, // versions = 1, 2
0x03, // 4 params
0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo"
- 0x02, 0x32, // max_subscribe_id = 50
- 0x02, 0x32, // max_subscribe_id = 50
+ 0x02, 0x32, // max_request_id = 50
+ 0x02, 0x32, // max_request_id = 50
};
stream.Receive(absl::string_view(setup, sizeof(setup)), false);
parser.ReadAndDispatchMessages();
@@ -663,10 +663,10 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kWebTrans, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x1a, 0x01, 0x02,
+ 0x03, 0x00, 0x1b, 0x01, 0x02,
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x02, // priority = 0x20 descending
+ 0x20, 0x02, 0x01, // priority, order, forward
0x02, // filter_type = kLatestObject
0x02, // two params
0x1f, 0x03, 0x62, 0x61, 0x72, // 0x1f = "bar"
@@ -681,10 +681,10 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x16, 0x01, 0x02,
+ 0x03, 0x00, 0x17, 0x01, 0x02,
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x02, // priority = 0x20 descending
+ 0x20, 0x02, 0x01, // priority, order, forward
0x02, // filter_type = kLatestObject
0x02, // two params
0x02, 0x67, 0x10, // delivery_timeout = 10000
@@ -701,10 +701,10 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x16, 0x01, 0x02,
+ 0x03, 0x00, 0x17, 0x01, 0x02,
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x02, // priority = 0x20 descending
+ 0x20, 0x02, 0x01, // priority, order, forward
0x02, // filter_type = kLatestObject
0x02, // two params
0x04, 0x67, 0x10, // max_cache_duration = 10000
@@ -721,10 +721,10 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x14, 0x01, 0x02,
+ 0x03, 0x00, 0x15, 0x01, 0x02,
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x02, // priority = 0x20 descending
+ 0x20, 0x02, 0x01, // priority, order, forward
0x02, // filter_type = kLatestObject
0x01, // one param
0x01, 0x02, 0x00, 0x00, // authorization_token = DELETE 0;
@@ -740,10 +740,10 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x18, 0x01, 0x02, 0x01, 0x03, 0x66, 0x6f,
- 0x6f, // track_namespace = "foo"
+ 0x03, 0x00, 0x19, 0x01, 0x02, 0x01, 0x03, 0x66,
+ 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x02, // priority = 0x20 descending
+ 0x20, 0x02, 0x01, // priority, order, forward
0x02, // filter_type = kLatestObject
0x01, // one param
0x01, 0x06, 0x01, 0x10, 0x00, 0x62, 0x61, 0x72, // REGISTER 0x01
@@ -759,10 +759,10 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x14, 0x01, 0x02,
+ 0x03, 0x00, 0x15, 0x01, 0x02,
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x02, // priority = 0x20 descending
+ 0x20, 0x02, 0x01, // priority, order, forward
0x02, // filter_type = kLatestObject
0x01, // one param
0x01, 0x02, 0x02, 0x07, // authorization_token = USE 7;
@@ -779,10 +779,10 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x14, 0x01, 0x02,
+ 0x03, 0x00, 0x15, 0x01, 0x02,
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x02, // priority = 0x20 descending
+ 0x20, 0x02, 0x01, // priority, order, forward
0x02, // filter_type = kLatestObject
0x01, // one param
0x01, 0x02, 0x04, 0x07, // authorization_token type 4
@@ -799,10 +799,10 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x16, 0x01, 0x02, 0x01,
+ 0x03, 0x00, 0x17, 0x01, 0x02, 0x01,
0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x02, // priority = 0x20 descending
+ 0x20, 0x02, 0x01, // priority, order, forward
0x02, // filter_type = kLatestObject
0x01, // one param
0x01, 0x04, 0x03, 0x01, 0x00, 0x00 // authorization_token type 1
@@ -814,6 +814,81 @@
EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kKeyValueFormattingError);
}
+TEST_F(MoqtMessageSpecificTest, SubscribeInvalidGroupOrder) {
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
+ char subscribe[] = {
+ 0x03, 0x00, 0x1d, 0x01, 0x02, // id and alias
+ 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
+ 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
+ 0x20, // subscriber priority = 0x20
+ 0x03, // group order = invalid
+ 0x01, // forward = true
+ 0x03, // Filter type: Absolute Start
+ 0x04, // start_group = 4
+ 0x01, // start_object = 1
+ // No EndGroup or EndObject
+ 0x02, // 2 parameters
+ 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms
+ 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar"
+ };
+ stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+ parser.ReadAndDispatchMessages();
+ EXPECT_EQ(visitor_.messages_received_, 0);
+ EXPECT_EQ(visitor_.parsing_error_, "Invalid group order value in SUBSCRIBE");
+ EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
+}
+
+TEST_F(MoqtMessageSpecificTest, SubscribeInvalidForward) {
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
+ char subscribe[] = {
+ 0x03, 0x00, 0x1d, 0x01, 0x02, // id and alias
+ 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
+ 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
+ 0x20, // subscriber priority = 0x20
+ 0x02, // group order = descending
+ 0x02, // forward = invalid
+ 0x03, // Filter type: Absolute Start
+ 0x04, // start_group = 4
+ 0x01, // start_object = 1
+ // No EndGroup or EndObject
+ 0x02, // 2 parameters
+ 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms
+ 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar"
+ };
+ stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+ parser.ReadAndDispatchMessages();
+ EXPECT_EQ(visitor_.messages_received_, 0);
+ EXPECT_EQ(visitor_.parsing_error_, "Invalid forward value in SUBSCRIBE");
+ EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
+}
+
+TEST_F(MoqtMessageSpecificTest, SubscribeInvalidFilter) {
+ webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+ MoqtControlParser parser(kRawQuic, &stream, visitor_);
+ char subscribe[] = {
+ 0x03, 0x00, 0x1d, 0x01, 0x02, // id and alias
+ 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
+ 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
+ 0x20, // subscriber priority = 0x20
+ 0x02, // group order = descending
+ 0x01, // forward = true
+ 0x05, // Filter type: invalid (undefined value)
+ 0x04, // start_group = 4
+ 0x01, // start_object = 1
+ // No EndGroup or EndObject
+ 0x02, // 2 parameters
+ 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms
+ 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar"
+ };
+ stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+ parser.ReadAndDispatchMessages();
+ EXPECT_EQ(visitor_.messages_received_, 0);
+ EXPECT_EQ(visitor_.parsing_error_, "Invalid filter type");
+ EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
+}
+
TEST_F(MoqtMessageSpecificTest, SubscribeOkHasAuthorizationToken) {
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kWebTrans, &stream, visitor_);
@@ -978,13 +1053,13 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x15, 0x01, 0x02, // id and alias
+ 0x03, 0x00, 0x18, 0x01, 0x02, // id and alias
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x02, // priority = 0x20, group order descending
+ 0x20, 0x02, 0x01, // priority = 0x20, group order, forward
0x02, // filter_type = kLatestObject
0x01, // 1 parameter
- 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
+ 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar"
};
stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
parser.ReadAndDispatchMessages();
@@ -1000,28 +1075,28 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x15, 0x01, 0x02, // id and alias
+ 0x03, 0x00, 0x18, 0x01, 0x02, // id and alias
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x08, // priority = 0x20 ???
- 0x01, // filter_type = kLatestGroup
+ 0x20, 0x08, 0x01, // priority, invalid order, forward
+ 0x01, // filter_type = kNextGroupStart
0x01, // 1 parameter
- 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar"
+ 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar"
};
stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
parser.ReadAndDispatchMessages();
EXPECT_EQ(visitor_.messages_received_, 0);
- EXPECT_THAT(visitor_.parsing_error_, Optional(HasSubstr("group order")));
+ EXPECT_EQ(visitor_.parsing_error_, "Invalid group order value in SUBSCRIBE");
}
TEST_F(MoqtMessageSpecificTest, AbsoluteStart) {
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x17, 0x01, 0x02, // id and alias
+ 0x03, 0x00, 0x18, 0x01, 0x02, // id and alias
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x02, // priority = 0x20 descending
+ 0x20, 0x02, 0x01, // priority, order, forward
0x03, // filter_type = kAbsoluteStart
0x04, // start_group = 4
0x01, // start_object = 1
@@ -1043,10 +1118,10 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x18, 0x01, 0x02, // id and alias
+ 0x03, 0x00, 0x19, 0x01, 0x02, // id and alias
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x02, // priority = 0x20 descending
+ 0x20, 0x02, 0x01, // priority, order, forward
0x04, // filter_type = kAbsoluteRange
0x04, // start_group = 4
0x01, // start_object = 1
@@ -1069,10 +1144,10 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x18, 0x01, 0x02, // id and alias
+ 0x03, 0x00, 0x19, 0x01, 0x02, // id and alias
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x02, // priority = 0x20 descending
+ 0x20, 0x02, 0x01, // priority, order, forward
0x04, // filter_type = kAbsoluteRange
0x04, // start_group = 4
0x01, // start_object = 1
@@ -1090,10 +1165,10 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x13, 0x01, 0x02, // id and alias
+ 0x03, 0x00, 0x14, 0x01, 0x02, // id and alias
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, 0x02, // priority = 0x20 descending
+ 0x20, 0x02, 0x01, // priority, order, forward
0x04, // filter_type = kAbsoluteRange
0x04, // start_group = 4
0x01, // start_object = 1
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 2d7ab24..5645885 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -71,9 +71,17 @@
return status.ok() && DoesTrackStatusImplyHavingData(*status);
}
-SubscribeWindow SubscribeMessageToWindow(const MoqtSubscribe& subscribe) {
- return SubscribeWindow(subscribe.start.value_or(Location(0, 0)),
- subscribe.end_group);
+std::optional<SubscribeWindow> SubscribeMessageToWindow(
+ const MoqtSubscribe& subscribe) {
+ if (!subscribe.forward ||
+ subscribe.filter_type == MoqtFilterType::kLatestObject ||
+ subscribe.filter_type == MoqtFilterType::kNextGroupStart) {
+ return std::nullopt;
+ }
+ if (!subscribe.start.has_value()) {
+ return std::nullopt;
+ }
+ return SubscribeWindow(*subscribe.start, subscribe.end_group);
}
class DefaultPublisher : public MoqtPublisher {
@@ -111,6 +119,11 @@
}
});
}
+ if (parameters_.perspective == Perspective::IS_SERVER) {
+ next_request_id_ = 1;
+ } else {
+ next_incoming_request_id_ = 1;
+ }
}
MoqtSession::ControlStream* MoqtSession::GetControlStream() {
@@ -327,6 +340,8 @@
message.full_track_name = name;
message.subscriber_priority = kDefaultSubscriberPriority;
message.group_order = std::nullopt;
+ message.forward = true;
+ message.filter_type = MoqtFilterType::kAbsoluteStart;
message.start = Location(start_group, start_object);
message.end_group = std::nullopt;
message.parameters = parameters;
@@ -346,6 +361,8 @@
message.full_track_name = name;
message.subscriber_priority = kDefaultSubscriberPriority;
message.group_order = std::nullopt;
+ message.forward = true;
+ message.filter_type = MoqtFilterType::kAbsoluteRange;
message.start = Location(start_group, start_object);
message.end_group = end_group;
message.parameters = parameters;
@@ -359,6 +376,8 @@
message.full_track_name = name;
message.subscriber_priority = kDefaultSubscriberPriority;
message.group_order = std::nullopt;
+ message.forward = true;
+ message.filter_type = MoqtFilterType::kLatestObject;
message.start = std::nullopt;
message.end_group = std::nullopt;
message.parameters = parameters;
@@ -396,7 +415,8 @@
}
MoqtFetch message;
message.full_track_name = name;
- message.fetch_id = next_request_id_++;
+ message.fetch_id = next_request_id_;
+ next_request_id_ += 2;
message.start_object = start;
message.end_group = end_group;
message.end_object = end_object;
@@ -439,9 +459,9 @@
MoqtPriority priority,
std::optional<MoqtDeliveryOrder> delivery_order,
VersionSpecificParameters parameters) {
- if ((next_request_id_ + 1) >= peer_max_request_id_) {
+ if ((next_request_id_ + 2) >= peer_max_request_id_) {
QUIC_DLOG(INFO) << ENDPOINT << "Tried to send JOINING_FETCH with ID "
- << (next_request_id_ + 1)
+ << (next_request_id_ + 2)
<< " which is greater than the maximum ID "
<< peer_max_request_id_;
return false;
@@ -450,7 +470,8 @@
subscribe.full_track_name = name;
subscribe.subscriber_priority = priority;
subscribe.group_order = delivery_order;
- // Must be "Current Object" filter.
+ subscribe.forward = true;
+ subscribe.filter_type = MoqtFilterType::kLatestObject;
subscribe.start = std::nullopt;
subscribe.end_group = std::nullopt;
subscribe.parameters = parameters;
@@ -458,10 +479,11 @@
return false;
}
MoqtFetch fetch;
- fetch.fetch_id = next_request_id_++;
+ fetch.fetch_id = next_request_id_;
+ next_request_id_ += 2;
fetch.subscriber_priority = priority;
fetch.group_order = delivery_order;
- fetch.joining_fetch = {subscribe.subscribe_id, num_previous_groups};
+ fetch.joining_fetch = {subscribe.request_id, num_previous_groups};
fetch.parameters = parameters;
SendControlMessage(framer_.SerializeFetch(fetch));
QUIC_DLOG(INFO) << ENDPOINT << "Sent Joining FETCH message for " << name;
@@ -614,7 +636,8 @@
QUIC_DLOG(INFO) << ENDPOINT << "Tried to send SUBSCRIBE after GOAWAY";
return false;
}
- message.subscribe_id = next_request_id_++;
+ message.request_id = next_request_id_;
+ next_request_id_ += 2;
if (provided_track_alias.has_value()) {
message.track_alias = *provided_track_alias;
next_remote_track_alias_ =
@@ -626,7 +649,7 @@
// Since we do not expose subscribe IDs directly in the API, instead wrap
// the session and subscribe ID in a callback.
visitor->OnCanAckObjects(absl::bind_front(&MoqtSession::SendObjectAck, this,
- message.subscribe_id));
+ message.request_id));
} else {
QUICHE_DLOG_IF(WARNING, message.parameters.oack_window_size.has_value())
<< "Attempting to set object_ack_window on a connection that does not "
@@ -639,7 +662,7 @@
auto track = std::make_unique<SubscribeRemoteTrack>(message, visitor);
subscribe_by_name_.emplace(message.full_track_name, track.get());
subscribe_by_alias_.emplace(message.track_alias, track.get());
- upstream_by_id_.emplace(message.subscribe_id, std::move(track));
+ upstream_by_id_.emplace(message.request_id, std::move(track));
return true;
}
@@ -776,7 +799,7 @@
}
void MoqtSession::GrantMoreRequests(uint64_t num_requests) {
- local_max_request_id_ += num_requests;
+ local_max_request_id_ += (num_requests * 2);
MoqtMaxRequestId message;
message.max_request_id = local_max_request_id_;
SendControlMessage(framer_.SerializeMaxRequestId(message));
@@ -788,13 +811,13 @@
Error(MoqtError::kTooManyRequests, "Received request with too large ID");
return false;
}
- if (request_id < next_incoming_request_id_) {
+ if (request_id != next_incoming_request_id_) {
QUIC_DLOG(INFO) << ENDPOINT << "Request ID not monotonically increasing";
Error(MoqtError::kInvalidRequestId,
"Request ID not monotonically increasing");
return false;
}
- next_incoming_request_id_ = request_id + 1;
+ next_incoming_request_id_ = request_id + 2;
return true;
}
@@ -902,14 +925,14 @@
void MoqtSession::ControlStream::OnSubscribeMessage(
const MoqtSubscribe& message) {
- if (!session_->ValidateRequestId(message.subscribe_id)) {
+ if (!session_->ValidateRequestId(message.request_id)) {
return;
}
QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE for "
<< message.full_track_name;
if (session_->sent_goaway_) {
QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE after GOAWAY";
- SendSubscribeError(message.subscribe_id, SubscribeErrorCode::kUnauthorized,
+ SendSubscribeError(message.request_id, SubscribeErrorCode::kUnauthorized,
"SUBSCRIBE after GOAWAY", message.track_alias);
return;
}
@@ -925,7 +948,7 @@
QUIC_DLOG(INFO) << ENDPOINT << "SUBSCRIBE for " << track_name
<< " rejected by the application: "
<< track_publisher.status();
- SendSubscribeError(message.subscribe_id, SubscribeErrorCode::kDoesNotExist,
+ SendSubscribeError(message.request_id, SubscribeErrorCode::kDoesNotExist,
track_publisher.status().message(), message.track_alias);
return;
}
@@ -945,7 +968,7 @@
subscription->set_delivery_timeout(message.parameters.delivery_timeout);
MoqtSession::PublishedSubscription* subscription_ptr = subscription.get();
auto [it, success] = session_->published_subscriptions_.emplace(
- message.subscribe_id, std::move(subscription));
+ message.request_id, std::move(subscription));
if (!success) {
QUICHE_NOTREACHED(); // ValidateRequestId() should have caught this.
}
@@ -1692,11 +1715,12 @@
MoqtSession* session, std::shared_ptr<MoqtTrackPublisher> track_publisher,
const MoqtSubscribe& subscribe,
MoqtPublishingMonitorInterface* monitoring_interface)
- : filter_type_(GetFilterType(subscribe)),
- subscription_id_(subscribe.subscribe_id),
- session_(session),
+ : session_(session),
track_publisher_(track_publisher),
+ request_id_(subscribe.request_id),
track_alias_(subscribe.track_alias),
+ filter_type_(subscribe.filter_type),
+ forward_(subscribe.forward),
window_(SubscribeMessageToWindow(subscribe)),
subscriber_priority_(subscribe.subscriber_priority),
subscriber_delivery_order_(subscribe.group_order),
@@ -1731,12 +1755,18 @@
void MoqtSession::PublishedSubscription::Update(
Location start, std::optional<uint64_t> end_group,
MoqtPriority subscriber_priority) {
- window_.TruncateStart(start);
- if (end_group.has_value()) {
- window_.TruncateEnd(*end_group);
- }
subscriber_priority_ = subscriber_priority;
+ if (!window_.has_value()) {
+ window_ = SubscribeWindow(start, end_group);
+ return;
+ }
+ window_->TruncateStart(start);
+ if (end_group.has_value()) {
+ window_->TruncateEnd(*end_group);
+ }
// TODO: update priority of all data streams that are currently open.
+ // TODO: update delivery timeout.
+ // TODO: update forward and subscribe filter.
// TODO: reset streams that are no longer in-window.
// TODO: send SUBSCRIBE_DONE if required.
@@ -1755,7 +1785,7 @@
webtransport::SendOrder old_send_order =
FinalizeSendOrder(queued_outgoing_data_streams_.rbegin()->first);
subscriber_priority_ = priority;
- session_->UpdateQueuedSendOrder(subscription_id_, old_send_order,
+ session_->UpdateQueuedSendOrder(request_id_, old_send_order,
FinalizeSendOrder(old_send_order));
};
@@ -1764,21 +1794,26 @@
ControlStream* stream = session_->GetControlStream();
if (PublisherHasData(*track_publisher_)) {
largest_id = track_publisher_->GetLargestSequence();
- if (window_.end() < *largest_id) {
- stream->SendSubscribeError(subscription_id_,
- SubscribeErrorCode::kInvalidRange,
- "SUBSCRIBE ends in past group", track_alias_);
- session_->published_subscriptions_.erase(subscription_id_);
- // No class access below this line!
- return;
+ QUICHE_CHECK(largest_id.has_value());
+ if (forward_) {
+ switch (filter_type_) {
+ case MoqtFilterType::kLatestObject:
+ window_ = SubscribeWindow(largest_id->next());
+ break;
+ case MoqtFilterType::kNextGroupStart:
+ window_ = SubscribeWindow(Location(largest_id->group + 1, 0));
+ break;
+ default:
+ break;
+ }
}
- if (!window_.TruncateStart(largest_id->next())) {
- QUICHE_NOTREACHED();
- };
+ } else if (filter_type_ == MoqtFilterType::kLatestObject ||
+ filter_type_ == MoqtFilterType::kNextGroupStart) {
+ // No data yet. All objects will be in-window.
+ window_ = SubscribeWindow(Location(0, 0));
}
-
MoqtSubscribeOk subscribe_ok;
- subscribe_ok.subscribe_id = subscription_id_;
+ subscribe_ok.subscribe_id = request_id_;
subscribe_ok.group_order = track_publisher_->GetDeliveryOrder();
subscribe_ok.largest_id = largest_id;
// TODO(martinduke): Support sending DELIVERY_TIMEOUT parameter as the
@@ -1790,15 +1825,15 @@
void MoqtSession::PublishedSubscription::OnSubscribeRejected(
MoqtSubscribeErrorReason reason, std::optional<uint64_t> track_alias) {
session_->GetControlStream()->SendSubscribeError(
- subscription_id_, reason.error_code, reason.reason_phrase,
+ request_id_, reason.error_code, reason.reason_phrase,
track_alias.value_or(track_alias_));
- session_->published_subscriptions_.erase(subscription_id_);
+ session_->published_subscriptions_.erase(request_id_);
// No class access below this line!
}
void MoqtSession::PublishedSubscription::OnNewObjectAvailable(
Location sequence) {
- if (!window_.InWindow(sequence)) {
+ if (!InWindow(sequence)) {
return;
}
if (reset_subgroups_.contains(
@@ -1841,7 +1876,7 @@
if (stream_id.has_value()) {
raw_stream = session_->session_->GetStreamById(*stream_id);
} else {
- raw_stream = session_->OpenOrQueueDataStream(subscription_id_, sequence);
+ raw_stream = session_->OpenOrQueueDataStream(request_id_, sequence);
}
if (raw_stream == nullptr) {
return;
@@ -1853,12 +1888,12 @@
}
void MoqtSession::PublishedSubscription::OnTrackPublisherGone() {
- session_->SubscribeIsDone(subscription_id_, SubscribeDoneCode::kGoingAway,
+ session_->SubscribeIsDone(request_id_, SubscribeDoneCode::kGoingAway,
"Publisher is gone");
}
void MoqtSession::PublishedSubscription::OnNewFinAvailable(Location sequence) {
- if (!window_.InWindow(sequence)) {
+ if (!InWindow(sequence)) {
return;
}
if (reset_subgroups_.contains(
@@ -1884,7 +1919,7 @@
void MoqtSession::PublishedSubscription::OnSubgroupAbandoned(
Location sequence, webtransport::StreamErrorCode error_code) {
- if (!window_.InWindow(sequence)) {
+ if (!InWindow(sequence)) {
return;
}
if (reset_subgroups_.contains(
@@ -1907,6 +1942,11 @@
}
void MoqtSession::PublishedSubscription::OnGroupAbandoned(uint64_t group_id) {
+ if (!window_.has_value() || window_->end().group < group_id ||
+ window_->start().group > group_id) {
+ // No window is set, or the group lies outside of it; ignore.
+ return;
+ }
std::vector<webtransport::StreamId> streams =
stream_map().GetStreamsForGroup(group_id);
for (webtransport::StreamId stream_id : streams) {
@@ -1961,10 +2001,10 @@
queued_outgoing_data_streams_.emplace(
UpdateSendOrderForSubscriberPriority(send_order, 0), first_object);
if (!start_send_order.has_value()) {
- session_->UpdateQueuedSendOrder(subscription_id_, std::nullopt, send_order);
+ session_->UpdateQueuedSendOrder(request_id_, std::nullopt, send_order);
} else if (*start_send_order < send_order) {
session_->UpdateQueuedSendOrder(
- subscription_id_, FinalizeSendOrder(*start_send_order), send_order);
+ request_id_, FinalizeSendOrder(*start_send_order), send_order);
}
}
@@ -1980,13 +2020,12 @@
// then taking base().
queued_outgoing_data_streams_.erase((++it).base());
if (queued_outgoing_data_streams_.empty()) {
- session_->UpdateQueuedSendOrder(subscription_id_, old_send_order,
- std::nullopt);
+ session_->UpdateQueuedSendOrder(request_id_, old_send_order, std::nullopt);
} else {
webtransport::SendOrder new_send_order =
FinalizeSendOrder(queued_outgoing_data_streams_.rbegin()->first);
if (old_send_order != new_send_order) {
- session_->UpdateQueuedSendOrder(subscription_id_, old_send_order,
+ session_->UpdateQueuedSendOrder(request_id_, old_send_order,
new_send_order);
}
}
@@ -2017,7 +2056,7 @@
PublishedSubscription& subscription, Location first_object)
: session_(session),
stream_(stream),
- subscription_id_(subscription.subscription_id()),
+ subscription_id_(subscription.request_id()),
next_object_(first_object),
session_liveness_(session->liveness_token_) {
UpdateSendOrder(subscription);
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index eceaa98..bc4e80d 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -113,6 +113,7 @@
// the track, the message will still be sent. However, the visitor will be
// ignored.
// Subscribe from (start_group, start_object) to the end of the track.
+ // TODO(martinduke): Allow setting forward = false.
bool SubscribeAbsolute(const FullTrackName& name, uint64_t start_group,
uint64_t start_object,
SubscribeRemoteTrack::Visitor* visitor,
@@ -125,6 +126,7 @@
bool SubscribeCurrentObject(const FullTrackName& name,
SubscribeRemoteTrack::Visitor* visitor,
VersionSpecificParameters parameters) override;
+ // TODO(martinduke): Add SubscribeNextGroup().
// Returns false if the subscription is not found. The session immediately
// destroys all subscription state.
void Unsubscribe(const FullTrackName& name);
@@ -337,7 +339,7 @@
PublishedSubscription& operator=(const PublishedSubscription&) = delete;
PublishedSubscription& operator=(PublishedSubscription&&) = delete;
- uint64_t subscription_id() const { return subscription_id_; }
+ uint64_t request_id() const { return request_id_; }
MoqtTrackPublisher& publisher() { return *track_publisher_; }
uint64_t track_alias() const { return track_alias_; }
std::optional<Location> largest_sent() const { return largest_sent_; }
@@ -372,8 +374,13 @@
MoqtPriority subscriber_priority);
// Checks if the specified sequence is within the window of this
// subscription.
- bool InWindow(Location sequence) { return window_.InWindow(sequence); }
- Location GetWindowStart() const { return window_.start(); }
+ bool InWindow(Location sequence) {
+ return forward_ && window_.has_value() && window_->InWindow(sequence);
+ }
+ Location GetWindowStart() const {
+ QUICHE_CHECK(window_.has_value());
+ return window_->start();
+ }
MoqtFilterType filter_type() const { return filter_type_; };
void OnDataStreamCreated(webtransport::StreamId id,
@@ -427,12 +434,16 @@
subscriber_priority_);
}
- MoqtFilterType filter_type_;
- uint64_t subscription_id_;
MoqtSession* session_;
std::shared_ptr<MoqtTrackPublisher> track_publisher_;
+ uint64_t request_id_;
uint64_t track_alias_;
- SubscribeWindow window_;
+ MoqtFilterType filter_type_;
+ bool forward_;
+ // If window_ is nullopt, any arriving objects are ignored. This could be
+ // because forward=0, or because the subscription is waiting for a
+ // SUBSCRIBE_OK and doesn't know what the window should be yet.
+ std::optional<SubscribeWindow> window_;
MoqtPriority subscriber_priority_;
uint64_t streams_opened_ = 0;
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 2c8797c..1487e25 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -60,11 +60,13 @@
MoqtSubscribe DefaultSubscribe() {
MoqtSubscribe subscribe = {
- /*subscribe_id=*/1,
+ /*request_id=*/1,
/*track_alias=*/2,
kDefaultTrackName(),
/*subscriber_priority=*/0x80,
/*group_order=*/std::nullopt,
+ /*forward=*/true,
+ /*filter_type=*/MoqtFilterType::kAbsoluteStart,
/*start=*/Location(0, 0),
/*end_group=*/std::nullopt,
/*parameters=*/VersionSpecificParameters(),
@@ -74,7 +76,7 @@
MoqtFetch DefaultFetch() {
MoqtFetch fetch = {
- /*fetch_id=*/2,
+ /*fetch_id=*/1,
/*subscriber_priority=*/0x80,
/*group_order=*/std::nullopt,
/*joining_fetch=*/std::nullopt,
@@ -155,7 +157,7 @@
return nullptr;
}
MoqtSubscribeOk expected_ok = {
- /*subscribe_id=*/subscribe.subscribe_id,
+ /*request_id=*/subscribe.request_id,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
/*group_order=*/MoqtDeliveryOrder::kAscending,
(*track_status == MoqtTrackStatusCode::kInProgress)
@@ -346,7 +348,7 @@
// Add the track. Now Subscribe should succeed.
MockTrackPublisher* track = CreateTrackPublisher();
std::make_shared<MockTrackPublisher>(request.full_track_name);
- ++request.subscribe_id;
+ request.request_id += 2;
ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
}
@@ -504,28 +506,56 @@
ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
}
-TEST_F(MoqtSessionTest, SubscribeEntirelyInPast) {
+TEST_F(MoqtSessionTest, SubscribeDoNotForward) {
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
MockTrackPublisher* track = CreateTrackPublisher();
- SetLargestId(track, Location(10, 20));
-
MoqtSubscribe request = DefaultSubscribe();
- request.end_group = 9;
- EXPECT_CALL(*track, AddObjectListener)
- .WillOnce([&](MoqtObjectListener* listener) {
- listener->OnSubscribeAccepted();
- });
- MoqtSubscribeError expected_error = {
- /*subscribe_id=*/request.subscribe_id,
- /*error_code=*/SubscribeErrorCode::kInvalidRange,
- /*reason_phrase=*/"SUBSCRIBE ends in past group",
- /*track_alias=*/request.track_alias,
- };
- EXPECT_CALL(mock_stream_,
- Writev(SerializedControlMessage(expected_error), _));
- stream_input->OnSubscribeMessage(request);
- EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, 1), nullptr);
+ request.forward = false;
+ request.filter_type = MoqtFilterType::kLatestObject;
+ EXPECT_CALL(*track, GetTrackStatus)
+ .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun));
+ MoqtObjectListener* listener =
+ ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
+ // forward=false, so incoming objects are ignored.
+ EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+ .Times(0);
+ listener->OnNewObjectAvailable(Location(0, 0));
+}
+
+TEST_F(MoqtSessionTest, SubscribeAbsoluteStartNoDataYet) {
+ std::unique_ptr<MoqtControlParserVisitor> stream_input =
+ MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+ MockTrackPublisher* track = CreateTrackPublisher();
+ MoqtSubscribe request = DefaultSubscribe();
+ request.start = Location(1, 0);
+ EXPECT_CALL(*track, GetTrackStatus)
+ .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun));
+ MoqtObjectListener* listener =
+ ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
+ // Acceptance did not reset the window to (0, 0), so object (0, 0) is ignored.
+ EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+ .Times(0);
+ listener->OnNewObjectAvailable(Location(0, 0));
+}
+
+TEST_F(MoqtSessionTest, SubscribeNextGroup) {
+ std::unique_ptr<MoqtControlParserVisitor> stream_input =
+ MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+ MockTrackPublisher* track = CreateTrackPublisher();
+ MoqtSubscribe request = DefaultSubscribe();
+ request.filter_type = MoqtFilterType::kNextGroupStart;
+ SetLargestId(track, Location(10, 20));
+ MoqtObjectListener* listener =
+ ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
+ // Later objects in group 10 are ignored.
+ EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+ .Times(0);
+ listener->OnNewObjectAvailable(Location(10, 21));
+ // Group 11 is in the window, so the session tries to open a stream.
+ EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+ .WillOnce(Return(false));
+ listener->OnNewObjectAvailable(Location(11, 0));
}
TEST_F(MoqtSessionTest, TwoSubscribesForTrack) {
@@ -535,7 +565,7 @@
MoqtSubscribe request = DefaultSubscribe();
ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
- request.subscribe_id = 2;
+ request.request_id = 3;
request.start = Location(12, 0);
EXPECT_CALL(mock_session_,
CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
@@ -553,13 +583,13 @@
// Peer unsubscribes.
MoqtUnsubscribe unsubscribe = {
- /*subscribe_id=*/1,
+ /*request_id=*/1,
};
stream_input->OnUnsubscribeMessage(unsubscribe);
EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, 1), nullptr);
// Subscribe again, succeeds.
- request.subscribe_id = 2;
+ request.request_id = 3;
request.start = Location(12, 0);
ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
}
@@ -567,7 +597,7 @@
TEST_F(MoqtSessionTest, RequestIdTooHigh) {
// Peer subscribes to (0, 0)
MoqtSubscribe request = DefaultSubscribe();
- request.subscribe_id = kDefaultInitialMaxRequestId + 1;
+ request.request_id = kDefaultInitialMaxRequestId + 1;
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
@@ -577,6 +607,10 @@
stream_input->OnSubscribeMessage(request);
}
+TEST_F(MoqtSessionTest, RequestIdWrongLsb) {
+ // TODO(martinduke): Implement this test.
+}
+
TEST_F(MoqtSessionTest, SubscribeIdNotIncreasing) {
MoqtSubscribe request = DefaultSubscribe();
std::unique_ptr<MoqtControlParserVisitor> stream_input =
@@ -650,7 +684,7 @@
VersionSpecificParameters());
MoqtSubscribeOk ok = {
- /*subscribe_id=*/0,
+ /*request_id=*/0,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
};
EXPECT_CALL(remote_track_visitor, OnReply(_, _, _))
@@ -709,7 +743,9 @@
session_.GrantMoreRequests(1);
// Peer subscribes to (0, 0)
MoqtSubscribe request = DefaultSubscribe();
- request.subscribe_id = kDefaultInitialMaxRequestId;
+ MoqtSessionPeer::set_next_incoming_request_id(
+ &session_, kDefaultInitialMaxRequestId + 1);
+ request.request_id = kDefaultInitialMaxRequestId + 1;
MockTrackPublisher* track = CreateTrackPublisher();
ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
}
@@ -726,7 +762,7 @@
VersionSpecificParameters());
MoqtSubscribeError error = {
- /*subscribe_id=*/0,
+ /*request_id=*/0,
/*error_code=*/SubscribeErrorCode::kInvalidRange,
/*reason_phrase=*/"deadbeef",
/*track_alias=*/2,
@@ -1011,7 +1047,7 @@
// SUBSCRIBE_OK arrives
MoqtSubscribeOk ok = {
- /*subscribe_id=*/1,
+ /*request_id=*/1,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_id=*/std::nullopt,
@@ -1056,7 +1092,7 @@
// SUBSCRIBE_ERROR arrives
MoqtSubscribeError subscribe_error = {
- /*subscribe_id=*/1,
+ /*request_id=*/1,
/*error_code=*/SubscribeErrorCode::kRetryTrackAlias,
/*reason_phrase=*/"foo",
/*track_alias =*/3,
@@ -1078,7 +1114,7 @@
// SUBSCRIBE_ERROR arrives
MoqtSubscribeError subscribe_error = {
- /*subscribe_id=*/1,
+ /*request_id=*/1,
/*error_code=*/SubscribeErrorCode::kRetryTrackAlias,
/*reason_phrase=*/"foo",
/*track_alias =*/3,
@@ -1098,7 +1134,7 @@
// SUBSCRIBE_ERROR arrives
MoqtSubscribeError subscribe_error = {
- /*subscribe_id=*/1,
+ /*request_id=*/1,
/*error_code=*/SubscribeErrorCode::kRetryTrackAlias,
/*reason_phrase=*/"foo",
/*track_alias =*/2,
@@ -1680,7 +1716,7 @@
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
MoqtUnsubscribe unsubscribe = {
- /*subscribe_id=*/0,
+ /*request_id=*/0,
};
stream_input->OnUnsubscribeMessage(unsubscribe);
EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, 0), nullptr);
@@ -2225,12 +2261,12 @@
TEST_F(MoqtSessionTest, InvalidFetch) {
// Update the state so that it expects ID > 0 next time.
- MoqtSessionPeer::ValidateRequestId(&session_, 0);
+ MoqtSessionPeer::ValidateRequestId(&session_, 1);
webtransport::test::MockStream control_stream;
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
MoqtFetch fetch = DefaultFetch();
- fetch.fetch_id = 0; // Too low.
+ fetch.fetch_id = 1; // Too low.
EXPECT_CALL(mock_session_,
CloseSession(static_cast<uint64_t>(MoqtError::kInvalidRequestId),
"Request ID not monotonically increasing"))
@@ -2286,6 +2322,7 @@
TEST_F(MoqtSessionTest, IncomingJoiningFetch) {
MoqtSubscribe subscribe = DefaultSubscribe();
// Give it the latest object filter.
+ subscribe.filter_type = MoqtFilterType::kLatestObject;
subscribe.start = std::nullopt;
subscribe.end_group = std::nullopt;
std::unique_ptr<MoqtControlParserVisitor> stream_input =
@@ -2295,7 +2332,7 @@
ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get());
MoqtObjectListener* subscription =
- MoqtSessionPeer::GetSubscription(&session_, subscribe.subscribe_id);
+ MoqtSessionPeer::GetSubscription(&session_, subscribe.request_id);
ASSERT_NE(subscription, nullptr);
EXPECT_TRUE(
MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 0, 11)));
@@ -2304,6 +2341,7 @@
// Joining FETCH arrives. The resulting Fetch should begin at (2, 0).
MoqtFetch fetch = DefaultFetch();
+ fetch.fetch_id = 3;
fetch.joining_fetch = {1, 2};
EXPECT_CALL(*track, Fetch(Location(2, 0), 4, std::optional<uint64_t>(10), _))
.WillOnce(Return(std::make_unique<MockFetchTask>()));
@@ -2316,7 +2354,7 @@
MoqtFetch fetch = DefaultFetch();
fetch.joining_fetch = {1, 2};
MoqtFetchError expected_error = {
- /*subscribe_id=*/2,
+ /*request_id=*/1,
/*error_code=*/SubscribeErrorCode::kDoesNotExist,
/*reason_phrase=*/"Joining Fetch for non-existent subscribe",
};
@@ -2334,6 +2372,7 @@
ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get());
MoqtFetch fetch = DefaultFetch();
+ fetch.fetch_id = 3;
fetch.joining_fetch = {1, 2};
EXPECT_CALL(mock_session_,
CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
@@ -2349,16 +2388,19 @@
EXPECT_CALL(mock_session_, GetStreamById(_))
.WillRepeatedly(Return(&mock_stream_));
MoqtSubscribe expected_subscribe = {
- /*subscribe_id=*/0,
+ /*request_id=*/0,
/*track_alias=*/0,
/*full_track_name=*/FullTrackName("foo", "bar"),
/*subscriber_priority=*/0x80,
/*group_order=*/MoqtDeliveryOrder::kAscending,
+ /*forward=*/true,
+ /*filter_type=*/MoqtFilterType::kLatestObject,
/*start=*/std::nullopt,
/*end_group=*/std::nullopt,
+ VersionSpecificParameters(),
};
MoqtFetch expected_fetch = {
- /*fetch_id=*/1,
+ /*fetch_id=*/2,
/*subscriber_priority=*/0x80,
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*joining_fetch=*/JoiningFetch(0, 1),
@@ -2391,12 +2433,12 @@
MoqtSubscribeOk(0, quic::QuicTimeDelta::FromMilliseconds(0),
MoqtDeliveryOrder::kAscending, Location(2, 0),
VersionSpecificParameters()));
- stream_input->OnFetchOkMessage(MoqtFetchOk(1, MoqtDeliveryOrder::kAscending,
+ stream_input->OnFetchOkMessage(MoqtFetchOk(2, MoqtDeliveryOrder::kAscending,
Location(2, 0),
VersionSpecificParameters()));
// Packet arrives on FETCH stream.
MoqtObject object = {
- /*fetch_id=*/1,
+ /*fetch_id=*/2,
/*group_id, object_id=*/0,
0,
/*publisher_priority=*/128,
@@ -2479,7 +2521,7 @@
Location(0, 0), 4, std::nullopt, 128, std::nullopt,
VersionSpecificParameters());
MoqtFetchOk ok = {
- /*subscribe_id=*/0,
+ /*request_id=*/0,
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_id=*/Location(3, 25),
VersionSpecificParameters(),
@@ -2508,7 +2550,7 @@
Location(0, 0), 4, std::nullopt, 128, std::nullopt,
VersionSpecificParameters());
MoqtFetchError error = {
- /*subscribe_id=*/0,
+ /*request_id=*/0,
/*error_code=*/SubscribeErrorCode::kUnauthorized,
/*reason_phrase=*/"No username provided",
};
@@ -2546,7 +2588,7 @@
std::queue<quiche::QuicheBuffer> headers;
std::queue<std::string> payloads;
MoqtObject object = {
- /*subscribe_id=*/0,
+ /*request_id=*/0,
/*group_id, object_id=*/0,
0,
/*publisher_priority=*/128,
@@ -2578,7 +2620,7 @@
// FETCH_OK arrives, objects are delivered.
MoqtFetchOk ok = {
- /*subscribe_id=*/0,
+ /*request_id=*/0,
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_id=*/Location(3, 25),
VersionSpecificParameters(),
@@ -2616,7 +2658,7 @@
std::queue<quiche::QuicheBuffer> headers;
std::queue<std::string> payloads;
MoqtObject object = {
- /*subscribe_id=*/0,
+ /*request_id=*/0,
/*group_id, object_id=*/0,
0,
/*publisher_priority=*/128,
@@ -2648,7 +2690,7 @@
// FETCH_OK arrives, objects are available.
MoqtFetchOk ok = {
- /*subscribe_id=*/0,
+ /*request_id=*/0,
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_id=*/Location(3, 25),
VersionSpecificParameters(),
@@ -2707,7 +2749,7 @@
bool object_ready = false;
task->SetObjectAvailableCallback([&]() { object_ready = true; });
MoqtObject object = {
- /*subscribe_id=*/0,
+ /*request_id=*/0,
/*group_id, object_id=*/0,
0,
/*publisher_priority=*/128,
@@ -3035,6 +3077,7 @@
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kFetchError), _));
MoqtFetch fetch = DefaultFetch();
+ fetch.fetch_id = 3;
stream_input->OnFetchMessage(fetch);
EXPECT_CALL(
mock_stream_,
@@ -3121,7 +3164,7 @@
&remote_track_visitor,
VersionSpecificParameters()));
MoqtSubscribeOk ok = {
- /*subscribe_id=*/0,
+ /*request_id=*/0,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_id=*/std::nullopt,
@@ -3179,7 +3222,7 @@
&remote_track_visitor,
VersionSpecificParameters()));
MoqtSubscribeOk ok = {
- /*subscribe_id=*/0,
+ /*request_id=*/0,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_id=*/std::nullopt,
@@ -3234,7 +3277,7 @@
&remote_track_visitor,
VersionSpecificParameters()));
MoqtSubscribeOk ok = {
- /*subscribe_id=*/0,
+ /*request_id=*/0,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_id=*/std::nullopt,
@@ -3297,7 +3340,7 @@
MoqtObjectStatus::kNormal);
// Update the end to fall at the last delivered object.
MoqtSubscribeUpdate update = {
- /*subscribe_id=*/0,
+ /*request_id=*/0,
/*start_group=*/5,
/*start_object=*/0,
/*end_group=*/7,
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index a829930..0783141 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -116,7 +116,7 @@
virtual void OnSubscribeDone(FullTrackName full_track_name) = 0;
};
SubscribeRemoteTrack(const MoqtSubscribe& subscribe, Visitor* visitor)
- : RemoteTrack(subscribe.full_track_name, subscribe.subscribe_id,
+ : RemoteTrack(subscribe.full_track_name, subscribe.request_id,
SubscribeWindow(subscribe.start.value_or(Location()),
subscribe.end_group)),
track_alias_(subscribe.track_alias),
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 9ab9078..7bca1db 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -54,6 +54,8 @@
/*full_track_name=*/FullTrackName("foo", "bar"),
/*subscriber_priority=*/128,
/*group_order=*/std::nullopt,
+ /*forward=*/true,
+ /*filter_type=*/MoqtFilterType::kAbsoluteStart,
/*start=*/Location(2, 0),
std::nullopt,
VersionSpecificParameters(),
@@ -78,7 +80,7 @@
TEST_F(SubscribeRemoteTrackTest, AllowError) {
EXPECT_TRUE(track_.ErrorIsAllowed());
- EXPECT_EQ(track_.GetSubscribe().subscribe_id, subscribe_.subscribe_id);
+ EXPECT_EQ(track_.GetSubscribe().request_id, subscribe_.request_id);
track_.OnObjectOrOk();
EXPECT_FALSE(track_.ErrorIsAllowed());
}
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index 9fa1bf3..e3ddb7b 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -91,7 +91,7 @@
track.get());
session->subscribe_by_name_.try_emplace(subscribe.full_track_name,
track.get());
- session->upstream_by_id_.try_emplace(subscribe.subscribe_id,
+ session->upstream_by_id_.try_emplace(subscribe.request_id,
std::move(track));
}
@@ -102,7 +102,9 @@
MoqtSubscribe subscribe;
subscribe.full_track_name = publisher->GetTrackName();
subscribe.track_alias = track_alias;
- subscribe.subscribe_id = subscribe_id;
+ subscribe.request_id = subscribe_id;
+ subscribe.forward = true;
+ subscribe.filter_type = MoqtFilterType::kAbsoluteStart;
subscribe.start = Location(start_group, start_object);
subscribe.subscriber_priority = 0x80;
session->published_subscriptions_.emplace(
@@ -147,6 +149,10 @@
session->next_request_id_ = id;
}
+ static void set_next_incoming_request_id(MoqtSession* session, uint64_t id) {
+ session->next_incoming_request_id_ = id;
+ }
+
static void set_peer_max_request_id(MoqtSession* session, uint64_t id) {
session->peer_max_request_id_ = id;
}
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index b0709ef..31c2521 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -465,7 +465,7 @@
bool EqualFieldValues(MessageStructuredData& values) const override {
auto cast = std::get<MoqtSubscribe>(values);
- if (cast.subscribe_id != subscribe_.subscribe_id) {
+ if (cast.request_id != subscribe_.request_id) {
QUIC_LOG(INFO) << "SUBSCRIBE subscribe ID mismatch";
return false;
}
@@ -485,6 +485,14 @@
QUIC_LOG(INFO) << "SUBSCRIBE group order mismatch";
return false;
}
+ if (cast.forward != subscribe_.forward) {
+ QUIC_LOG(INFO) << "SUBSCRIBE forward mismatch";
+ return false;
+ }
+ if (cast.filter_type != subscribe_.filter_type) {
+ QUIC_LOG(INFO) << "SUBSCRIBE filter type mismatch";
+ return false;
+ }
if (cast.start != subscribe_.start) {
QUIC_LOG(INFO) << "SUBSCRIBE start mismatch";
return false;
@@ -501,7 +509,7 @@
}
void ExpandVarints() override {
- ExpandVarintsImpl("vvvv---v------vvvvv--vv-----");
+ ExpandVarintsImpl("vvvv---v-------vvvvv--vv-----");
}
MessageStructuredData structured_data() const override {
@@ -509,15 +517,16 @@
}
private:
- uint8_t raw_packet_[31] = {
- 0x03, 0x00, 0x1c, 0x01, 0x02, // id and alias
+ uint8_t raw_packet_[32] = {
+ 0x03, 0x00, 0x1d, 0x01, 0x02, // id and alias
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, // subscriber priority = 0x20
0x02, // group order = descending
+ 0x01, // forward = true
0x03, // Filter type: Absolute Start
- 0x04, // start_group = 4 (relative previous)
- 0x01, // start_object = 1 (absolute)
+ 0x04, // start_group = 4
+ 0x01, // start_object = 1
// No EndGroup or EndObject
0x02, // 2 parameters
0x02, 0x67, 0x10, // delivery_timeout = 10000 ms
@@ -530,6 +539,8 @@
/*full_track_name=*/FullTrackName({"foo", "abcd"}),
/*subscriber_priority=*/0x20,
/*group_order=*/MoqtDeliveryOrder::kDescending,
+ /*forward=*/true,
+ /*filter_type=*/MoqtFilterType::kAbsoluteStart,
/*start=*/Location(4, 1),
/*end_group=*/std::nullopt,
VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000),