Make request_id even/odd for client/server. Implement new MoQT SUBSCRIBE format. PiperOrigin-RevId: 756496625
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index d2f57fc..21588c9 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -193,6 +193,8 @@ return WireUint8(0xff); } +WireUint8 WireBoolean(bool value) { return WireUint8(value ? 0x01 : 0x00); } + uint64_t SignedVarintSerializedForm(int64_t value) { if (value < 0) { return ((-value) << 1) | 0x01; @@ -416,11 +418,6 @@ quiche::QuicheBuffer MoqtFramer::SerializeSubscribe( const MoqtSubscribe& message) { - MoqtFilterType filter_type = GetFilterType(message); - if (filter_type == MoqtFilterType::kNone) { - QUICHE_BUG(MoqtFramer_invalid_subscribe) << "Invalid object range"; - return quiche::QuicheBuffer(); - } KeyValuePairList parameters; VersionSpecificParametersToKeyValuePairList(message.parameters, parameters); if (!ValidateVersionSpecificParameters(parameters, @@ -429,33 +426,44 @@ << "Serializing invalid MoQT parameters"; return quiche::QuicheBuffer(); } - switch (filter_type) { + switch (message.filter_type) { + case MoqtFilterType::kNextGroupStart: case MoqtFilterType::kLatestObject: return SerializeControlMessage( - MoqtMessageType::kSubscribe, WireVarInt62(message.subscribe_id), + MoqtMessageType::kSubscribe, WireVarInt62(message.request_id), WireVarInt62(message.track_alias), WireFullTrackName(message.full_track_name, true), WireUint8(message.subscriber_priority), - WireDeliveryOrder(message.group_order), WireVarInt62(filter_type), - WireKeyValuePairList(parameters)); + WireDeliveryOrder(message.group_order), WireBoolean(message.forward), + WireVarInt62(message.filter_type), WireKeyValuePairList(parameters)); case MoqtFilterType::kAbsoluteStart: + if (!message.start.has_value()) { + return quiche::QuicheBuffer(); + }; return SerializeControlMessage( - MoqtMessageType::kSubscribe, WireVarInt62(message.subscribe_id), + MoqtMessageType::kSubscribe, WireVarInt62(message.request_id), WireVarInt62(message.track_alias), WireFullTrackName(message.full_track_name, true), WireUint8(message.subscriber_priority), - WireDeliveryOrder(message.group_order), WireVarInt62(filter_type), - WireVarInt62(message.start->group), + WireDeliveryOrder(message.group_order), WireBoolean(message.forward), + WireVarInt62(message.filter_type), WireVarInt62(message.start->group), WireVarInt62(message.start->object), WireKeyValuePairList(parameters)); case MoqtFilterType::kAbsoluteRange: + if (!message.start.has_value() || !message.end_group.has_value()) { + return quiche::QuicheBuffer(); + } + if (*message.end_group < message.start->group) { + QUICHE_BUG(MoqtFramer_invalid_end_group) << "Invalid object range"; + return quiche::QuicheBuffer(); + } return SerializeControlMessage( - MoqtMessageType::kSubscribe, WireVarInt62(message.subscribe_id), + MoqtMessageType::kSubscribe, WireVarInt62(message.request_id), WireVarInt62(message.track_alias), WireFullTrackName(message.full_track_name, true), WireUint8(message.subscriber_priority), - WireDeliveryOrder(message.group_order), WireVarInt62(filter_type), - WireVarInt62(message.start->group), + WireDeliveryOrder(message.group_order), WireBoolean(message.forward), + WireVarInt62(message.filter_type), WireVarInt62(message.start->group), WireVarInt62(message.start->object), WireVarInt62(*message.end_group), WireKeyValuePairList(parameters)); default:
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc index e772bc8..16939e3 100644 --- a/quiche/quic/moqt/moqt_framer_test.cc +++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -402,35 +402,24 @@ } TEST_F(MoqtFramerSimpleTest, AllSubscribeInputs) { - for (std::optional<Location> start : - {std::optional<Location>(), - std::optional<Location>(std::in_place, 4, 0)}) { - for (std::optional<uint64_t> end_group : - {std::optional<uint64_t>(), std::optional<uint64_t>(7)}) { - MoqtSubscribe subscribe = { - /*subscribe_id=*/3, - /*track_alias=*/4, - /*full_track_name=*/FullTrackName({"foo", "abcd"}), - /*subscriber_priority=*/0x20, - /*group_order=*/std::nullopt, - start, - end_group, - VersionSpecificParameters(AuthTokenType::kOutOfBand, "bar"), - }; - quiche::QuicheBuffer buffer; - MoqtFilterType expected_filter_type = GetFilterType(subscribe); - if (expected_filter_type == MoqtFilterType::kNone) { - EXPECT_QUIC_BUG(buffer = framer_.SerializeSubscribe(subscribe), - "Invalid object range"); - EXPECT_EQ(buffer.size(), 0); - continue; - } - buffer = framer_.SerializeSubscribe(subscribe); - // Go to the filter type. - const uint8_t* read = BufferAtOffset(buffer, 17); - EXPECT_EQ(static_cast<MoqtFilterType>(*read), expected_filter_type); - EXPECT_GT(buffer.size(), 0); - } + for (auto filter : + {MoqtFilterType::kNextGroupStart, MoqtFilterType::kLatestObject, + MoqtFilterType::kAbsoluteStart, MoqtFilterType::kAbsoluteRange}) { + MoqtSubscribe subscribe = { + /*subscribe_id=*/3, + /*track_alias=*/4, + /*full_track_name=*/FullTrackName({"foo", "abcd"}), + /*subscriber_priority=*/0x20, + /*group_order=*/std::nullopt, + /*forward=*/true, + /*filter_type=*/filter, + /*start=*/std::make_optional<Location>(4, 1), + /*end_group=*/std::make_optional<uint64_t>(6ULL), + VersionSpecificParameters(AuthTokenType::kOutOfBand, "bar"), + }; + quiche::QuicheBuffer buffer; + buffer = framer_.SerializeSubscribe(subscribe); + EXPECT_GT(buffer.size(), 0); } } @@ -441,13 +430,15 @@ /*full_track_name=*/FullTrackName({"foo", "abcd"}), /*subscriber_priority=*/0x20, /*group_order=*/std::nullopt, - /*start=*/Location(4, 3), - /*end_group=*/3, + /*forward=*/true, + /*filter_type=*/MoqtFilterType::kAbsoluteRange, + /*start=*/std::make_optional<Location>(4, 3), + /*end_group=*/std::make_optional<uint64_t>(3ULL), VersionSpecificParameters(AuthTokenType::kOutOfBand, "bar"), }; quiche::QuicheBuffer buffer; - EXPECT_QUIC_BUG(buffer = framer_.SerializeSubscribe(subscribe), - "Invalid object range"); + EXPECT_QUICHE_BUG(buffer = framer_.SerializeSubscribe(subscribe), + "Invalid object range"); EXPECT_EQ(buffer.size(), 0); }
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc index 904954c..7e6f480 100644 --- a/quiche/quic/moqt/moqt_messages.cc +++ b/quiche/quic/moqt/moqt_messages.cc
@@ -111,22 +111,6 @@ } } -MoqtFilterType GetFilterType(const MoqtSubscribe& message) { - if (message.start.has_value()) { - if (message.end_group.has_value()) { - if (*message.end_group < message.start->group) { - return MoqtFilterType::kNone; - } - return MoqtFilterType::kAbsoluteRange; - } - return MoqtFilterType::kAbsoluteStart; - } - if (message.end_group.has_value()) { - return MoqtFilterType::kNone; // End group without start is invalid. - } - return MoqtFilterType::kLatestObject; -} - MoqtError ValidateSetupParameters(const KeyValuePairList& parameters, bool webtrans, quic::Perspective perspective) {
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index ac22147..9ba07f8 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -518,33 +518,25 @@ enum class QUICHE_EXPORT MoqtFilterType : uint64_t { kNone = 0x0, + kNextGroupStart = 0x1, kLatestObject = 0x2, kAbsoluteStart = 0x3, kAbsoluteRange = 0x4, }; struct QUICHE_EXPORT MoqtSubscribe { - uint64_t subscribe_id; + uint64_t request_id; uint64_t track_alias; FullTrackName full_track_name; MoqtPriority subscriber_priority; std::optional<MoqtDeliveryOrder> group_order; - - // The combinations of these that have values indicate the filter type. - // (none): KLatestObject - // start: kAbsoluteStart - // start, end_group: kAbsoluteRange (request whole last group) - // All other combinations are invalid. + bool forward; + MoqtFilterType filter_type; std::optional<Location> start; std::optional<uint64_t> end_group; - // If the mode is kNone, the these are std::nullopt. VersionSpecificParameters parameters; }; -// Deduce the filter type from the combination of group and object IDs. Returns -// kNone if the state of the subscribe is invalid. -MoqtFilterType GetFilterType(const MoqtSubscribe& message); - struct QUICHE_EXPORT MoqtSubscribeOk { uint64_t subscribe_id; // The message uses ms, but expires is in us.
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index 5dd2508..e9a093b 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -402,23 +402,30 @@ size_t MoqtControlParser::ProcessSubscribe(quic::QuicDataReader& reader) { MoqtSubscribe subscribe; uint64_t filter, group, object; - uint8_t group_order; + uint8_t group_order, forward; absl::string_view track_name; - if (!reader.ReadVarInt62(&subscribe.subscribe_id) || + if (!reader.ReadVarInt62(&subscribe.request_id) || !reader.ReadVarInt62(&subscribe.track_alias) || !ReadTrackNamespace(reader, subscribe.full_track_name) || !reader.ReadStringPieceVarInt62(&track_name) || !reader.ReadUInt8(&subscribe.subscriber_priority) || - !reader.ReadUInt8(&group_order) || !reader.ReadVarInt62(&filter)) { + !reader.ReadUInt8(&group_order) || !reader.ReadUInt8(&forward) || + !reader.ReadVarInt62(&filter)) { return 0; } subscribe.full_track_name.AddElement(track_name); if (!ParseDeliveryOrder(group_order, subscribe.group_order)) { - ParseError("Invalid group order value in SUBSCRIBE message"); + ParseError("Invalid group order value in SUBSCRIBE"); return 0; } - MoqtFilterType filter_type = static_cast<MoqtFilterType>(filter); - switch (filter_type) { + if (forward > 1) { + ParseError("Invalid forward value in SUBSCRIBE"); + return 0; + } + subscribe.forward = (forward == 1); + subscribe.filter_type = static_cast<MoqtFilterType>(filter); + switch (subscribe.filter_type) { + case MoqtFilterType::kNextGroupStart: case MoqtFilterType::kLatestObject: break; case MoqtFilterType::kAbsoluteStart: @@ -427,7 +434,7 @@ return 0; } subscribe.start = Location(group, object); - if (filter_type == MoqtFilterType::kAbsoluteStart) { + if (subscribe.filter_type == MoqtFilterType::kAbsoluteStart) { break; } if (!reader.ReadVarInt62(&group)) {
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index 7e4ba28..4779c89 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -648,8 +648,8 @@ 0x20, 0x00, 0x0d, 0x02, 0x01, 0x02, // versions = 1, 2 0x03, // 4 params 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo" - 0x02, 0x32, // max_subscribe_id = 50 - 0x02, 0x32, // max_subscribe_id = 50 + 0x02, 0x32, // max_request_id = 50 + 0x02, 0x32, // max_request_id = 50 }; stream.Receive(absl::string_view(setup, sizeof(setup)), false); parser.ReadAndDispatchMessages(); @@ -663,10 +663,10 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kWebTrans, &stream, visitor_); char subscribe[] = { - 0x03, 0x00, 0x1a, 0x01, 0x02, + 0x03, 0x00, 0x1b, 0x01, 0x02, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x02, // priority = 0x20 descending + 0x20, 0x02, 0x01, // priority, order, forward 0x02, // filter_type = kLatestObject 0x02, // two params 0x1f, 0x03, 0x62, 0x61, 0x72, // 0x1f = "bar" @@ -681,10 +681,10 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe[] = { - 0x03, 0x00, 0x16, 0x01, 0x02, + 0x03, 0x00, 0x17, 0x01, 0x02, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x02, // priority = 0x20 descending + 0x20, 0x02, 0x01, // priority, order, forward 0x02, // filter_type = kLatestObject 0x02, // two params 0x02, 0x67, 0x10, // delivery_timeout = 10000 @@ -701,10 +701,10 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe[] = { - 0x03, 0x00, 0x16, 0x01, 0x02, + 0x03, 0x00, 0x17, 0x01, 0x02, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x02, // priority = 0x20 descending + 0x20, 0x02, 0x01, // priority, order, forward 0x02, // filter_type = kLatestObject 0x02, // two params 0x04, 0x67, 0x10, // max_cache_duration = 10000 @@ -721,10 +721,10 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe[] = { - 0x03, 0x00, 0x14, 0x01, 0x02, + 0x03, 0x00, 0x15, 0x01, 0x02, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x02, // priority = 0x20 descending + 0x20, 0x02, 0x01, // priority, order, forward 0x02, // filter_type = kLatestObject 0x01, // one param 0x01, 0x02, 0x00, 0x00, // authorization_token = DELETE 0; @@ -740,10 +740,10 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe[] = { - 0x03, 0x00, 0x18, 0x01, 0x02, 0x01, 0x03, 0x66, 0x6f, - 0x6f, // track_namespace = "foo" + 0x03, 0x00, 0x19, 0x01, 0x02, 0x01, 0x03, 0x66, + 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x02, // priority = 0x20 descending + 0x20, 0x02, 0x01, // priority, order, forward 0x02, // filter_type = kLatestObject 0x01, // one param 0x01, 0x06, 0x01, 0x10, 0x00, 0x62, 0x61, 0x72, // REGISTER 0x01 @@ -759,10 +759,10 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe[] = { - 0x03, 0x00, 0x14, 0x01, 0x02, + 0x03, 0x00, 0x15, 0x01, 0x02, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x02, // priority = 0x20 descending + 0x20, 0x02, 0x01, // priority, order, forward 0x02, // filter_type = kLatestObject 0x01, // one param 0x01, 0x02, 0x02, 0x07, // authorization_token = USE 7; @@ -779,10 +779,10 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe[] = { - 0x03, 0x00, 0x14, 0x01, 0x02, + 0x03, 0x00, 0x15, 0x01, 0x02, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x02, // priority = 0x20 descending + 0x20, 0x02, 0x01, // priority, order, forward 0x02, // filter_type = kLatestObject 0x01, // one param 0x01, 0x02, 0x04, 0x07, // authorization_token type 4 @@ -799,10 +799,10 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe[] = { - 0x03, 0x00, 0x16, 0x01, 0x02, 0x01, + 0x03, 0x00, 0x17, 0x01, 0x02, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x02, // priority = 0x20 descending + 0x20, 0x02, 0x01, // priority, order, forward 0x02, // filter_type = kLatestObject 0x01, // one param 0x01, 0x04, 0x03, 0x01, 0x00, 0x00 // authorization_token type 1 @@ -814,6 +814,81 @@ EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kKeyValueFormattingError); } +TEST_F(MoqtMessageSpecificTest, SubscribeInvalidGroupOrder) { + webtransport::test::InMemoryStream stream(/*stream_id=*/0); + MoqtControlParser parser(kRawQuic, &stream, visitor_); + char subscribe[] = { + 0x03, 0x00, 0x1d, 0x01, 0x02, // id and alias + 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" + 0x20, // subscriber priority = 0x20 + 0x03, // group order = invalid + 0x01, // forward = true + 0x03, // Filter type: Absolute Start + 0x04, // start_group = 4 (relative previous) + 0x01, // start_object = 1 (absolute) + // No EndGroup or EndObject + 0x02, // 2 parameters + 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms + 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar" + }; + stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); + parser.ReadAndDispatchMessages(); + EXPECT_EQ(visitor_.messages_received_, 0); + EXPECT_EQ(visitor_.parsing_error_, "Invalid group order value in SUBSCRIBE"); + EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); +} + +TEST_F(MoqtMessageSpecificTest, SubscribeInvalidForward) { + webtransport::test::InMemoryStream stream(/*stream_id=*/0); + MoqtControlParser parser(kRawQuic, &stream, visitor_); + char subscribe[] = { + 0x03, 0x00, 0x1d, 0x01, 0x02, // id and alias + 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" + 0x20, // subscriber priority = 0x20 + 0x02, // group order = descending + 0x02, // forward = invalid + 0x03, // Filter type: Absolute Start + 0x04, // start_group = 4 (relative previous) + 0x01, // start_object = 1 (absolute) + // No EndGroup or EndObject + 0x02, // 2 parameters + 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms + 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar" + }; + stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); + parser.ReadAndDispatchMessages(); + EXPECT_EQ(visitor_.messages_received_, 0); + EXPECT_EQ(visitor_.parsing_error_, "Invalid forward value in SUBSCRIBE"); + EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); +} + +TEST_F(MoqtMessageSpecificTest, SubscribeInvalidFilter) { + webtransport::test::InMemoryStream stream(/*stream_id=*/0); + MoqtControlParser parser(kRawQuic, &stream, visitor_); + char subscribe[] = { + 0x03, 0x00, 0x1d, 0x01, 0x02, // id and alias + 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" + 0x20, // subscriber priority = 0x20 + 0x02, // group order = descending + 0x01, // forward = true + 0x05, // Filter type: Absolute Start + 0x04, // start_group = 4 (relative previous) + 0x01, // start_object = 1 (absolute) + // No EndGroup or EndObject + 0x02, // 2 parameters + 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms + 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar" + }; + stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); + parser.ReadAndDispatchMessages(); + EXPECT_EQ(visitor_.messages_received_, 0); + EXPECT_EQ(visitor_.parsing_error_, "Invalid filter type"); + EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); +} + TEST_F(MoqtMessageSpecificTest, SubscribeOkHasAuthorizationToken) { webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kWebTrans, &stream, visitor_); @@ -978,13 +1053,13 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe[] = { - 0x03, 0x00, 0x15, 0x01, 0x02, // id and alias + 0x03, 0x00, 0x18, 0x01, 0x02, // id and alias 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x02, // priority = 0x20, group order descending + 0x20, 0x02, 0x01, // priority = 0x20, group order, forward 0x02, // filter_type = kLatestObject 0x01, // 1 parameter - 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar" }; stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); parser.ReadAndDispatchMessages(); @@ -1000,28 +1075,28 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe[] = { - 0x03, 0x00, 0x15, 0x01, 0x02, // id and alias + 0x03, 0x00, 0x18, 0x01, 0x02, // id and alias 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x08, // priority = 0x20 ??? - 0x01, // filter_type = kLatestGroup + 0x20, 0x08, 0x01, // priority, invalid order, forward + 0x01, // filter_type = kNextGroupStart 0x01, // 1 parameter - 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar" }; stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_THAT(visitor_.parsing_error_, Optional(HasSubstr("group order"))); + EXPECT_EQ(visitor_.parsing_error_, "Invalid group order value in SUBSCRIBE"); } TEST_F(MoqtMessageSpecificTest, AbsoluteStart) { webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe[] = { - 0x03, 0x00, 0x17, 0x01, 0x02, // id and alias + 0x03, 0x00, 0x18, 0x01, 0x02, // id and alias 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x02, // priority = 0x20 descending + 0x20, 0x02, 0x01, // priority, order, forward 0x03, // filter_type = kAbsoluteStart 0x04, // start_group = 4 0x01, // start_object = 1 @@ -1043,10 +1118,10 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe[] = { - 0x03, 0x00, 0x18, 0x01, 0x02, // id and alias + 0x03, 0x00, 0x19, 0x01, 0x02, // id and alias 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x02, // priority = 0x20 descending + 0x20, 0x02, 0x01, // priority, order, forward 0x04, // filter_type = kAbsoluteRange 0x04, // start_group = 4 0x01, // start_object = 1 @@ -1069,10 +1144,10 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe[] = { - 0x03, 0x00, 0x18, 0x01, 0x02, // id and alias + 0x03, 0x00, 0x19, 0x01, 0x02, // id and alias 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x02, // priority = 0x20 descending + 0x20, 0x02, 0x01, // priority, order, forward 0x04, // filter_type = kAbsoluteRange 0x04, // start_group = 4 0x01, // start_object = 1 @@ -1090,10 +1165,10 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe[] = { - 0x03, 0x00, 0x13, 0x01, 0x02, // id and alias + 0x03, 0x00, 0x14, 0x01, 0x02, // id and alias 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x02, // priority = 0x20 descending + 0x20, 0x02, 0x01, // priority, order, forward 0x04, // filter_type = kAbsoluteRange 0x04, // start_group = 4 0x01, // start_object = 1
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 2d7ab24..5645885 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -71,9 +71,17 @@ return status.ok() && DoesTrackStatusImplyHavingData(*status); } -SubscribeWindow SubscribeMessageToWindow(const MoqtSubscribe& subscribe) { - return SubscribeWindow(subscribe.start.value_or(Location(0, 0)), - subscribe.end_group); +std::optional<SubscribeWindow> SubscribeMessageToWindow( + const MoqtSubscribe& subscribe) { + if (!subscribe.forward || + subscribe.filter_type == MoqtFilterType::kLatestObject || + subscribe.filter_type == MoqtFilterType::kNextGroupStart) { + return std::nullopt; + } + if (!subscribe.start.has_value()) { + return std::nullopt; + } + return SubscribeWindow(*subscribe.start, subscribe.end_group); } class DefaultPublisher : public MoqtPublisher { @@ -111,6 +119,11 @@ } }); } + if (parameters_.perspective == Perspective::IS_SERVER) { + next_request_id_ = 1; + } else { + next_incoming_request_id_ = 1; + } } MoqtSession::ControlStream* MoqtSession::GetControlStream() { @@ -327,6 +340,8 @@ message.full_track_name = name; message.subscriber_priority = kDefaultSubscriberPriority; message.group_order = std::nullopt; + message.forward = true; + message.filter_type = MoqtFilterType::kAbsoluteStart; message.start = Location(start_group, start_object); message.end_group = std::nullopt; message.parameters = parameters; @@ -346,6 +361,8 @@ message.full_track_name = name; message.subscriber_priority = kDefaultSubscriberPriority; message.group_order = std::nullopt; + message.forward = true; + message.filter_type = MoqtFilterType::kAbsoluteRange; message.start = Location(start_group, start_object); message.end_group = end_group; message.parameters = parameters; @@ -359,6 +376,8 @@ message.full_track_name = name; message.subscriber_priority = kDefaultSubscriberPriority; message.group_order = std::nullopt; + message.forward = true; + message.filter_type = MoqtFilterType::kLatestObject; message.start = std::nullopt; message.end_group = std::nullopt; message.parameters = parameters; @@ -396,7 +415,8 @@ } MoqtFetch message; message.full_track_name = name; - message.fetch_id = next_request_id_++; + message.fetch_id = next_request_id_; + next_request_id_ += 2; message.start_object = start; message.end_group = end_group; message.end_object = end_object; @@ -439,9 +459,9 @@ MoqtPriority priority, std::optional<MoqtDeliveryOrder> delivery_order, VersionSpecificParameters parameters) { - if ((next_request_id_ + 1) >= peer_max_request_id_) { + if ((next_request_id_ + 2) >= peer_max_request_id_) { QUIC_DLOG(INFO) << ENDPOINT << "Tried to send JOINING_FETCH with ID " - << (next_request_id_ + 1) + << (next_request_id_ + 2) << " which is greater than the maximum ID " << peer_max_request_id_; return false; @@ -450,7 +470,8 @@ subscribe.full_track_name = name; subscribe.subscriber_priority = priority; subscribe.group_order = delivery_order; - // Must be "Current Object" filter. + subscribe.forward = true; + subscribe.filter_type = MoqtFilterType::kLatestObject; subscribe.start = std::nullopt; subscribe.end_group = std::nullopt; subscribe.parameters = parameters; @@ -458,10 +479,11 @@ return false; } MoqtFetch fetch; - fetch.fetch_id = next_request_id_++; + fetch.fetch_id = next_request_id_; + next_request_id_ += 2; fetch.subscriber_priority = priority; fetch.group_order = delivery_order; - fetch.joining_fetch = {subscribe.subscribe_id, num_previous_groups}; + fetch.joining_fetch = {subscribe.request_id, num_previous_groups}; fetch.parameters = parameters; SendControlMessage(framer_.SerializeFetch(fetch)); QUIC_DLOG(INFO) << ENDPOINT << "Sent Joining FETCH message for " << name; @@ -614,7 +636,8 @@ QUIC_DLOG(INFO) << ENDPOINT << "Tried to send SUBSCRIBE after GOAWAY"; return false; } - message.subscribe_id = next_request_id_++; + message.request_id = next_request_id_; + next_request_id_ += 2; if (provided_track_alias.has_value()) { message.track_alias = *provided_track_alias; next_remote_track_alias_ = @@ -626,7 +649,7 @@ // Since we do not expose subscribe IDs directly in the API, instead wrap // the session and subscribe ID in a callback. visitor->OnCanAckObjects(absl::bind_front(&MoqtSession::SendObjectAck, this, - message.subscribe_id)); + message.request_id)); } else { QUICHE_DLOG_IF(WARNING, message.parameters.oack_window_size.has_value()) << "Attempting to set object_ack_window on a connection that does not " @@ -639,7 +662,7 @@ auto track = std::make_unique<SubscribeRemoteTrack>(message, visitor); subscribe_by_name_.emplace(message.full_track_name, track.get()); subscribe_by_alias_.emplace(message.track_alias, track.get()); - upstream_by_id_.emplace(message.subscribe_id, std::move(track)); + upstream_by_id_.emplace(message.request_id, std::move(track)); return true; } @@ -776,7 +799,7 @@ } void MoqtSession::GrantMoreRequests(uint64_t num_requests) { - local_max_request_id_ += num_requests; + local_max_request_id_ += (num_requests * 2); MoqtMaxRequestId message; message.max_request_id = local_max_request_id_; SendControlMessage(framer_.SerializeMaxRequestId(message)); @@ -788,13 +811,13 @@ Error(MoqtError::kTooManyRequests, "Received request with too large ID"); return false; } - if (request_id < next_incoming_request_id_) { + if (request_id != next_incoming_request_id_) { QUIC_DLOG(INFO) << ENDPOINT << "Request ID not monotonically increasing"; Error(MoqtError::kInvalidRequestId, "Request ID not monotonically increasing"); return false; } - next_incoming_request_id_ = request_id + 1; + next_incoming_request_id_ = request_id + 2; return true; } @@ -902,14 +925,14 @@ void MoqtSession::ControlStream::OnSubscribeMessage( const MoqtSubscribe& message) { - if (!session_->ValidateRequestId(message.subscribe_id)) { + if (!session_->ValidateRequestId(message.request_id)) { return; } QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE for " << message.full_track_name; if (session_->sent_goaway_) { QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE after GOAWAY"; - SendSubscribeError(message.subscribe_id, SubscribeErrorCode::kUnauthorized, + SendSubscribeError(message.request_id, SubscribeErrorCode::kUnauthorized, "SUBSCRIBE after GOAWAY", message.track_alias); return; } @@ -925,7 +948,7 @@ QUIC_DLOG(INFO) << ENDPOINT << "SUBSCRIBE for " << track_name << " rejected by the application: " << track_publisher.status(); - SendSubscribeError(message.subscribe_id, SubscribeErrorCode::kDoesNotExist, + SendSubscribeError(message.request_id, SubscribeErrorCode::kDoesNotExist, track_publisher.status().message(), message.track_alias); return; } @@ -945,7 +968,7 @@ subscription->set_delivery_timeout(message.parameters.delivery_timeout); MoqtSession::PublishedSubscription* subscription_ptr = subscription.get(); auto [it, success] = session_->published_subscriptions_.emplace( - message.subscribe_id, std::move(subscription)); + message.request_id, std::move(subscription)); if (!success) { QUICHE_NOTREACHED(); // ValidateRequestId() should have caught this. } @@ -1692,11 +1715,12 @@ MoqtSession* session, std::shared_ptr<MoqtTrackPublisher> track_publisher, const MoqtSubscribe& subscribe, MoqtPublishingMonitorInterface* monitoring_interface) - : filter_type_(GetFilterType(subscribe)), - subscription_id_(subscribe.subscribe_id), - session_(session), + : session_(session), track_publisher_(track_publisher), + request_id_(subscribe.request_id), track_alias_(subscribe.track_alias), + filter_type_(subscribe.filter_type), + forward_(subscribe.forward), window_(SubscribeMessageToWindow(subscribe)), subscriber_priority_(subscribe.subscriber_priority), subscriber_delivery_order_(subscribe.group_order), @@ -1731,12 +1755,18 @@ void MoqtSession::PublishedSubscription::Update( Location start, std::optional<uint64_t> end_group, MoqtPriority subscriber_priority) { - window_.TruncateStart(start); - if (end_group.has_value()) { - window_.TruncateEnd(*end_group); - } subscriber_priority_ = subscriber_priority; + if (!window_.has_value()) { + window_ = SubscribeWindow(start, end_group); + return; + } + window_->TruncateStart(start); + if (end_group.has_value()) { + window_->TruncateEnd(*end_group); + } // TODO: update priority of all data streams that are currently open. + // TODO: update delivery timeout. + // TODO: update forward and subscribe filter. // TODO: reset streams that are no longer in-window. // TODO: send SUBSCRIBE_DONE if required. @@ -1755,7 +1785,7 @@ webtransport::SendOrder old_send_order = FinalizeSendOrder(queued_outgoing_data_streams_.rbegin()->first); subscriber_priority_ = priority; - session_->UpdateQueuedSendOrder(subscription_id_, old_send_order, + session_->UpdateQueuedSendOrder(request_id_, old_send_order, FinalizeSendOrder(old_send_order)); }; @@ -1764,21 +1794,26 @@ ControlStream* stream = session_->GetControlStream(); if (PublisherHasData(*track_publisher_)) { largest_id = track_publisher_->GetLargestSequence(); - if (window_.end() < *largest_id) { - stream->SendSubscribeError(subscription_id_, - SubscribeErrorCode::kInvalidRange, - "SUBSCRIBE ends in past group", track_alias_); - session_->published_subscriptions_.erase(subscription_id_); - // No class access below this line! - return; + QUICHE_CHECK(largest_id.has_value()); + if (forward_) { + switch (filter_type_) { + case MoqtFilterType::kLatestObject: + window_ = SubscribeWindow(largest_id->next()); + break; + case MoqtFilterType::kNextGroupStart: + window_ = SubscribeWindow(Location(largest_id->group + 1, 0)); + break; + default: + break; + } } - if (!window_.TruncateStart(largest_id->next())) { - QUICHE_NOTREACHED(); - }; + } else if (filter_type_ == MoqtFilterType::kLatestObject || + filter_type_ == MoqtFilterType::kNextGroupStart) { + // No data yet. All objects will be in-window. + window_ = SubscribeWindow(Location(0, 0)); } - MoqtSubscribeOk subscribe_ok; - subscribe_ok.subscribe_id = subscription_id_; + subscribe_ok.subscribe_id = request_id_; subscribe_ok.group_order = track_publisher_->GetDeliveryOrder(); subscribe_ok.largest_id = largest_id; // TODO(martinduke): Support sending DELIVERY_TIMEOUT parameter as the @@ -1790,15 +1825,15 @@ void MoqtSession::PublishedSubscription::OnSubscribeRejected( MoqtSubscribeErrorReason reason, std::optional<uint64_t> track_alias) { session_->GetControlStream()->SendSubscribeError( - subscription_id_, reason.error_code, reason.reason_phrase, + request_id_, reason.error_code, reason.reason_phrase, track_alias.value_or(track_alias_)); - session_->published_subscriptions_.erase(subscription_id_); + session_->published_subscriptions_.erase(request_id_); // No class access below this line! } void MoqtSession::PublishedSubscription::OnNewObjectAvailable( Location sequence) { - if (!window_.InWindow(sequence)) { + if (!InWindow(sequence)) { return; } if (reset_subgroups_.contains( @@ -1841,7 +1876,7 @@ if (stream_id.has_value()) { raw_stream = session_->session_->GetStreamById(*stream_id); } else { - raw_stream = session_->OpenOrQueueDataStream(subscription_id_, sequence); + raw_stream = session_->OpenOrQueueDataStream(request_id_, sequence); } if (raw_stream == nullptr) { return; @@ -1853,12 +1888,12 @@ } void MoqtSession::PublishedSubscription::OnTrackPublisherGone() { - session_->SubscribeIsDone(subscription_id_, SubscribeDoneCode::kGoingAway, + session_->SubscribeIsDone(request_id_, SubscribeDoneCode::kGoingAway, "Publisher is gone"); } void MoqtSession::PublishedSubscription::OnNewFinAvailable(Location sequence) { - if (!window_.InWindow(sequence)) { + if (!InWindow(sequence)) { return; } if (reset_subgroups_.contains( @@ -1884,7 +1919,7 @@ void MoqtSession::PublishedSubscription::OnSubgroupAbandoned( Location sequence, webtransport::StreamErrorCode error_code) { - if (!window_.InWindow(sequence)) { + if (!InWindow(sequence)) { return; } if (reset_subgroups_.contains( @@ -1907,6 +1942,11 @@ } void MoqtSession::PublishedSubscription::OnGroupAbandoned(uint64_t group_id) { + if (!window_.has_value() || window_->end().group < group_id || + window_->start().group > group_id) { + // The group is not in the window, ignore. + return; + } std::vector<webtransport::StreamId> streams = stream_map().GetStreamsForGroup(group_id); for (webtransport::StreamId stream_id : streams) { @@ -1961,10 +2001,10 @@ queued_outgoing_data_streams_.emplace( UpdateSendOrderForSubscriberPriority(send_order, 0), first_object); if (!start_send_order.has_value()) { - session_->UpdateQueuedSendOrder(subscription_id_, std::nullopt, send_order); + session_->UpdateQueuedSendOrder(request_id_, std::nullopt, send_order); } else if (*start_send_order < send_order) { session_->UpdateQueuedSendOrder( - subscription_id_, FinalizeSendOrder(*start_send_order), send_order); + request_id_, FinalizeSendOrder(*start_send_order), send_order); } } @@ -1980,13 +2020,12 @@ // then taking base(). queued_outgoing_data_streams_.erase((++it).base()); if (queued_outgoing_data_streams_.empty()) { - session_->UpdateQueuedSendOrder(subscription_id_, old_send_order, - std::nullopt); + session_->UpdateQueuedSendOrder(request_id_, old_send_order, std::nullopt); } else { webtransport::SendOrder new_send_order = FinalizeSendOrder(queued_outgoing_data_streams_.rbegin()->first); if (old_send_order != new_send_order) { - session_->UpdateQueuedSendOrder(subscription_id_, old_send_order, + session_->UpdateQueuedSendOrder(request_id_, old_send_order, new_send_order); } } @@ -2017,7 +2056,7 @@ PublishedSubscription& subscription, Location first_object) : session_(session), stream_(stream), - subscription_id_(subscription.subscription_id()), + subscription_id_(subscription.request_id()), next_object_(first_object), session_liveness_(session->liveness_token_) { UpdateSendOrder(subscription);
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index eceaa98..bc4e80d 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -113,6 +113,7 @@ // the track, the message will still be sent. However, the visitor will be // ignored. // Subscribe from (start_group, start_object) to the end of the track. + // TODO(martinduke): Allow setting forward = false. bool SubscribeAbsolute(const FullTrackName& name, uint64_t start_group, uint64_t start_object, SubscribeRemoteTrack::Visitor* visitor, @@ -125,6 +126,7 @@ bool SubscribeCurrentObject(const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor, VersionSpecificParameters parameters) override; + // TODO(martinduke): SubscribeNextGroup // Returns false if the subscription is not found. The session immediately // destroys all subscription state. void Unsubscribe(const FullTrackName& name); @@ -337,7 +339,7 @@ PublishedSubscription& operator=(const PublishedSubscription&) = delete; PublishedSubscription& operator=(PublishedSubscription&&) = delete; - uint64_t subscription_id() const { return subscription_id_; } + uint64_t request_id() const { return request_id_; } MoqtTrackPublisher& publisher() { return *track_publisher_; } uint64_t track_alias() const { return track_alias_; } std::optional<Location> largest_sent() const { return largest_sent_; } @@ -372,8 +374,13 @@ MoqtPriority subscriber_priority); // Checks if the specified sequence is within the window of this // subscription. - bool InWindow(Location sequence) { return window_.InWindow(sequence); } - Location GetWindowStart() const { return window_.start(); } + bool InWindow(Location sequence) { + return forward_ && window_.has_value() && window_->InWindow(sequence); + } + Location GetWindowStart() const { + QUICHE_CHECK(window_.has_value()); + return window_->start(); + } MoqtFilterType filter_type() const { return filter_type_; }; void OnDataStreamCreated(webtransport::StreamId id, @@ -427,12 +434,16 @@ subscriber_priority_); } - MoqtFilterType filter_type_; - uint64_t subscription_id_; MoqtSession* session_; std::shared_ptr<MoqtTrackPublisher> track_publisher_; + uint64_t request_id_; uint64_t track_alias_; - SubscribeWindow window_; + MoqtFilterType filter_type_; + bool forward_; + // If window_ is nullopt, any arriving objects are ignored. This could be + // because forward=0, or because the subscription is waiting for a + // SUBSCRIBE_OK and doesn't know what the window should be yet. + std::optional<SubscribeWindow> window_; MoqtPriority subscriber_priority_; uint64_t streams_opened_ = 0;
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 2c8797c..1487e25 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -60,11 +60,13 @@ MoqtSubscribe DefaultSubscribe() { MoqtSubscribe subscribe = { - /*subscribe_id=*/1, + /*request_id=*/1, /*track_alias=*/2, kDefaultTrackName(), /*subscriber_priority=*/0x80, /*group_order=*/std::nullopt, + /*forward=*/true, + /*filter_type=*/MoqtFilterType::kAbsoluteStart, /*start=*/Location(0, 0), /*end_group=*/std::nullopt, /*parameters=*/VersionSpecificParameters(), @@ -74,7 +76,7 @@ MoqtFetch DefaultFetch() { MoqtFetch fetch = { - /*fetch_id=*/2, + /*fetch_id=*/1, /*subscriber_priority=*/0x80, /*group_order=*/std::nullopt, /*joining_fetch=*/std::nullopt, @@ -155,7 +157,7 @@ return nullptr; } MoqtSubscribeOk expected_ok = { - /*subscribe_id=*/subscribe.subscribe_id, + /*request_id=*/subscribe.request_id, /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0), /*group_order=*/MoqtDeliveryOrder::kAscending, (*track_status == MoqtTrackStatusCode::kInProgress) @@ -346,7 +348,7 @@ // Add the track. Now Subscribe should succeed. MockTrackPublisher* track = CreateTrackPublisher(); std::make_shared<MockTrackPublisher>(request.full_track_name); - ++request.subscribe_id; + request.request_id += 2; ReceiveSubscribeSynchronousOk(track, request, stream_input.get()); } @@ -504,28 +506,56 @@ ReceiveSubscribeSynchronousOk(track, request, stream_input.get()); } -TEST_F(MoqtSessionTest, SubscribeEntirelyInPast) { +TEST_F(MoqtSessionTest, SubscribeDoNotForward) { std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); MockTrackPublisher* track = CreateTrackPublisher(); - SetLargestId(track, Location(10, 20)); - MoqtSubscribe request = DefaultSubscribe(); - request.end_group = 9; - EXPECT_CALL(*track, AddObjectListener) - .WillOnce([&](MoqtObjectListener* listener) { - listener->OnSubscribeAccepted(); - }); - MoqtSubscribeError expected_error = { - /*subscribe_id=*/request.subscribe_id, - /*error_code=*/SubscribeErrorCode::kInvalidRange, - /*reason_phrase=*/"SUBSCRIBE ends in past group", - /*track_alias=*/request.track_alias, - }; - EXPECT_CALL(mock_stream_, - Writev(SerializedControlMessage(expected_error), _)); - stream_input->OnSubscribeMessage(request); - EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, 1), nullptr); + request.forward = false; + request.filter_type = MoqtFilterType::kLatestObject; + EXPECT_CALL(*track, GetTrackStatus) + .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun)); + MoqtObjectListener* listener = + ReceiveSubscribeSynchronousOk(track, request, stream_input.get()); + // forward=false, so incoming objects are ignored. + EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) + .Times(0); + listener->OnNewObjectAvailable(Location(0, 0)); +} + +TEST_F(MoqtSessionTest, SubscribeAbsoluteStartNoDataYet) { + std::unique_ptr<MoqtControlParserVisitor> stream_input = + MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); + MockTrackPublisher* track = CreateTrackPublisher(); + MoqtSubscribe request = DefaultSubscribe(); + request.start = Location(1, 0); + EXPECT_CALL(*track, GetTrackStatus) + .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun)); + MoqtObjectListener* listener = + ReceiveSubscribeSynchronousOk(track, request, stream_input.get()); + // Window was not set to (0, 0) by SUBSCRIBE acceptance. + EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) + .Times(0); + listener->OnNewObjectAvailable(Location(0, 0)); +} + +TEST_F(MoqtSessionTest, SubscribeNextGroup) { + std::unique_ptr<MoqtControlParserVisitor> stream_input = + MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); + MockTrackPublisher* track = CreateTrackPublisher(); + MoqtSubscribe request = DefaultSubscribe(); + request.filter_type = MoqtFilterType::kNextGroupStart; + SetLargestId(track, Location(10, 20)); + MoqtObjectListener* listener = + ReceiveSubscribeSynchronousOk(track, request, stream_input.get()); + // Later objects in group 10 ignored. + EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) + .Times(0); + listener->OnNewObjectAvailable(Location(10, 21)); + // Group 11 is sent. + EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream()) + .WillOnce(Return(false)); + listener->OnNewObjectAvailable(Location(11, 0)); } TEST_F(MoqtSessionTest, TwoSubscribesForTrack) { @@ -535,7 +565,7 @@ MoqtSubscribe request = DefaultSubscribe(); ReceiveSubscribeSynchronousOk(track, request, stream_input.get()); - request.subscribe_id = 2; + request.request_id = 3; request.start = Location(12, 0); EXPECT_CALL(mock_session_, CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation), @@ -553,13 +583,13 @@ // Peer unsubscribes. MoqtUnsubscribe unsubscribe = { - /*subscribe_id=*/1, + /*request_id=*/1, }; stream_input->OnUnsubscribeMessage(unsubscribe); EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, 1), nullptr); // Subscribe again, succeeds. - request.subscribe_id = 2; + request.request_id = 3; request.start = Location(12, 0); ReceiveSubscribeSynchronousOk(track, request, stream_input.get()); } @@ -567,7 +597,7 @@ TEST_F(MoqtSessionTest, RequestIdTooHigh) { // Peer subscribes to (0, 0) MoqtSubscribe request = DefaultSubscribe(); - request.subscribe_id = kDefaultInitialMaxRequestId + 1; + request.request_id = kDefaultInitialMaxRequestId + 1; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); @@ -577,6 +607,10 @@ stream_input->OnSubscribeMessage(request); } +TEST_F(MoqtSessionTest, RequestIdWrongLsb) { + // TODO(martinduke): Implement this test. +} + TEST_F(MoqtSessionTest, SubscribeIdNotIncreasing) { MoqtSubscribe request = DefaultSubscribe(); std::unique_ptr<MoqtControlParserVisitor> stream_input = @@ -650,7 +684,7 @@ VersionSpecificParameters()); MoqtSubscribeOk ok = { - /*subscribe_id=*/0, + /*request_id=*/0, /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0), }; EXPECT_CALL(remote_track_visitor, OnReply(_, _, _)) @@ -709,7 +743,9 @@ session_.GrantMoreRequests(1); // Peer subscribes to (0, 0) MoqtSubscribe request = DefaultSubscribe(); - request.subscribe_id = kDefaultInitialMaxRequestId; + MoqtSessionPeer::set_next_incoming_request_id( + &session_, kDefaultInitialMaxRequestId + 1); + request.request_id = kDefaultInitialMaxRequestId + 1; MockTrackPublisher* track = CreateTrackPublisher(); ReceiveSubscribeSynchronousOk(track, request, stream_input.get()); } @@ -726,7 +762,7 @@ VersionSpecificParameters()); MoqtSubscribeError error = { - /*subscribe_id=*/0, + /*request_id=*/0, /*error_code=*/SubscribeErrorCode::kInvalidRange, /*reason_phrase=*/"deadbeef", /*track_alias=*/2, @@ -1011,7 +1047,7 @@ // SUBSCRIBE_OK arrives MoqtSubscribeOk ok = { - /*subscribe_id=*/1, + /*request_id=*/1, /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0), /*group_order=*/MoqtDeliveryOrder::kAscending, /*largest_id=*/std::nullopt, @@ -1056,7 +1092,7 @@ // SUBSCRIBE_ERROR arrives MoqtSubscribeError subscribe_error = { - /*subscribe_id=*/1, + /*request_id=*/1, /*error_code=*/SubscribeErrorCode::kRetryTrackAlias, /*reason_phrase=*/"foo", /*track_alias =*/3, @@ -1078,7 +1114,7 @@ // SUBSCRIBE_ERROR arrives MoqtSubscribeError subscribe_error = { - /*subscribe_id=*/1, + /*request_id=*/1, /*error_code=*/SubscribeErrorCode::kRetryTrackAlias, /*reason_phrase=*/"foo", /*track_alias =*/3, @@ -1098,7 +1134,7 @@ // SUBSCRIBE_ERROR arrives MoqtSubscribeError subscribe_error = { - /*subscribe_id=*/1, + /*request_id=*/1, /*error_code=*/SubscribeErrorCode::kRetryTrackAlias, /*reason_phrase=*/"foo", /*track_alias =*/2, @@ -1680,7 +1716,7 @@ std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); MoqtUnsubscribe unsubscribe = { - /*subscribe_id=*/0, + /*request_id=*/0, }; stream_input->OnUnsubscribeMessage(unsubscribe); EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, 0), nullptr); @@ -2225,12 +2261,12 @@ TEST_F(MoqtSessionTest, InvalidFetch) { // Update the state so that it expects ID > 0 next time. - MoqtSessionPeer::ValidateRequestId(&session_, 0); + MoqtSessionPeer::ValidateRequestId(&session_, 1); webtransport::test::MockStream control_stream; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &control_stream); MoqtFetch fetch = DefaultFetch(); - fetch.fetch_id = 0; // Too low. + fetch.fetch_id = 1; // Too low. EXPECT_CALL(mock_session_, CloseSession(static_cast<uint64_t>(MoqtError::kInvalidRequestId), "Request ID not monotonically increasing")) @@ -2286,6 +2322,7 @@ TEST_F(MoqtSessionTest, IncomingJoiningFetch) { MoqtSubscribe subscribe = DefaultSubscribe(); // Give it the latest object filter. + subscribe.filter_type = MoqtFilterType::kLatestObject; subscribe.start = std::nullopt; subscribe.end_group = std::nullopt; std::unique_ptr<MoqtControlParserVisitor> stream_input = @@ -2295,7 +2332,7 @@ ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get()); MoqtObjectListener* subscription = - MoqtSessionPeer::GetSubscription(&session_, subscribe.subscribe_id); + MoqtSessionPeer::GetSubscription(&session_, subscribe.request_id); ASSERT_NE(subscription, nullptr); EXPECT_TRUE( MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 0, 11))); @@ -2304,6 +2341,7 @@ // Joining FETCH arrives. The resulting Fetch should begin at (2, 0). MoqtFetch fetch = DefaultFetch(); + fetch.fetch_id = 3; fetch.joining_fetch = {1, 2}; EXPECT_CALL(*track, Fetch(Location(2, 0), 4, std::optional<uint64_t>(10), _)) .WillOnce(Return(std::make_unique<MockFetchTask>())); @@ -2316,7 +2354,7 @@ MoqtFetch fetch = DefaultFetch(); fetch.joining_fetch = {1, 2}; MoqtFetchError expected_error = { - /*subscribe_id=*/2, + /*request_id=*/1, /*error_code=*/SubscribeErrorCode::kDoesNotExist, /*reason_phrase=*/"Joining Fetch for non-existent subscribe", }; @@ -2334,6 +2372,7 @@ ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get()); MoqtFetch fetch = DefaultFetch(); + fetch.fetch_id = 3; fetch.joining_fetch = {1, 2}; EXPECT_CALL(mock_session_, CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation), @@ -2349,16 +2388,19 @@ EXPECT_CALL(mock_session_, GetStreamById(_)) .WillRepeatedly(Return(&mock_stream_)); MoqtSubscribe expected_subscribe = { - /*subscribe_id=*/0, + /*request_id=*/0, /*track_alias=*/0, /*full_track_name=*/FullTrackName("foo", "bar"), /*subscriber_priority=*/0x80, /*group_order=*/MoqtDeliveryOrder::kAscending, + /*forward=*/true, + /*filter_type=*/MoqtFilterType::kLatestObject, /*start=*/std::nullopt, /*end_group=*/std::nullopt, + VersionSpecificParameters(), }; MoqtFetch expected_fetch = { - /*fetch_id=*/1, + /*fetch_id=*/2, /*subscriber_priority=*/0x80, /*group_order=*/MoqtDeliveryOrder::kAscending, /*joining_fetch=*/JoiningFetch(0, 1), @@ -2391,12 +2433,12 @@ MoqtSubscribeOk(0, quic::QuicTimeDelta::FromMilliseconds(0), MoqtDeliveryOrder::kAscending, Location(2, 0), VersionSpecificParameters())); - stream_input->OnFetchOkMessage(MoqtFetchOk(1, MoqtDeliveryOrder::kAscending, + stream_input->OnFetchOkMessage(MoqtFetchOk(2, MoqtDeliveryOrder::kAscending, Location(2, 0), VersionSpecificParameters())); // Packet arrives on FETCH stream. MoqtObject object = { - /*fetch_id=*/1, + /*fetch_id=*/2, /*group_id, object_id=*/0, 0, /*publisher_priority=*/128, @@ -2479,7 +2521,7 @@ Location(0, 0), 4, std::nullopt, 128, std::nullopt, VersionSpecificParameters()); MoqtFetchOk ok = { - /*subscribe_id=*/0, + /*request_id=*/0, /*group_order=*/MoqtDeliveryOrder::kAscending, /*largest_id=*/Location(3, 25), VersionSpecificParameters(), @@ -2508,7 +2550,7 @@ Location(0, 0), 4, std::nullopt, 128, std::nullopt, VersionSpecificParameters()); MoqtFetchError error = { - /*subscribe_id=*/0, + /*request_id=*/0, /*error_code=*/SubscribeErrorCode::kUnauthorized, /*reason_phrase=*/"No username provided", }; @@ -2546,7 +2588,7 @@ std::queue<quiche::QuicheBuffer> headers; std::queue<std::string> payloads; MoqtObject object = { - /*subscribe_id=*/0, + /*request_id=*/0, /*group_id, object_id=*/0, 0, /*publisher_priority=*/128, @@ -2578,7 +2620,7 @@ // FETCH_OK arrives, objects are delivered. MoqtFetchOk ok = { - /*subscribe_id=*/0, + /*request_id=*/0, /*group_order=*/MoqtDeliveryOrder::kAscending, /*largest_id=*/Location(3, 25), VersionSpecificParameters(), @@ -2616,7 +2658,7 @@ std::queue<quiche::QuicheBuffer> headers; std::queue<std::string> payloads; MoqtObject object = { - /*subscribe_id=*/0, + /*request_id=*/0, /*group_id, object_id=*/0, 0, /*publisher_priority=*/128, @@ -2648,7 +2690,7 @@ // FETCH_OK arrives, objects are available. MoqtFetchOk ok = { - /*subscribe_id=*/0, + /*request_id=*/0, /*group_order=*/MoqtDeliveryOrder::kAscending, /*largest_id=*/Location(3, 25), VersionSpecificParameters(), @@ -2707,7 +2749,7 @@ bool object_ready = false; task->SetObjectAvailableCallback([&]() { object_ready = true; }); MoqtObject object = { - /*subscribe_id=*/0, + /*request_id=*/0, /*group_id, object_id=*/0, 0, /*publisher_priority=*/128, @@ -3035,6 +3077,7 @@ EXPECT_CALL(mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kFetchError), _)); MoqtFetch fetch = DefaultFetch(); + fetch.fetch_id = 3; stream_input->OnFetchMessage(fetch); EXPECT_CALL( mock_stream_, @@ -3121,7 +3164,7 @@ &remote_track_visitor, VersionSpecificParameters())); MoqtSubscribeOk ok = { - /*subscribe_id=*/0, + /*request_id=*/0, /*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000), /*group_order=*/MoqtDeliveryOrder::kAscending, /*largest_id=*/std::nullopt, @@ -3179,7 +3222,7 @@ &remote_track_visitor, VersionSpecificParameters())); MoqtSubscribeOk ok = { - /*subscribe_id=*/0, + /*request_id=*/0, /*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000), /*group_order=*/MoqtDeliveryOrder::kAscending, /*largest_id=*/std::nullopt, @@ -3234,7 +3277,7 @@ &remote_track_visitor, VersionSpecificParameters())); MoqtSubscribeOk ok = { - /*subscribe_id=*/0, + /*request_id=*/0, /*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000), /*group_order=*/MoqtDeliveryOrder::kAscending, /*largest_id=*/std::nullopt, @@ -3297,7 +3340,7 @@ MoqtObjectStatus::kNormal); // Update the end to fall at the last delivered object. MoqtSubscribeUpdate update = { - /*subscribe_id=*/0, + /*request_id=*/0, /*start_group=*/5, /*start_object=*/0, /*end_group=*/7,
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h index a829930..0783141 100644 --- a/quiche/quic/moqt/moqt_track.h +++ b/quiche/quic/moqt/moqt_track.h
@@ -116,7 +116,7 @@ virtual void OnSubscribeDone(FullTrackName full_track_name) = 0; }; SubscribeRemoteTrack(const MoqtSubscribe& subscribe, Visitor* visitor) - : RemoteTrack(subscribe.full_track_name, subscribe.subscribe_id, + : RemoteTrack(subscribe.full_track_name, subscribe.request_id, SubscribeWindow(subscribe.start.value_or(Location()), subscribe.end_group)), track_alias_(subscribe.track_alias),
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc index 9ab9078..7bca1db 100644 --- a/quiche/quic/moqt/moqt_track_test.cc +++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -54,6 +54,8 @@ /*full_track_name=*/FullTrackName("foo", "bar"), /*subscriber_priority=*/128, /*group_order=*/std::nullopt, + /*forward=*/true, + /*filter_type=*/MoqtFilterType::kAbsoluteStart, /*start=*/Location(2, 0), std::nullopt, VersionSpecificParameters(), @@ -78,7 +80,7 @@ TEST_F(SubscribeRemoteTrackTest, AllowError) { EXPECT_TRUE(track_.ErrorIsAllowed()); - EXPECT_EQ(track_.GetSubscribe().subscribe_id, subscribe_.subscribe_id); + EXPECT_EQ(track_.GetSubscribe().request_id, subscribe_.request_id); track_.OnObjectOrOk(); EXPECT_FALSE(track_.ErrorIsAllowed()); }
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h index 9fa1bf3..e3ddb7b 100644 --- a/quiche/quic/moqt/test_tools/moqt_session_peer.h +++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -91,7 +91,7 @@ track.get()); session->subscribe_by_name_.try_emplace(subscribe.full_track_name, track.get()); - session->upstream_by_id_.try_emplace(subscribe.subscribe_id, + session->upstream_by_id_.try_emplace(subscribe.request_id, std::move(track)); } @@ -102,7 +102,9 @@ MoqtSubscribe subscribe; subscribe.full_track_name = publisher->GetTrackName(); subscribe.track_alias = track_alias; - subscribe.subscribe_id = subscribe_id; + subscribe.request_id = subscribe_id; + subscribe.forward = true; + subscribe.filter_type = MoqtFilterType::kAbsoluteStart; subscribe.start = Location(start_group, start_object); subscribe.subscriber_priority = 0x80; session->published_subscriptions_.emplace( @@ -147,6 +149,10 @@ session->next_request_id_ = id; } + static void set_next_incoming_request_id(MoqtSession* session, uint64_t id) { + session->next_incoming_request_id_ = id; + } + static void set_peer_max_request_id(MoqtSession* session, uint64_t id) { session->peer_max_request_id_ = id; }
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index b0709ef..31c2521 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -465,7 +465,7 @@ bool EqualFieldValues(MessageStructuredData& values) const override { auto cast = std::get<MoqtSubscribe>(values); - if (cast.subscribe_id != subscribe_.subscribe_id) { + if (cast.request_id != subscribe_.request_id) { QUIC_LOG(INFO) << "SUBSCRIBE subscribe ID mismatch"; return false; } @@ -485,6 +485,14 @@ QUIC_LOG(INFO) << "SUBSCRIBE group order mismatch"; return false; } + if (cast.forward != subscribe_.forward) { + QUIC_LOG(INFO) << "SUBSCRIBE forward mismatch"; + return false; + } + if (cast.filter_type != subscribe_.filter_type) { + QUIC_LOG(INFO) << "SUBSCRIBE filter type mismatch"; + return false; + } if (cast.start != subscribe_.start) { QUIC_LOG(INFO) << "SUBSCRIBE start mismatch"; return false; @@ -501,7 +509,7 @@ } void ExpandVarints() override { - ExpandVarintsImpl("vvvv---v------vvvvv--vv-----"); + ExpandVarintsImpl("vvvv---v-------vvvvv--vv-----"); } MessageStructuredData structured_data() const override { @@ -509,15 +517,16 @@ } private: - uint8_t raw_packet_[31] = { - 0x03, 0x00, 0x1c, 0x01, 0x02, // id and alias + uint8_t raw_packet_[32] = { + 0x03, 0x00, 0x1d, 0x01, 0x02, // id and alias 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" 0x20, // subscriber priority = 0x20 0x02, // group order = descending + 0x01, // forward = true 0x03, // Filter type: Absolute Start - 0x04, // start_group = 4 (relative previous) - 0x01, // start_object = 1 (absolute) + 0x04, // start_group = 4 + 0x01, // start_object = 1 // No EndGroup or EndObject 0x02, // 2 parameters 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms @@ -530,6 +539,8 @@ /*full_track_name=*/FullTrackName({"foo", "abcd"}), /*subscriber_priority=*/0x20, /*group_order=*/MoqtDeliveryOrder::kDescending, + /*forward=*/true, + /*filter_type=*/MoqtFilterType::kAbsoluteStart, /*start=*/Location(4, 1), /*end_group=*/std::nullopt, VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000),