Eliminate Track Alias negotiation in MoQT, per draft-12.
PiperOrigin-RevId: 788590992
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index 3505cf4..33ff375 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -406,50 +406,40 @@
<< "Serializing invalid MoQT parameters";
return quiche::QuicheBuffer();
}
+ std::optional<uint64_t> start_group, start_object, end_group;
switch (message.filter_type) {
case MoqtFilterType::kNextGroupStart:
case MoqtFilterType::kLatestObject:
- return SerializeControlMessage(
- MoqtMessageType::kSubscribe, WireVarInt62(message.request_id),
- WireVarInt62(message.track_alias),
- WireFullTrackName(message.full_track_name),
- WireUint8(message.subscriber_priority),
- WireDeliveryOrder(message.group_order), WireBoolean(message.forward),
- WireVarInt62(message.filter_type), WireKeyValuePairList(parameters));
- case MoqtFilterType::kAbsoluteStart:
- if (!message.start.has_value()) {
- return quiche::QuicheBuffer();
- };
- return SerializeControlMessage(
- MoqtMessageType::kSubscribe, WireVarInt62(message.request_id),
- WireVarInt62(message.track_alias),
- WireFullTrackName(message.full_track_name),
- WireUint8(message.subscriber_priority),
- WireDeliveryOrder(message.group_order), WireBoolean(message.forward),
- WireVarInt62(message.filter_type), WireVarInt62(message.start->group),
- WireVarInt62(message.start->object),
- WireKeyValuePairList(parameters));
+ break;
case MoqtFilterType::kAbsoluteRange:
- if (!message.start.has_value() || !message.end_group.has_value()) {
- return quiche::QuicheBuffer();
- }
- if (*message.end_group < message.start->group) {
+ if (!message.end_group.has_value() || !message.start.has_value() ||
+ *message.end_group < message.start->group) {
QUICHE_BUG(MoqtFramer_invalid_end_group) << "Invalid object range";
return quiche::QuicheBuffer();
}
- return SerializeControlMessage(
- MoqtMessageType::kSubscribe, WireVarInt62(message.request_id),
- WireVarInt62(message.track_alias),
- WireFullTrackName(message.full_track_name),
- WireUint8(message.subscriber_priority),
- WireDeliveryOrder(message.group_order), WireBoolean(message.forward),
- WireVarInt62(message.filter_type), WireVarInt62(message.start->group),
- WireVarInt62(message.start->object), WireVarInt62(*message.end_group),
- WireKeyValuePairList(parameters));
+ end_group = *message.end_group;
+ [[fallthrough]];
+ case MoqtFilterType::kAbsoluteStart:
+ if (!message.start.has_value()) {
+ QUICHE_BUG(MoqtFramer_invalid_start) << "Filter requires start";
+ return quiche::QuicheBuffer();
+ }
+ start_group = message.start->group;
+ start_object = message.start->object;
+ break;
default:
QUICHE_BUG(MoqtFramer_end_group_missing) << "Subscribe framing error.";
return quiche::QuicheBuffer();
}
+ return SerializeControlMessage(
+ MoqtMessageType::kSubscribe, WireVarInt62(message.request_id),
+ WireFullTrackName(message.full_track_name),
+ WireUint8(message.subscriber_priority),
+ WireDeliveryOrder(message.group_order), WireBoolean(message.forward),
+ WireVarInt62(message.filter_type),
+ WireOptional<WireVarInt62>(start_group),
+ WireOptional<WireVarInt62>(start_object),
+ WireOptional<WireVarInt62>(end_group), WireKeyValuePairList(parameters));
}
quiche::QuicheBuffer MoqtFramer::SerializeSubscribeOk(
@@ -465,6 +455,7 @@
if (message.largest_location.has_value()) {
return SerializeControlMessage(
MoqtMessageType::kSubscribeOk, WireVarInt62(message.request_id),
+ WireVarInt62(message.track_alias),
WireVarInt62(message.expires.ToMilliseconds()),
WireDeliveryOrder(message.group_order), WireUint8(1),
WireVarInt62(message.largest_location->group),
@@ -473,6 +464,7 @@
}
return SerializeControlMessage(
MoqtMessageType::kSubscribeOk, WireVarInt62(message.request_id),
+ WireVarInt62(message.track_alias),
WireVarInt62(message.expires.ToMilliseconds()),
WireDeliveryOrder(message.group_order), WireUint8(0),
WireKeyValuePairList(parameters));
@@ -483,8 +475,7 @@
return SerializeControlMessage(
MoqtMessageType::kSubscribeError, WireVarInt62(message.request_id),
WireVarInt62(message.error_code),
- WireStringWithVarInt62Length(message.reason_phrase),
- WireVarInt62(message.track_alias));
+ WireStringWithVarInt62Length(message.reason_phrase));
}
quiche::QuicheBuffer MoqtFramer::SerializeUnsubscribe(
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index c3780db..d88673e 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -393,7 +393,6 @@
MoqtFilterType::kAbsoluteStart, MoqtFilterType::kAbsoluteRange}) {
MoqtSubscribe subscribe = {
/*subscribe_id=*/3,
- /*track_alias=*/4,
/*full_track_name=*/FullTrackName({"foo", "abcd"}),
/*subscriber_priority=*/0x20,
/*group_order=*/std::nullopt,
@@ -412,7 +411,6 @@
TEST_F(MoqtFramerSimpleTest, SubscribeEndBeforeStart) {
MoqtSubscribe subscribe = {
/*subscribe_id=*/3,
- /*track_alias=*/4,
/*full_track_name=*/FullTrackName({"foo", "abcd"}),
/*subscriber_priority=*/0x20,
/*group_order=*/std::nullopt,
@@ -428,6 +426,42 @@
EXPECT_EQ(buffer.size(), 0);
}
+TEST_F(MoqtFramerSimpleTest, AbsoluteRangeStartMissing) {
+ MoqtSubscribe subscribe = {
+ /*subscribe_id=*/3,
+ /*full_track_name=*/FullTrackName({"foo", "abcd"}),
+ /*subscriber_priority=*/0x20,
+ /*group_order=*/std::nullopt,
+ /*forward=*/true,
+ /*filter_type=*/MoqtFilterType::kAbsoluteRange,
+ /*start=*/std::nullopt,
+ /*end_group=*/std::make_optional<uint64_t>(3ULL),
+ VersionSpecificParameters(AuthTokenType::kOutOfBand, "bar"),
+ };
+ quiche::QuicheBuffer buffer;
+ EXPECT_QUICHE_BUG(buffer = framer_.SerializeSubscribe(subscribe),
+ "Invalid object range");
+ EXPECT_EQ(buffer.size(), 0);
+}
+
+TEST_F(MoqtFramerSimpleTest, AbsoluteRangeEndMissing) {
+ MoqtSubscribe subscribe = {
+ /*subscribe_id=*/3,
+ /*full_track_name=*/FullTrackName({"foo", "abcd"}),
+ /*subscriber_priority=*/0x20,
+ /*group_order=*/std::nullopt,
+ /*forward=*/true,
+ /*filter_type=*/MoqtFilterType::kAbsoluteRange,
+ /*start=*/std::make_optional<Location>(4, 3),
+ /*end_group=*/std::nullopt,
+ VersionSpecificParameters(AuthTokenType::kOutOfBand, "bar"),
+ };
+ quiche::QuicheBuffer buffer;
+ EXPECT_QUICHE_BUG(buffer = framer_.SerializeSubscribe(subscribe),
+ "Invalid object range");
+ EXPECT_EQ(buffer.size(), 0);
+}
+
TEST_F(MoqtFramerSimpleTest, FetchEndBeforeStart) {
MoqtFetch fetch = {
/*subscribe_id =*/1,
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index 8600f62..5b5ee77 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -134,7 +134,6 @@
return absl::StatusCode::kNotFound;
case RequestErrorCode::kInvalidJoiningSubscribeId:
case RequestErrorCode::kMalformedAuthToken:
- case RequestErrorCode::kUnknownAuthTokenAlias:
return absl::StatusCode::kInvalidArgument;
case RequestErrorCode::kExpiredAuthToken:
return absl::StatusCode::kUnauthenticated;
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 232c8ab..69beaa9 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -337,11 +337,9 @@
kNamespacePrefixUnknown = 0x4, // SUBSCRIBE_ANNOUNCES_ERROR only.
kInvalidRange = 0x5, // SUBSCRIBE_ERROR and FETCH_ERROR only.
kNamespacePrefixOverlap = 0x5, // SUBSCRIBE_ANNOUNCES_ERROR only.
- kRetryTrackAlias = 0x6, // SUBSCRIBE_ERROR only.
kNoObjects = 0x6, // FETCH_ERROR only.
kInvalidJoiningSubscribeId = 0x7, // FETCH_ERROR only.
kMalformedAuthToken = 0x10,
- kUnknownAuthTokenAlias = 0x11,
kExpiredAuthToken = 0x12,
};
@@ -619,7 +617,6 @@
struct QUICHE_EXPORT MoqtSubscribe {
uint64_t request_id;
- uint64_t track_alias;
FullTrackName full_track_name;
MoqtPriority subscriber_priority;
std::optional<MoqtDeliveryOrder> group_order;
@@ -632,6 +629,7 @@
struct QUICHE_EXPORT MoqtSubscribeOk {
uint64_t request_id;
+ uint64_t track_alias;
// The message uses ms, but expires is in us.
quic::QuicTimeDelta expires = quic::QuicTimeDelta::FromMilliseconds(0);
MoqtDeliveryOrder group_order;
@@ -644,7 +642,6 @@
uint64_t request_id;
RequestErrorCode error_code;
std::string reason_phrase;
- uint64_t track_alias;
};
struct QUICHE_EXPORT MoqtUnsubscribe {
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 75dcc5f..d381d04 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -370,7 +370,6 @@
uint64_t filter, group, object;
uint8_t group_order, forward;
if (!reader.ReadVarInt62(&subscribe.request_id) ||
- !reader.ReadVarInt62(&subscribe.track_alias) ||
!ReadFullTrackName(reader, subscribe.full_track_name) ||
!reader.ReadUInt8(&subscribe.subscriber_priority) ||
!reader.ReadUInt8(&group_order) || !reader.ReadUInt8(&forward) ||
@@ -436,6 +435,7 @@
uint8_t group_order;
uint8_t content_exists;
if (!reader.ReadVarInt62(&subscribe_ok.request_id) ||
+ !reader.ReadVarInt62(&subscribe_ok.track_alias) ||
!reader.ReadVarInt62(&milliseconds) || !reader.ReadUInt8(&group_order) ||
!reader.ReadUInt8(&content_exists)) {
return 0;
@@ -479,8 +479,7 @@
uint64_t error_code;
if (!reader.ReadVarInt62(&subscribe_error.request_id) ||
!reader.ReadVarInt62(&error_code) ||
- !reader.ReadStringVarInt62(subscribe_error.reason_phrase) ||
- !reader.ReadVarInt62(&subscribe_error.track_alias)) {
+ !reader.ReadStringVarInt62(subscribe_error.reason_phrase)) {
return 0;
}
subscribe_error.error_code = static_cast<RequestErrorCode>(error_code);
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index 4f6535a..7e4f5ce 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -580,8 +580,8 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kWebTrans, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x1b, 0x01, 0x02,
- 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
+ 0x03, 0x00, 0x1a, 0x01, 0x01,
+ 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, 0x02, 0x01, // priority, order, forward
0x02, // filter_type = kLatestObject
@@ -598,8 +598,8 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x17, 0x01, 0x02,
- 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
+ 0x03, 0x00, 0x16, 0x01, 0x01,
+ 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, 0x02, 0x01, // priority, order, forward
0x02, // filter_type = kLatestObject
@@ -618,8 +618,8 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x17, 0x01, 0x02,
- 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
+ 0x03, 0x00, 0x16, 0x01, 0x01,
+ 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, 0x02, 0x01, // priority, order, forward
0x02, // filter_type = kLatestObject
@@ -638,8 +638,8 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x15, 0x01, 0x02,
- 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
+ 0x03, 0x00, 0x14, 0x01, 0x01,
+ 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, 0x02, 0x01, // priority, order, forward
0x02, // filter_type = kLatestObject
@@ -657,8 +657,8 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x19, 0x01, 0x02, 0x01, 0x03, 0x66,
- 0x6f, 0x6f, // track_namespace = "foo"
+ 0x03, 0x00, 0x18, 0x01, 0x01, 0x03, 0x66, 0x6f,
+ 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, 0x02, 0x01, // priority, order, forward
0x02, // filter_type = kLatestObject
@@ -676,8 +676,8 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x15, 0x01, 0x02,
- 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
+ 0x03, 0x00, 0x14, 0x01, 0x01,
+ 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, 0x02, 0x01, // priority, order, forward
0x02, // filter_type = kLatestObject
@@ -696,8 +696,8 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x15, 0x01, 0x02,
- 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
+ 0x03, 0x00, 0x14, 0x01, 0x01,
+ 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, 0x02, 0x01, // priority, order, forward
0x02, // filter_type = kLatestObject
@@ -716,8 +716,8 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x17, 0x01, 0x02, 0x01,
- 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
+ 0x03, 0x00, 0x16, 0x01, 0x01, 0x03,
+ 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, 0x02, 0x01, // priority, order, forward
0x02, // filter_type = kLatestObject
@@ -735,19 +735,38 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x1d, 0x01, 0x02, // id and alias
- 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
- 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, // subscriber priority = 0x20
- 0x03, // group order = invalid
- 0x01, // forward = true
- 0x03, // Filter type: Absolute Start
- 0x04, // start_group = 4 (relative previous)
- 0x01, // start_object = 1 (absolute)
+ 0x03,
+ 0x00,
+ 0x1c,
+ 0x01, // id
+ 0x01,
+ 0x03,
+ 0x66,
+ 0x6f,
+ 0x6f, // track_namespace = "foo"
+ 0x04,
+ 0x61,
+ 0x62,
+ 0x63,
+ 0x64, // track_name = "abcd"
+ 0x20, // subscriber priority = 0x20
+ 0x03, // group order = invalid
+ 0x01, // forward = true
+ 0x03, // Filter type: Absolute Start
+ 0x04, // start_group = 4 (relative previous)
+ 0x01, // start_object = 1 (absolute)
// No EndGroup or EndObject
- 0x02, // 2 parameters
- 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms
- 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar"
+ 0x02, // 2 parameters
+ 0x02,
+ 0x67,
+ 0x10, // delivery_timeout = 10000 ms
+ 0x03,
+ 0x05,
+ 0x03,
+ 0x00,
+ 0x62,
+ 0x61,
+ 0x72, // authorization_tag = "bar"
};
stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
parser.ReadAndDispatchMessages();
@@ -760,19 +779,38 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x1d, 0x01, 0x02, // id and alias
- 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
- 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, // subscriber priority = 0x20
- 0x02, // group order = descending
- 0x02, // forward = invalid
- 0x03, // Filter type: Absolute Start
- 0x04, // start_group = 4 (relative previous)
- 0x01, // start_object = 1 (absolute)
+ 0x03,
+ 0x00,
+ 0x1c,
+ 0x01, // id
+ 0x01,
+ 0x03,
+ 0x66,
+ 0x6f,
+ 0x6f, // track_namespace = "foo"
+ 0x04,
+ 0x61,
+ 0x62,
+ 0x63,
+ 0x64, // track_name = "abcd"
+ 0x20, // subscriber priority = 0x20
+ 0x02, // group order = descending
+ 0x02, // forward = invalid
+ 0x03, // Filter type: Absolute Start
+ 0x04, // start_group = 4 (relative previous)
+ 0x01, // start_object = 1 (absolute)
// No EndGroup or EndObject
- 0x02, // 2 parameters
- 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms
- 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar"
+ 0x02, // 2 parameters
+ 0x02,
+ 0x67,
+ 0x10, // delivery_timeout = 10000 ms
+ 0x03,
+ 0x05,
+ 0x03,
+ 0x00,
+ 0x62,
+ 0x61,
+ 0x72, // authorization_tag = "bar"
};
stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
parser.ReadAndDispatchMessages();
@@ -785,19 +823,38 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x1d, 0x01, 0x02, // id and alias
- 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
- 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, // subscriber priority = 0x20
- 0x02, // group order = descending
- 0x01, // forward = true
- 0x05, // Filter type: Absolute Start
- 0x04, // start_group = 4 (relative previous)
- 0x01, // start_object = 1 (absolute)
+ 0x03,
+ 0x00,
+ 0x1c,
+ 0x01, // id
+ 0x01,
+ 0x03,
+ 0x66,
+ 0x6f,
+ 0x6f, // track_namespace = "foo"
+ 0x04,
+ 0x61,
+ 0x62,
+ 0x63,
+ 0x64, // track_name = "abcd"
+ 0x20, // subscriber priority = 0x20
+ 0x02, // group order = descending
+ 0x01, // forward = true
+ 0x05, // Filter type: Absolute Start
+ 0x04, // start_group = 4 (relative previous)
+ 0x01, // start_object = 1 (absolute)
// No EndGroup or EndObject
- 0x02, // 2 parameters
- 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms
- 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar"
+ 0x02, // 2 parameters
+ 0x02,
+ 0x67,
+ 0x10, // delivery_timeout = 10000 ms
+ 0x03,
+ 0x05,
+ 0x03,
+ 0x00,
+ 0x62,
+ 0x61,
+ 0x72, // authorization_tag = "bar"
};
stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
parser.ReadAndDispatchMessages();
@@ -810,8 +867,8 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kWebTrans, &stream, visitor_);
char subscribe_ok[] = {
- 0x04, 0x00, 0x11, 0x01, 0x03, // subscribe_id = 1, expires = 3
- 0x02, 0x01, // group_order = 2, content exists
+ 0x04, 0x00, 0x12, 0x01, 0x02, 0x03, // subscribe_id, alias, expires = 3
+ 0x02, 0x01, // group_order = 2, content exists
0x0c, 0x14, // largest_group_id = 12, largest_object_id = 20,
0x02, // 2 parameters
0x02, 0x67, 0x10, // delivery_timeout = 10000
@@ -997,7 +1054,7 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x18, 0x01, 0x02, // id and alias
+ 0x03, 0x00, 0x17, 0x01, // id
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, 0x02, 0x01, // priority = 0x20, group order, forward
@@ -1019,7 +1076,7 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x18, 0x01, 0x02, // id and alias
+ 0x03, 0x00, 0x17, 0x01, // id
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, 0x08, 0x01, // priority, invalid order, forward
@@ -1037,7 +1094,7 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x1a, 0x01, 0x02, // id and alias
+ 0x03, 0x00, 0x19, 0x01, // id
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, 0x02, 0x01, // priority, order, forward
@@ -1062,7 +1119,7 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x1b, 0x01, 0x02, // id and alias
+ 0x03, 0x00, 0x1a, 0x01, // id
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, 0x02, 0x01, // priority, order, forward
@@ -1088,7 +1145,7 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x19, 0x01, 0x02, // id and alias
+ 0x03, 0x00, 0x18, 0x01, // id
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, 0x02, 0x01, // priority, order, forward
@@ -1109,7 +1166,7 @@
webtransport::test::InMemoryStream stream(/*stream_id=*/0);
MoqtControlParser parser(kRawQuic, &stream, visitor_);
char subscribe[] = {
- 0x03, 0x00, 0x14, 0x01, 0x02, // id and alias
+ 0x03, 0x00, 0x13, 0x01, // id
0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
0x20, 0x02, 0x01, // priority, order, forward
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index eb11c26..ed75729 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -581,7 +581,7 @@
subscribe.start = std::nullopt;
subscribe.end_group = std::nullopt;
subscribe.parameters = parameters;
- if (!Subscribe(subscribe, visitor, std::nullopt)) {
+ if (!Subscribe(subscribe, visitor)) {
return false;
}
MoqtFetch fetch;
@@ -707,12 +707,13 @@
void MoqtSession::DestroySubscription(SubscribeRemoteTrack* subscribe) {
subscribe->visitor()->OnSubscribeDone(subscribe->full_track_name());
subscribe_by_name_.erase(subscribe->full_track_name());
- subscribe_by_alias_.erase(subscribe->track_alias());
+ if (subscribe->track_alias().has_value()) {
+ subscribe_by_alias_.erase(*subscribe->track_alias());
+ }
}
bool MoqtSession::Subscribe(MoqtSubscribe& message,
- SubscribeRemoteTrack::Visitor* visitor,
- std::optional<uint64_t> provided_track_alias) {
+ SubscribeRemoteTrack::Visitor* visitor) {
// TODO(martinduke): support authorization info
if (next_request_id_ >= peer_max_request_id_) {
if (!last_requests_blocked_sent_.has_value() ||
@@ -734,24 +735,12 @@
<< " which is already subscribed";
return false;
}
- if (provided_track_alias.has_value() &&
- subscribe_by_alias_.contains(*provided_track_alias)) {
- Error(MoqtError::kProtocolViolation, "Provided track alias already in use");
- return false;
- }
if (received_goaway_ || sent_goaway_) {
QUIC_DLOG(INFO) << ENDPOINT << "Tried to send SUBSCRIBE after GOAWAY";
return false;
}
message.request_id = next_request_id_;
next_request_id_ += 2;
- if (provided_track_alias.has_value()) {
- message.track_alias = *provided_track_alias;
- next_remote_track_alias_ =
- std::max(next_remote_track_alias_, *provided_track_alias) + 1;
- } else {
- message.track_alias = next_remote_track_alias_++;
- }
if (SupportsObjectAck() && visitor != nullptr) {
// Since we do not expose subscribe IDs directly in the API, instead wrap
// the session and subscribe ID in a callback.
@@ -768,7 +757,6 @@
<< message.full_track_name;
auto track = std::make_unique<SubscribeRemoteTrack>(message, visitor);
subscribe_by_name_.emplace(message.full_track_name, track.get());
- subscribe_by_alias_.emplace(message.track_alias, track.get());
upstream_by_id_.emplace(message.request_id, std::move(track));
return true;
}
@@ -1012,12 +1000,11 @@
void MoqtSession::ControlStream::SendSubscribeError(
uint64_t request_id, RequestErrorCode error_code,
- absl::string_view reason_phrase, uint64_t track_alias) {
+ absl::string_view reason_phrase) {
MoqtSubscribeError subscribe_error;
subscribe_error.request_id = request_id;
subscribe_error.error_code = error_code;
subscribe_error.reason_phrase = reason_phrase;
- subscribe_error.track_alias = track_alias;
SendOrBufferMessage(
session_->framer_.SerializeSubscribeError(subscribe_error));
}
@@ -1042,7 +1029,7 @@
if (session_->sent_goaway_) {
QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE after GOAWAY";
SendSubscribeError(message.request_id, RequestErrorCode::kUnauthorized,
- "SUBSCRIBE after GOAWAY", message.track_alias);
+ "SUBSCRIBE after GOAWAY");
return;
}
if (session_->subscribed_track_names_.contains(message.full_track_name)) {
@@ -1058,7 +1045,7 @@
<< " rejected by the application: "
<< track_publisher.status();
SendSubscribeError(message.request_id, RequestErrorCode::kTrackDoesNotExist,
- track_publisher.status().message(), message.track_alias);
+ track_publisher.status().message());
return;
}
@@ -1111,6 +1098,13 @@
}
SubscribeRemoteTrack* subscribe = static_cast<SubscribeRemoteTrack*>(track);
subscribe->OnObjectOrOk();
+ auto [it, success] =
+ session_->subscribe_by_alias_.try_emplace(message.track_alias, subscribe);
+ if (!success) {
+ session_->Error(MoqtError::kDuplicateTrackAlias, "");
+ return;
+ }
+ subscribe->set_track_alias(message.track_alias);
// TODO(martinduke): Handle expires field.
if (message.largest_location.has_value()) {
subscribe->TruncateStart(message.largest_location->next());
@@ -1151,16 +1145,10 @@
// an error due to a duplicate track name. The other entries for this
// subscribe will be deleted after calling Subscribe().
session_->subscribe_by_name_.erase(subscribe->full_track_name());
- if (message.error_code == RequestErrorCode::kRetryTrackAlias) {
- // Automatically resubscribe with new alias.
- MoqtSubscribe& subscribe_message = subscribe->GetSubscribe();
- session_->Subscribe(subscribe_message, subscribe->visitor(),
- message.track_alias);
- } else if (subscribe->visitor() != nullptr) {
+ if (subscribe->visitor() != nullptr) {
subscribe->visitor()->OnReply(subscribe->full_track_name(), std::nullopt,
message.reason_phrase);
}
- session_->subscribe_by_alias_.erase(subscribe->track_alias());
session_->upstream_by_id_.erase(subscribe->request_id());
}
@@ -1849,8 +1837,8 @@
if (it == session_->subscribe_by_alias_.end()) {
QUIC_DLOG(INFO) << ENDPOINT
<< "Received object for a track with no SUBSCRIBE";
- // This is a not a session error because there might be an UNSUBSCRIBE in
- // flight.
+ // This is a not a session error because there might be an UNSUBSCRIBE or
+ // SUBSCRIBE_OK (containing the track alias) in flight.
stream_->SendStopSending(kResetCodeCanceled);
return;
}
@@ -1898,7 +1886,6 @@
: session_(session),
track_publisher_(track_publisher),
request_id_(subscribe.request_id),
- track_alias_(subscribe.track_alias),
filter_type_(subscribe.filter_type),
forward_(subscribe.forward),
window_(SubscribeMessageToWindow(subscribe)),
@@ -1993,19 +1980,26 @@
}
MoqtSubscribeOk subscribe_ok;
subscribe_ok.request_id = request_id_;
+ subscribe_ok.track_alias = session_->next_local_track_alias_++;
subscribe_ok.group_order = track_publisher_->GetDeliveryOrder();
subscribe_ok.largest_location = largest_location;
+ track_alias_.emplace(subscribe_ok.track_alias);
// TODO(martinduke): Support sending DELIVERY_TIMEOUT parameter as the
// publisher.
stream->SendOrBufferMessage(
session_->framer_.SerializeSubscribeOk(subscribe_ok));
+ if (!PublisherHasData(*track_publisher_)) {
+ return;
+ }
+ // TODO(martinduke): If we buffer objects that arrived previously, the arrival
+ // of the track alias disambiguates what subscription they belong to. Send
+ // them.
}
void MoqtSession::PublishedSubscription::OnSubscribeRejected(
MoqtSubscribeErrorReason reason, std::optional<uint64_t> track_alias) {
session_->GetControlStream()->SendSubscribeError(
- request_id_, reason.error_code, reason.reason_phrase,
- track_alias.value_or(track_alias_));
+ request_id_, reason.error_code, reason.reason_phrase);
session_->published_subscriptions_.erase(request_id_);
// No class access below this line!
}
@@ -2331,6 +2325,9 @@
void MoqtSession::OutgoingDataStream::SendObjects(
PublishedSubscription& subscription) {
+ if (!subscription.track_alias().has_value()) {
+ return;
+ }
while (stream_->CanWrite()) {
std::optional<PublishedObject> object =
subscription.publisher().GetCachedObject(index_.group, index_.subgroup,
@@ -2362,7 +2359,7 @@
return;
}
if (!session_->WriteObjectToStream(
- stream_, subscription.track_alias(), object->metadata,
+ stream_, *subscription.track_alias(), object->metadata,
std::move(object->payload), stream_type_, !stream_header_written_,
object->fin_after_this)) {
// WriteObjectToStream() closes the connection on error, meaning that
@@ -2483,9 +2480,11 @@
<< "Got notification about an object that is not in the cache";
return;
}
-
+ if (!track_alias_.has_value()) {
+ return;
+ }
MoqtObject header;
- header.track_alias = track_alias();
+ header.track_alias = *track_alias_;
header.group_id = object->metadata.location.group;
header.object_id = object->metadata.location.object;
header.publisher_priority = object->metadata.publisher_priority;
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 25c5ae2..79b4613 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -291,8 +291,7 @@
void SendOrBufferMessage(quiche::QuicheBuffer message, bool fin = false);
void SendSubscribeError(uint64_t request_id, RequestErrorCode error_code,
- absl::string_view reason_phrase,
- uint64_t track_alias);
+ absl::string_view reason_phrase);
private:
friend class test::MoqtSessionPeer;
@@ -363,7 +362,7 @@
uint64_t request_id() const { return request_id_; }
MoqtTrackPublisher& publisher() { return *track_publisher_; }
- uint64_t track_alias() const { return track_alias_; }
+ std::optional<uint64_t> track_alias() const { return track_alias_; }
std::optional<Location> largest_sent() const { return largest_sent_; }
MoqtPriority subscriber_priority() const { return subscriber_priority_; }
std::optional<MoqtDeliveryOrder> subscriber_delivery_order() const {
@@ -447,6 +446,7 @@
uint64_t streams_opened() const { return streams_opened_; }
private:
+ friend class test::MoqtSessionPeer;
SendStreamMap& stream_map();
quic::Perspective perspective() const {
return session_->parameters_.perspective;
@@ -462,7 +462,7 @@
MoqtSession* session_;
std::shared_ptr<MoqtTrackPublisher> track_publisher_;
uint64_t request_id_;
- uint64_t track_alias_;
+ std::optional<const uint64_t> track_alias_;
MoqtFilterType filter_type_;
bool forward_;
// If window_ is nullopt, any arriving objects are ignored. This could be
@@ -711,10 +711,9 @@
// is present.
void SendControlMessage(quiche::QuicheBuffer message);
- // Returns false if the SUBSCRIBE isn't sent. |provided_track_alias| has a
- // value only if this call is due to a SUBSCRIBE_ERROR.
- bool Subscribe(MoqtSubscribe& message, SubscribeRemoteTrack::Visitor* visitor,
- std::optional<uint64_t> provided_track_alias = std::nullopt);
+ // Returns false if the SUBSCRIBE isn't sent.
+ bool Subscribe(MoqtSubscribe& message,
+ SubscribeRemoteTrack::Visitor* visitor);
// Opens a new data stream, or queues it if the session is flow control
// blocked.
@@ -793,8 +792,7 @@
absl::flat_hash_map<uint64_t, SubscribeRemoteTrack*> subscribe_by_alias_;
// All SUBSCRIBEs, indexed by track name.
absl::flat_hash_map<FullTrackName, SubscribeRemoteTrack*> subscribe_by_name_;
- // The next track alias to guess on a SUBSCRIBE.
- uint64_t next_remote_track_alias_ = 0;
+
// The next subscribe ID that the local endpoint can send.
uint64_t next_request_id_ = 0;
// The local endpoint can send subscribe IDs less than this value.
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 55e47dc..9862e11 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -65,7 +65,6 @@
MoqtSubscribe DefaultSubscribe(uint64_t request_id) {
MoqtSubscribe subscribe = {
request_id,
- /*track_alias=*/2,
kDefaultTrackName(),
/*subscriber_priority=*/0x80,
/*group_order=*/std::nullopt,
@@ -156,7 +155,7 @@
// publish objects.
MoqtObjectListener* ReceiveSubscribeSynchronousOk(
MockTrackPublisher* publisher, MoqtSubscribe& subscribe,
- MoqtControlParserVisitor* control_parser) {
+ MoqtControlParserVisitor* control_parser, uint64_t track_alias = 0) {
MoqtObjectListener* listener_ptr = nullptr;
EXPECT_CALL(*publisher, AddObjectListener)
.WillOnce([&](MoqtObjectListener* listener) {
@@ -170,6 +169,7 @@
}
MoqtSubscribeOk expected_ok = {
/*request_id=*/subscribe.request_id,
+ track_alias,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
/*group_order=*/MoqtDeliveryOrder::kAscending,
(*track_status == MoqtTrackStatusCode::kInProgress)
@@ -507,8 +507,7 @@
mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribeError), _));
listener->OnSubscribeRejected(
- MoqtSubscribeErrorReason(RequestErrorCode::kInternalError, "Test error"),
- request.track_alias);
+ MoqtSubscribeErrorReason(RequestErrorCode::kInternalError, "Test error"));
EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, kDefaultPeerRequestId),
nullptr);
}
@@ -607,7 +606,8 @@
// Subscribe again, succeeds.
request.request_id = 3;
request.start = Location(12, 0);
- ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
+ ReceiveSubscribeSynchronousOk(track, request, stream_input.get(),
+ /*track_alias=*/1);
}
TEST_F(MoqtSessionTest, RequestIdTooHigh) {
@@ -638,7 +638,6 @@
stream_input->OnSubscribeMessage(request);
// Second request is a protocol violation.
- ++request.track_alias;
request.full_track_name = FullTrackName({"dead", "beef"});
EXPECT_CALL(mock_session_,
CloseSession(static_cast<uint64_t>(MoqtError::kInvalidRequestId),
@@ -701,6 +700,7 @@
MoqtSubscribeOk ok = {
/*request_id=*/0,
+ /*track_alias=*/2,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
};
EXPECT_CALL(remote_track_visitor, OnReply(_, _, _))
@@ -720,7 +720,6 @@
EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream_));
MoqtSubscribe subscribe = {
/*request_id=*/0,
- /*track_alias=*/0,
FullTrackName("foo", "bar"),
kDefaultSubscriberPriority,
/*group_order=*/std::nullopt,
@@ -738,6 +737,7 @@
MoqtSubscribeOk ok = {
/*request_id=*/0,
+ /*track_alias=*/2,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
};
EXPECT_CALL(remote_track_visitor, OnReply(_, _, _))
@@ -763,6 +763,7 @@
VersionSpecificParameters());
MoqtSubscribeOk ok = {
/*request_id=*/0,
+ /*track_alias=*/2,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
};
EXPECT_CALL(remote_track_visitor, OnReply);
@@ -773,7 +774,7 @@
EXPECT_TRUE(session_.SubscribeUpdate(
FullTrackName("foo", "bar"), Location(2, 1), 9, std::nullopt,
std::nullopt, VersionSpecificParameters()));
- SubscribeRemoteTrack* track = MoqtSessionPeer::remote_track(&session_, 0);
+ SubscribeRemoteTrack* track = MoqtSessionPeer::remote_track(&session_, 2);
EXPECT_FALSE(track->InWindow(Location(2, 0)));
EXPECT_TRUE(track->InWindow(Location(2, 1)));
EXPECT_TRUE(track->InWindow(Location(9, UINT64_MAX)));
@@ -793,6 +794,7 @@
VersionSpecificParameters());
MoqtSubscribeOk ok = {
/*request_id=*/0,
+ /*track_alias=*/2,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
};
EXPECT_CALL(remote_track_visitor, OnReply);
@@ -880,7 +882,6 @@
/*request_id=*/0,
/*error_code=*/RequestErrorCode::kInvalidRange,
/*reason_phrase=*/"deadbeef",
- /*track_alias=*/2,
};
EXPECT_CALL(remote_track_visitor, OnReply(_, _, _))
.WillOnce([&](const FullTrackName& ftn,
@@ -897,7 +898,7 @@
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
MockSubscribeRemoteTrackVisitor remote_track_visitor;
MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultSubscribe(),
- &remote_track_visitor);
+ /*track_alias=*/2, &remote_track_visitor);
EXPECT_CALL(mock_stream_,
Writev(ControlMessageOfType(MoqtMessageType::kUnsubscribe), _));
EXPECT_NE(MoqtSessionPeer::remote_track(&session_, 2), nullptr);
@@ -1054,7 +1055,8 @@
MockSubscribeRemoteTrackVisitor visitor_;
FullTrackName ftn("foo", "bar");
std::string payload = "deadbeef";
- MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultSubscribe(), &visitor_);
+ MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultSubscribe(),
+ /*track_alias=*/2, &visitor_);
MoqtObject object = {
/*track_alias=*/2,
/*group_sequence=*/0,
@@ -1079,7 +1081,8 @@
MockSubscribeRemoteTrackVisitor visitor_;
FullTrackName ftn("foo", "bar");
std::string payload = "deadbeef";
- MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultSubscribe(), &visitor_);
+ MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultSubscribe(),
+ /*track_alias=*/2, &visitor_);
MoqtObject object = {
/*track_alias=*/2,
/*group_sequence=*/0,
@@ -1110,7 +1113,8 @@
MockSubscribeRemoteTrackVisitor visitor_;
FullTrackName ftn("foo", "bar");
std::string payload = "deadbeef";
- MoqtSessionPeer::CreateRemoteTrack(&session, DefaultSubscribe(), &visitor_);
+ MoqtSessionPeer::CreateRemoteTrack(&session, DefaultSubscribe(),
+ /*track_alias=*/2, &visitor_);
MoqtObject object = {
/*track_alias=*/2,
/*group_sequence=*/0,
@@ -1137,7 +1141,7 @@
FullTrackName ftn("foo", "bar");
std::string payload = "deadbeef";
MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultLocalSubscribe(),
- &visitor_);
+ std::nullopt, &visitor_);
MoqtObject object = {
/*track_alias=*/2,
/*group_sequence=*/0,
@@ -1151,22 +1155,13 @@
std::unique_ptr<MoqtDataParserVisitor> object_stream =
MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream_,
kDefaultSubgroupStreamType);
-
- EXPECT_CALL(visitor_, OnObjectFragment)
- .WillOnce([&](const FullTrackName& full_track_name,
- const PublishedObjectMetadata& metadata,
- absl::string_view payload, bool end_of_message) {
- EXPECT_EQ(full_track_name, ftn);
- EXPECT_EQ(metadata.location.group, object.group_id);
- EXPECT_EQ(metadata.location.object, object.object_id);
- });
- EXPECT_CALL(mock_stream_, GetStreamId())
- .WillRepeatedly(Return(kIncomingUniStreamId));
+ EXPECT_CALL(mock_stream_, SendStopSending);
object_stream->OnObjectMessage(object, payload, true);
// SUBSCRIBE_OK arrives
MoqtSubscribeOk ok = {
kDefaultLocalRequestId,
+ /*track_alias=*/2,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_location=*/std::nullopt,
@@ -1178,97 +1173,33 @@
control_stream->OnSubscribeOkMessage(ok);
}
-TEST_F(MoqtSessionTest, ObjectBeforeSubscribeError) {
+TEST_F(MoqtSessionTest, SubscribeOkWithBadTrackAlias) {
MockSubscribeRemoteTrackVisitor visitor;
- FullTrackName ftn("foo", "bar");
- std::string payload = "deadbeef";
+ // Create open subscription.
MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultLocalSubscribe(),
+ /*track_alias=*/2, &visitor);
+ MoqtSubscribe subscribe2 = DefaultLocalSubscribe();
+ subscribe2.request_id += 2;
+ subscribe2.full_track_name = FullTrackName("foo2", "bar2");
+ MoqtSessionPeer::CreateRemoteTrack(&session_, subscribe2, std::nullopt,
&visitor);
- MoqtObject object = {
+
+ // SUBSCRIBE_OK arrives
+ MoqtSubscribeOk subscribe_ok = {
+ subscribe2.request_id,
/*track_alias=*/2,
- /*group_sequence=*/0,
- /*object_sequence=*/0,
- /*publisher_priority=*/0,
- /*extension_headers=*/"",
- /*object_status=*/MoqtObjectStatus::kNormal,
- /*subgroup_id=*/0,
- /*payload_length=*/8,
- };
- std::unique_ptr<MoqtDataParserVisitor> object_stream =
- MoqtSessionPeer::CreateIncomingDataStream(&session_, &mock_stream_,
- kDefaultSubgroupStreamType);
-
- EXPECT_CALL(visitor, OnObjectFragment)
- .WillOnce([&](const FullTrackName& full_track_name,
- const PublishedObjectMetadata& metadata,
- absl::string_view payload, bool end_of_message) {
- EXPECT_EQ(full_track_name, ftn);
- EXPECT_EQ(metadata.location.group, object.group_id);
- EXPECT_EQ(metadata.location.object, object.object_id);
- });
- EXPECT_CALL(mock_stream_, GetStreamId())
- .WillRepeatedly(Return(kIncomingUniStreamId));
- object_stream->OnObjectMessage(object, payload, true);
-
- // SUBSCRIBE_ERROR arrives
- MoqtSubscribeError subscribe_error = {
- kDefaultLocalRequestId,
- /*error_code=*/RequestErrorCode::kRetryTrackAlias,
- /*reason_phrase=*/"foo",
- /*track_alias =*/3,
+ /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
+ /*group_order=*/MoqtDeliveryOrder::kAscending,
+ /*largest_location=*/std::nullopt,
+ VersionSpecificParameters(),
};
webtransport::test::MockStream mock_control_stream;
std::unique_ptr<MoqtControlParserVisitor> control_stream =
MoqtSessionPeer::CreateControlStream(&session_, &mock_control_stream);
EXPECT_CALL(
mock_session_,
- CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
- "Received SUBSCRIBE_ERROR after SUBSCRIBE_OK or objects"))
- .Times(1);
- control_stream->OnSubscribeErrorMessage(subscribe_error);
-}
-
-TEST_F(MoqtSessionTest, SubscribeErrorWithTrackAlias) {
- MockSubscribeRemoteTrackVisitor visitor;
- MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultLocalSubscribe(),
- &visitor);
-
- // SUBSCRIBE_ERROR arrives
- MoqtSubscribeError subscribe_error = {
- kDefaultLocalRequestId,
- /*error_code=*/RequestErrorCode::kRetryTrackAlias,
- /*reason_phrase=*/"foo",
- /*track_alias =*/3,
- };
- webtransport::test::MockStream mock_control_stream;
- std::unique_ptr<MoqtControlParserVisitor> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_control_stream);
- EXPECT_CALL(mock_control_stream,
- Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _))
- .Times(1);
- control_stream->OnSubscribeErrorMessage(subscribe_error);
-}
-
-TEST_F(MoqtSessionTest, SubscribeErrorWithBadTrackAlias) {
- MockSubscribeRemoteTrackVisitor visitor;
- MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultLocalSubscribe(),
- &visitor);
-
- // SUBSCRIBE_ERROR arrives
- MoqtSubscribeError subscribe_error = {
- kDefaultLocalRequestId,
- /*error_code=*/RequestErrorCode::kRetryTrackAlias,
- /*reason_phrase=*/"foo",
- /*track_alias =*/2,
- };
- webtransport::test::MockStream mock_control_stream;
- std::unique_ptr<MoqtControlParserVisitor> control_stream =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_control_stream);
- EXPECT_CALL(mock_session_,
- CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
- "Provided track alias already in use"))
- .Times(1);
- control_stream->OnSubscribeErrorMessage(subscribe_error);
+ CloseSession(static_cast<uint64_t>(MoqtError::kDuplicateTrackAlias), ""));
+ control_stream->OnSubscribeOkMessage(subscribe_ok);
}
TEST_F(MoqtSessionTest, CreateOutgoingDataStreamAndSend) {
@@ -2013,7 +1944,8 @@
MockSubscribeRemoteTrackVisitor visitor_;
FullTrackName ftn("foo", "bar");
std::string payload = "deadbeef";
- MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultSubscribe(), &visitor_);
+ MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultSubscribe(),
+ /*track_alias=*/2, &visitor_);
MoqtObject object = {
/*track_alias=*/2,
/*group_sequence=*/0,
@@ -2044,7 +1976,8 @@
TEST_F(MoqtSessionTest, DataStreamTypeMismatch) {
MockSubscribeRemoteTrackVisitor visitor_;
std::string payload = "deadbeef";
- MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultSubscribe(), &visitor_);
+ MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultSubscribe(),
+ /*track_alias=*/2, &visitor_);
MoqtObject object = {
/*track_alias=*/2,
/*group_sequence=*/0,
@@ -2078,7 +2011,8 @@
std::string payload = "deadbeef";
MoqtSubscribe subscribe = DefaultSubscribe();
subscribe.start = Location(1, 0);
- MoqtSessionPeer::CreateRemoteTrack(&session_, subscribe, &visitor_);
+ MoqtSessionPeer::CreateRemoteTrack(&session_, subscribe, /*track_alias=*/2,
+ &visitor_);
MoqtObject object = {
/*track_alias=*/2,
/*group_sequence=*/0,
@@ -2101,7 +2035,8 @@
std::string payload = "deadbeef";
MoqtSubscribe subscribe = DefaultSubscribe();
subscribe.start = Location(1, 0);
- MoqtSessionPeer::CreateRemoteTrack(&session_, subscribe, &visitor_);
+ MoqtSessionPeer::CreateRemoteTrack(&session_, subscribe, /*track_alias=*/2,
+ &visitor_);
char datagram[] = {0x01, 0x02, 0x00, 0x00, 0x80, 0x00, 0x08, 0x64,
0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66};
EXPECT_CALL(visitor_, OnObjectFragment).Times(0);
@@ -2655,7 +2590,6 @@
.WillRepeatedly(Return(&mock_stream_));
MoqtSubscribe expected_subscribe = {
/*request_id=*/0,
- /*track_alias=*/0,
/*full_track_name=*/FullTrackName("foo", "bar"),
/*subscriber_priority=*/0x80,
/*group_order=*/MoqtDeliveryOrder::kAscending,
@@ -2696,7 +2630,7 @@
EXPECT_CALL(remote_track_visitor, OnReply).Times(1);
stream_input->OnSubscribeOkMessage(
- MoqtSubscribeOk(0, quic::QuicTimeDelta::FromMilliseconds(0),
+ MoqtSubscribeOk(0, 2, quic::QuicTimeDelta::FromMilliseconds(0),
MoqtDeliveryOrder::kAscending, Location(2, 0),
VersionSpecificParameters()));
stream_input->OnFetchOkMessage(MoqtFetchOk(2, MoqtDeliveryOrder::kAscending,
@@ -3452,6 +3386,7 @@
VersionSpecificParameters()));
MoqtSubscribeOk ok = {
/*request_id=*/0,
+ /*track_alias=*/0,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_location=*/std::nullopt,
@@ -3510,6 +3445,7 @@
VersionSpecificParameters()));
MoqtSubscribeOk ok = {
/*request_id=*/0,
+ /*track_alias=*/0,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_location=*/std::nullopt,
@@ -3565,6 +3501,7 @@
VersionSpecificParameters()));
MoqtSubscribeOk ok = {
/*request_id=*/0,
+ /*track_alias=*/0,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
/*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_location=*/std::nullopt,
@@ -3616,7 +3553,7 @@
TEST_F(MoqtSessionTest, SubgroupStreamOutOfOrder) {
MockSubscribeRemoteTrackVisitor remote_track_visitor;
MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultSubscribe(),
- &remote_track_visitor);
+ /*track_alias=*/2, &remote_track_visitor);
webtransport::test::MockStream control_stream;
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
@@ -3647,7 +3584,7 @@
TEST_F(MoqtSessionTest, SubgroupStreamObjectAfterGroupEnd) {
MockSubscribeRemoteTrackVisitor remote_track_visitor;
MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultSubscribe(),
- &remote_track_visitor);
+ /*track_alias=*/2, &remote_track_visitor);
webtransport::test::MockStream control_stream;
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
@@ -3678,7 +3615,7 @@
TEST_F(MoqtSessionTest, SubgroupStreamObjectAfterTrackEnd) {
MockSubscribeRemoteTrackVisitor remote_track_visitor;
MoqtSessionPeer::CreateRemoteTrack(&session_, DefaultSubscribe(),
- &remote_track_visitor);
+ /*track_alias=*/2, &remote_track_visitor);
webtransport::test::MockStream control_stream;
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index be3dece..d2bfc18 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -124,11 +124,9 @@
SubscribeWindow(subscribe.start.value_or(Location()),
subscribe.end_group),
subscribe.subscriber_priority),
- track_alias_(subscribe.track_alias),
forward_(subscribe.forward),
visitor_(visitor),
- delivery_timeout_(subscribe.parameters.delivery_timeout),
- subscribe_(std::make_unique<MoqtSubscribe>(subscribe)) {}
+ delivery_timeout_(subscribe.parameters.delivery_timeout) {}
~SubscribeRemoteTrack() override {
if (subscribe_done_alarm_ != nullptr) {
subscribe_done_alarm_->PermanentCancel();
@@ -136,16 +134,14 @@
}
void OnObjectOrOk() override {
- subscribe_.reset(); // No SUBSCRIBE_ERROR, no need to store this anymore.
RemoteTrack::OnObjectOrOk();
}
- uint64_t track_alias() const { return track_alias_; }
- Visitor* visitor() { return visitor_; }
- MoqtSubscribe& GetSubscribe() {
- return *subscribe_;
- // This class will soon be destroyed, so there's no need to null the
- // unique_ptr;
+ std::optional<uint64_t> track_alias() const { return track_alias_; }
+ void set_track_alias(uint64_t track_alias) {
+ track_alias_.emplace(track_alias);
}
+ Visitor* visitor() { return visitor_; }
+
// Returns false if the forwarding preference is changing on the track.
bool OnObject(bool is_datagram) {
OnObjectOrOk();
@@ -191,7 +187,7 @@
void FetchObjects();
std::unique_ptr<MoqtFetchTask> fetch_task_;
- const uint64_t track_alias_;
+ std::optional<const uint64_t> track_alias_;
bool forward_;
Visitor* visitor_;
std::optional<bool> is_datagram_;
@@ -205,10 +201,6 @@
quic::QuicTimeDelta delivery_timeout_ = quic::QuicTimeDelta::Infinite();
std::unique_ptr<quic::QuicAlarm> subscribe_done_alarm_ = nullptr;
const quic::QuicClock* clock_ = nullptr;
-
- // For convenience, store the subscribe message if it has to be re-sent with
- // a new track alias.
- std::unique_ptr<MoqtSubscribe> subscribe_;
};
// MoqtSession calls this when a FETCH_OK or FETCH_ERROR is received. The
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 9f3956c..d8fa4a2 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -46,7 +46,6 @@
MockSubscribeRemoteTrackVisitor visitor_;
MoqtSubscribe subscribe_ = {
/*subscribe_id=*/1,
- /*track_alias=*/2,
/*full_track_name=*/FullTrackName("foo", "bar"),
/*subscriber_priority=*/128,
/*group_order=*/std::nullopt,
@@ -62,9 +61,11 @@
TEST_F(SubscribeRemoteTrackTest, Queries) {
EXPECT_EQ(track_.full_track_name(), FullTrackName("foo", "bar"));
EXPECT_EQ(track_.request_id(), 1);
- EXPECT_EQ(track_.track_alias(), 2);
+ EXPECT_FALSE(track_.track_alias().has_value());
EXPECT_EQ(track_.visitor(), &visitor_);
EXPECT_FALSE(track_.is_fetch());
+ track_.set_track_alias(1);
+ EXPECT_EQ(track_.track_alias(), 1);
}
TEST_F(SubscribeRemoteTrackTest, UpdateDataStreamType) {
@@ -75,7 +76,6 @@
TEST_F(SubscribeRemoteTrackTest, AllowError) {
EXPECT_TRUE(track_.ErrorIsAllowed());
- EXPECT_EQ(track_.GetSubscribe().request_id, subscribe_.request_id);
track_.OnObjectOrOk();
EXPECT_FALSE(track_.ErrorIsAllowed());
}
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index f411c82..4064ed3 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -86,10 +86,13 @@
static void CreateRemoteTrack(MoqtSession* session,
const MoqtSubscribe& subscribe,
+ const std::optional<uint64_t> track_alias,
SubscribeRemoteTrack::Visitor* visitor) {
auto track = std::make_unique<SubscribeRemoteTrack>(subscribe, visitor);
- session->subscribe_by_alias_.try_emplace(subscribe.track_alias,
- track.get());
+ if (track_alias.has_value()) {
+ track->set_track_alias(*track_alias);
+ session->subscribe_by_alias_.try_emplace(*track_alias, track.get());
+ }
session->subscribe_by_name_.try_emplace(subscribe.full_track_name,
track.get());
session->upstream_by_id_.try_emplace(subscribe.request_id,
@@ -102,7 +105,6 @@
uint64_t start_object) {
MoqtSubscribe subscribe;
subscribe.full_track_name = publisher->GetTrackName();
- subscribe.track_alias = track_alias;
subscribe.request_id = subscribe_id;
subscribe.forward = true;
subscribe.filter_type = MoqtFilterType::kAbsoluteStart;
@@ -112,6 +114,8 @@
subscribe_id, std::make_unique<MoqtSession::PublishedSubscription>(
session, std::move(publisher), subscribe,
/*monitoring_interface=*/nullptr));
+ session->published_subscriptions_[subscribe_id]->track_alias_.emplace(
+ track_alias);
return session->published_subscriptions_[subscribe_id].get();
}
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index cf5fdfd..8c3bd33 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -570,10 +570,6 @@
QUIC_LOG(INFO) << "SUBSCRIBE subscribe ID mismatch";
return false;
}
- if (cast.track_alias != subscribe_.track_alias) {
- QUIC_LOG(INFO) << "SUBSCRIBE track alias mismatch";
- return false;
- }
if (cast.full_track_name != subscribe_.full_track_name) {
QUIC_LOG(INFO) << "SUBSCRIBE track name mismatch";
return false;
@@ -610,7 +606,7 @@
}
void ExpandVarints() override {
- ExpandVarintsImpl("vvvv---v-------vvvvv--vv-----");
+ ExpandVarintsImpl("vvv---v-------vvvvv--vv-----");
}
MessageStructuredData structured_data() const override {
@@ -618,25 +614,43 @@
}
private:
- uint8_t raw_packet_[32] = {
- 0x03, 0x00, 0x1d, 0x01, 0x02, // id and alias
- 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
- 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd"
- 0x20, // subscriber priority = 0x20
- 0x02, // group order = descending
- 0x01, // forward = true
- 0x03, // Filter type: Absolute Start
- 0x04, // start_group = 4
- 0x01, // start_object = 1
+ uint8_t raw_packet_[31] = {
+ 0x03,
+ 0x00,
+ 0x1c,
+ 0x01, // request_id = 1
+ 0x01,
+ 0x03,
+ 0x66,
+ 0x6f,
+ 0x6f, // track_namespace = "foo"
+ 0x04,
+ 0x61,
+ 0x62,
+ 0x63,
+ 0x64, // track_name = "abcd"
+ 0x20, // subscriber priority = 0x20
+ 0x02, // group order = descending
+ 0x01, // forward = true
+ 0x03, // Filter type: Absolute Start
+ 0x04, // start_group = 4
+ 0x01, // start_object = 1
// No EndGroup or EndObject
- 0x02, // 2 parameters
- 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms
- 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar"
+ 0x02, // 2 parameters
+ 0x02,
+ 0x67,
+ 0x10, // delivery_timeout = 10000 ms
+ 0x03,
+ 0x05,
+ 0x03,
+ 0x00,
+ 0x62,
+ 0x61,
+ 0x72, // authorization_tag = "bar"
};
MoqtSubscribe subscribe_ = {
- /*subscribe_id=*/1,
- /*track_alias=*/2,
+ /*request_id=*/1,
FullTrackName("foo", "abcd"),
/*subscriber_priority=*/0x20,
/*group_order=*/MoqtDeliveryOrder::kDescending,
@@ -661,6 +675,10 @@
QUIC_LOG(INFO) << "SUBSCRIBE OK subscribe ID mismatch";
return false;
}
+ if (cast.track_alias != subscribe_ok_.track_alias) {
+ QUIC_LOG(INFO) << "SUBSCRIBE OK track alias mismatch";
+ return false;
+ }
if (cast.expires != subscribe_ok_.expires) {
QUIC_LOG(INFO) << "SUBSCRIBE OK expiration mismatch";
return false;
@@ -680,34 +698,35 @@
return true;
}
- void ExpandVarints() override { ExpandVarintsImpl("vv--vvvv--v--"); }
+ void ExpandVarints() override { ExpandVarintsImpl("vvv--vvvv--v--"); }
MessageStructuredData structured_data() const override {
return TestMessageBase::MessageStructuredData(subscribe_ok_);
}
void SetInvalidContentExists() {
- raw_packet_[6] = 0x02;
+ raw_packet_[7] = 0x02;
SetWireImage(raw_packet_, sizeof(raw_packet_));
}
void SetInvalidDeliveryOrder() {
- raw_packet_[5] = 0x10;
+ raw_packet_[6] = 0x10;
SetWireImage(raw_packet_, sizeof(raw_packet_));
}
private:
- uint8_t raw_packet_[16] = {
- 0x04, 0x00, 0x0d, 0x01, 0x03, // request_id = 1, expires = 3
- 0x02, 0x01, // group_order = 2, content exists
- 0x0c, 0x14, // largest_location = (12, 20)
- 0x02, // 2 parameters
- 0x02, 0x67, 0x10, // delivery_timeout = 10000
- 0x04, 0x67, 0x10, // max_cache_duration = 10000
+ uint8_t raw_packet_[17] = {
+ 0x04, 0x00, 0x0e, 0x01, 0x02, 0x03, // request_id, alias, expires
+ 0x02, 0x01, // group_order = 2, content exists
+ 0x0c, 0x14, // largest_location = (12, 20)
+ 0x02, // 2 parameters
+ 0x02, 0x67, 0x10, // delivery_timeout = 10000
+ 0x04, 0x67, 0x10, // max_cache_duration = 10000
};
MoqtSubscribeOk subscribe_ok_ = {
/*request_id=*/1,
+ /*track_alias=*/2,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(3),
/*group_order=*/MoqtDeliveryOrder::kDescending,
/*largest_location=*/Location(12, 20),
@@ -736,33 +755,27 @@
QUIC_LOG(INFO) << "SUBSCRIBE ERROR reason phrase mismatch";
return false;
}
- if (cast.track_alias != subscribe_error_.track_alias) {
- QUIC_LOG(INFO) << "SUBSCRIBE ERROR track alias mismatch";
- return false;
- }
return true;
}
- void ExpandVarints() override { ExpandVarintsImpl("vvv---v"); }
+ void ExpandVarints() override { ExpandVarintsImpl("vvv---"); }
MessageStructuredData structured_data() const override {
return TestMessageBase::MessageStructuredData(subscribe_error_);
}
private:
- uint8_t raw_packet_[10] = {
- 0x05, 0x00, 0x07,
+ uint8_t raw_packet_[9] = {
+ 0x05, 0x00, 0x06,
0x02, // request_id = 2
0x05, // error_code = 5
0x03, 0x62, 0x61, 0x72, // reason_phrase = "bar"
- 0x04, // track_alias = 4
};
MoqtSubscribeError subscribe_error_ = {
/*request_id=*/2,
/*error_code=*/RequestErrorCode::kInvalidRange,
/*reason_phrase=*/"bar",
- /*track_alias=*/4,
};
};