Parameterize remaining MOQT message types. Also FETCH Group Order semantics have changed. The FETCH message dictates GroupOrder and this MUST be followed. Part of draft-16 update. PiperOrigin-RevId: 869355391
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index 158c51c..bded8ff 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -353,30 +353,6 @@ return list; } -KeyValuePairList VersionSpecificParameters::ToKeyValuePairList() const { - KeyValuePairList out; - if (delivery_timeout != quic::QuicTimeDelta::Infinite()) { - out.insert( - static_cast<uint64_t>(VersionSpecificParameter::kDeliveryTimeout), - static_cast<uint64_t>(delivery_timeout.ToMilliseconds())); - } - for (const AuthToken& token : authorization_tokens) { - out.insert( - static_cast<uint64_t>(VersionSpecificParameter::kAuthorizationToken), - SerializeAuthToken(token).AsStringView()); - } - if (max_cache_duration != quic::QuicTimeDelta::Infinite()) { - out.insert( - static_cast<uint64_t>(VersionSpecificParameter::kMaxCacheDuration), - static_cast<uint64_t>(max_cache_duration.ToMilliseconds())); - } - if (oack_window_size.has_value()) { - out.insert(static_cast<uint64_t>(VersionSpecificParameter::kOackWindowSize), - static_cast<uint64_t>(oack_window_size->ToMicroseconds())); - } - return out; -} - quiche::QuicheBuffer MoqtFramer::SerializeObjectHeader( const MoqtObject& message, MoqtDataStreamType message_type, std::optional<uint64_t> previous_object_in_stream) { @@ -649,18 +625,11 @@ return quiche::QuicheBuffer(); } } - KeyValuePairList parameters; - if (!FillAndValidateVersionSpecificParameters( - MoqtMessageType::kFetch, message.parameters, parameters)) { - return quiche::QuicheBuffer(); - }; if (std::holds_alternative<StandaloneFetch>(message.fetch)) { const StandaloneFetch& standalone_fetch = std::get<StandaloneFetch>(message.fetch); return SerializeControlMessage( MoqtMessageType::kFetch, WireVarInt62(message.request_id), - WireUint8(message.subscriber_priority), - WireDeliveryOrder(message.group_order), WireVarInt62(FetchType::kStandalone), WireFullTrackName(standalone_fetch.full_track_name), WireVarInt62(standalone_fetch.start_location.group), @@ -669,7 +638,7 @@ WireVarInt62(standalone_fetch.end_location.object == kMaxObjectId ? 0 : standalone_fetch.end_location.object + 1), - WireKeyValuePairList(parameters)); + WireKeyValuePairList(message.parameters.ToKeyValuePairList())); } uint64_t request_id, joining_start; if (std::holds_alternative<JoiningFetchRelative>(message.fetch)) { @@ -685,26 +654,21 @@ } return SerializeControlMessage( MoqtMessageType::kFetch, WireVarInt62(message.request_id), - WireUint8(message.subscriber_priority), - WireDeliveryOrder(message.group_order), WireVarInt62(message.fetch.index() + 1), WireVarInt62(request_id), - WireVarInt62(joining_start), WireKeyValuePairList(parameters)); + WireVarInt62(joining_start), + WireKeyValuePairList(message.parameters.ToKeyValuePairList())); } quiche::QuicheBuffer MoqtFramer::SerializeFetchOk(const MoqtFetchOk& message) { - KeyValuePairList parameters; - if (!FillAndValidateVersionSpecificParameters( - MoqtMessageType::kFetchOk, message.parameters, parameters)) { - return quiche::QuicheBuffer(); - }; return SerializeControlMessage( MoqtMessageType::kFetchOk, WireVarInt62(message.request_id), - WireDeliveryOrder(message.group_order), WireBoolean(message.end_of_track), + WireBoolean(message.end_of_track), WireVarInt62(message.end_location.group), WireVarInt62(message.end_location.object == kMaxObjectId ? 0 : (message.end_location.object + 1)), - WireKeyValuePairList(parameters)); + WireKeyValuePairList(message.parameters.ToKeyValuePairList()), + WireKeyValuePairList(message.extensions, false)); } quiche::QuicheBuffer MoqtFramer::SerializeFetchCancel( @@ -720,73 +684,19 @@ } quiche::QuicheBuffer MoqtFramer::SerializePublish(const MoqtPublish& message) { - KeyValuePairList parameters; - if (!FillAndValidateVersionSpecificParameters( - MoqtMessageType::kPublish, message.parameters, parameters)) { - return quiche::QuicheBuffer(); - }; - std::optional<uint64_t> group, object; - if (message.largest_location.has_value()) { - group = message.largest_location->group; - object = message.largest_location->object; - } return SerializeControlMessage( MoqtMessageType::kPublish, WireVarInt62(message.request_id), WireFullTrackName(message.full_track_name), - WireVarInt62(message.track_alias), WireDeliveryOrder(message.group_order), - WireBoolean(message.largest_location.has_value()), - WireOptional<WireVarInt62>(group), WireOptional<WireVarInt62>(object), - WireBoolean(message.forward), WireKeyValuePairList(parameters)); + WireVarInt62(message.track_alias), + WireKeyValuePairList(message.parameters.ToKeyValuePairList()), + WireKeyValuePairList(message.extensions, false)); } quiche::QuicheBuffer MoqtFramer::SerializePublishOk( const MoqtPublishOk& message) { - KeyValuePairList parameters; - if (!FillAndValidateVersionSpecificParameters( - MoqtMessageType::kPublishOk, message.parameters, parameters)) { - return quiche::QuicheBuffer(); - }; - std::optional<uint64_t> start_group, start_object, end_group; - switch (message.filter_type) { - case MoqtFilterType::kNextGroupStart: - case MoqtFilterType::kLargestObject: - break; - case MoqtFilterType::kAbsoluteStart: - case MoqtFilterType::kAbsoluteRange: - if (!message.start.has_value()) { - QUICHE_BUG(QUICHE_BUG_invalid_filter_type) - << "Serializing invalid MoQT filter type"; - return quiche::QuicheBuffer(); - } - start_group = message.start->group; - start_object = message.start->object; - if (message.filter_type == MoqtFilterType::kAbsoluteStart) { - break; - } - if (!message.end_group.has_value()) { - QUICHE_BUG(QUICHE_BUG_invalid_filter_type) - << "Serializing invalid MoQT filter type"; - return quiche::QuicheBuffer(); - } - end_group = message.end_group; - if (*end_group < *start_group) { - QUICHE_BUG(QUICHE_BUG_invalid_filter_type) - << "End group is less than start group"; - return quiche::QuicheBuffer(); - } - break; - default: - QUICHE_BUG(QUICHE_BUG_invalid_filter_type) - << "Serializing invalid MoQT filter type"; - return quiche::QuicheBuffer(); - } return SerializeControlMessage( MoqtMessageType::kPublishOk, WireVarInt62(message.request_id), - WireBoolean(message.forward), WireUint8(message.subscriber_priority), - WireDeliveryOrder(message.group_order), WireVarInt62(message.filter_type), - WireOptional<WireVarInt62>(start_group), - WireOptional<WireVarInt62>(start_object), - WireOptional<WireVarInt62>(end_group), WireKeyValuePairList(parameters)); + WireKeyValuePairList(message.parameters.ToKeyValuePairList())); } quiche::QuicheBuffer MoqtFramer::SerializeObjectAck( @@ -812,18 +722,6 @@ return true; } -bool MoqtFramer::FillAndValidateVersionSpecificParameters( - MoqtMessageType message_type, const VersionSpecificParameters& parameters, - KeyValuePairList& out) { - if (!VersionSpecificParametersAllowedByMessage(parameters, message_type)) { - QUICHE_BUG(QUICHE_BUG_invalid_parameters) - << "Invalid parameters for " << MoqtMessageTypeToString(message_type); - return false; - } - out = parameters.ToKeyValuePairList(); - return true; -} - // static bool MoqtFramer::ValidateObjectMetadata(const MoqtObject& object, bool is_datagram) {
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h index b9b5a8e..62ae609 100644 --- a/quiche/quic/moqt/moqt_framer.h +++ b/quiche/quic/moqt/moqt_framer.h
@@ -85,9 +85,6 @@ bool FillAndValidateSetupParameters(MoqtMessageType message_type, const SetupParameters& parameters, KeyValuePairList& out); - bool FillAndValidateVersionSpecificParameters( - MoqtMessageType message_type, const VersionSpecificParameters& parameters, - KeyValuePairList& out); // Returns true if the metadata is internally consistent. static bool ValidateObjectMetadata(const MoqtObject& object, bool is_datagram);
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc index 5a4f4be..f9aa333 100644 --- a/quiche/quic/moqt/moqt_framer_test.cc +++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -379,70 +379,15 @@ } } -TEST_F(MoqtFramerSimpleTest, PublishOkEndBeforeStart) { - MoqtPublishOk publish_ok = { - /*request_id=*/1, - /*forward=*/true, - /*subscriber_priority=*/2, - /*group_order=*/MoqtDeliveryOrder::kAscending, - /*filter_type=*/MoqtFilterType::kAbsoluteRange, - /*start=*/Location{1, 2}, - /*end_group=*/0, - /*parameters=*/VersionSpecificParameters(), - }; - quiche::QuicheBuffer buffer; - EXPECT_QUICHE_BUG(buffer = framer_.SerializePublishOk(publish_ok), - "End group is less than start group"); - EXPECT_EQ(buffer.size(), 0); -} - -TEST_F(MoqtFramerSimpleTest, PublishOkMissingEndGroup) { - MoqtPublishOk publish_ok = { - /*request_id=*/1, - /*forward=*/true, - /*subscriber_priority=*/2, - /*group_order=*/MoqtDeliveryOrder::kAscending, - /*filter_type=*/MoqtFilterType::kAbsoluteRange, - /*start=*/Location{1, 2}, - /*end_group=*/std::nullopt, - /*parameters=*/VersionSpecificParameters(), - }; - quiche::QuicheBuffer buffer; - EXPECT_QUICHE_BUG(buffer = framer_.SerializePublishOk(publish_ok), - "Serializing invalid MoQT filter type"); - EXPECT_EQ(buffer.size(), 0); -} - -TEST_F(MoqtFramerSimpleTest, PublishOkMissingStart) { - MoqtPublishOk publish_ok = { - /*request_id=*/1, - /*forward=*/true, - /*subscriber_priority=*/2, - /*group_order=*/MoqtDeliveryOrder::kAscending, - /*filter_type=*/MoqtFilterType::kAbsoluteStart, - /*start=*/std::nullopt, - /*end_group=*/std::nullopt, - /*parameters=*/VersionSpecificParameters(), - }; - quiche::QuicheBuffer buffer; - EXPECT_QUICHE_BUG(buffer = framer_.SerializePublishOk(publish_ok), - "Serializing invalid MoQT filter type"); - EXPECT_EQ(buffer.size(), 0); -} - TEST_F(MoqtFramerSimpleTest, FetchEndBeforeStart) { MoqtFetch fetch = { /*request_id=*/1, - /*subscriber_priority=*/2, - /*group_order=*/MoqtDeliveryOrder::kAscending, - /*fetch=*/ StandaloneFetch{ FullTrackName("foo", "bar"), /*start_location=*/Location{1, 2}, /*end_location=*/Location{1, 1}, }, - /*parameters=*/ - VersionSpecificParameters(AuthTokenType::kOutOfBand, "baz"), + MessageParameters(), }; quiche::QuicheBuffer buffer; EXPECT_QUIC_BUG(buffer = framer_.SerializeFetch(fetch), @@ -458,10 +403,10 @@ TEST_F(MoqtFramerSimpleTest, FetchOkWholeGroup) { MoqtFetchOk fetch_ok = { /*request_id=*/1, - MoqtDeliveryOrder::kAscending, /*end_of_track=*/false, /*end_location=*/Location{4, kMaxObjectId}, - VersionSpecificParameters(), + MessageParameters(), + TrackExtensions(), }; quiche::QuicheBuffer buffer = framer_.SerializeFetchOk(fetch_ok); // Check that object ID is zero.
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc index d14604b..526d54b 100644 --- a/quiche/quic/moqt/moqt_integration_test.cc +++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -480,8 +480,7 @@ EXPECT_TRUE(client_->session()->Fetch( full_track_name, [&](std::unique_ptr<MoqtFetchTask> task) { fetch = std::move(task); }, - Location{0, 0}, 99, std::nullopt, 128, std::nullopt, - VersionSpecificParameters())); + Location{0, 0}, 99, std::nullopt, MessageParameters())); // Run until we get FETCH_OK. bool success = test_harness_.RunUntilWithDefaultTimeout( [&]() { return fetch != nullptr; });
diff --git a/quiche/quic/moqt/moqt_key_value_pair.h b/quiche/quic/moqt/moqt_key_value_pair.h index 9757151..09dac22 100644 --- a/quiche/quic/moqt/moqt_key_value_pair.h +++ b/quiche/quic/moqt/moqt_key_value_pair.h
@@ -337,43 +337,6 @@ std::optional<uint64_t> max_value) const; }; -// Version specific parameters. -// TODO(martinduke): Replace with MessageParameters and delete when all -// messages are migrated. -enum class QUICHE_EXPORT VersionSpecificParameter : uint64_t { - kDeliveryTimeout = 0x2, - kAuthorizationToken = 0x3, - kMaxCacheDuration = 0x4, - - // QUICHE-specific extensions. - kOackWindowSize = 0xbbf1438, -}; -struct VersionSpecificParameters { - VersionSpecificParameters() = default; - // Likely parameter combinations. - VersionSpecificParameters(quic::QuicTimeDelta delivery_timeout, - quic::QuicTimeDelta max_cache_duration) - : delivery_timeout(delivery_timeout), - max_cache_duration(max_cache_duration) {} - VersionSpecificParameters(AuthTokenType token_type, absl::string_view token) { - authorization_tokens.emplace_back(token_type, token); - } - VersionSpecificParameters(quic::QuicTimeDelta delivery_timeout, - AuthTokenType token_type, absl::string_view token) - : delivery_timeout(delivery_timeout) { - authorization_tokens.emplace_back(token_type, token); - } - - std::vector<AuthToken> authorization_tokens; - quic::QuicTimeDelta delivery_timeout = quic::QuicTimeDelta::Infinite(); - quic::QuicTimeDelta max_cache_duration = quic::QuicTimeDelta::Infinite(); - std::optional<quic::QuicTimeDelta> oack_window_size; - - bool operator==(const VersionSpecificParameters& other) const = default; - KeyValuePairList ToKeyValuePairList() const; - MoqtError FromKeyValuePairList(const KeyValuePairList& list); -}; - // TODO(martinduke): Extension Headers (MOQT draft-16 Sec 11) } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc index ecdf54c..3e75e58 100644 --- a/quiche/quic/moqt/moqt_messages.cc +++ b/quiche/quic/moqt/moqt_messages.cc
@@ -62,6 +62,9 @@ return MoqtError::kNoError; } +// Parameter types are not enforced by message in draft-16, but apparently this +// is coming back later. +#if 0 const std::array<MoqtMessageType, 9> kAllowsAuthorization = { MoqtMessageType::kClientSetup, MoqtMessageType::kServerSetup, @@ -77,11 +80,8 @@ MoqtMessageType::kPublish, MoqtMessageType::kPublishOk, MoqtMessageType::kSubscribe, MoqtMessageType::kSubscribeOk, MoqtMessageType::kRequestUpdate}; -const std::array<MoqtMessageType, 4> kAllowsMaxCacheDuration = { - MoqtMessageType::kSubscribeOk, MoqtMessageType::kRequestOk, - MoqtMessageType::kFetchOk, MoqtMessageType::kPublish}; -bool VersionSpecificParametersAllowedByMessage( - const VersionSpecificParameters& parameters, MoqtMessageType message_type) { +bool MessageParametersAllowedByMessage( + const MessageParameters& parameters, MoqtMessageType message_type) { if (!parameters.authorization_tokens.empty() && !absl::c_linear_search(kAllowsAuthorization, message_type)) { return false; @@ -90,12 +90,9 @@ !absl::c_linear_search(kAllowsDeliveryTimeout, message_type)) { return false; } - if (parameters.max_cache_duration != quic::QuicTimeDelta::Infinite() && - !absl::c_linear_search(kAllowsMaxCacheDuration, message_type)) { - return false; - } return true; } +#endif std::string MoqtMessageTypeToString(const MoqtMessageType message_type) { switch (message_type) {
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index 1b4e9cf..a05b7d7 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -493,19 +493,17 @@ struct QUICHE_EXPORT MoqtFetch { uint64_t request_id; - MoqtPriority subscriber_priority; - std::optional<MoqtDeliveryOrder> group_order; std::variant<StandaloneFetch, JoiningFetchRelative, JoiningFetchAbsolute> fetch; - VersionSpecificParameters parameters; + MessageParameters parameters; }; struct QUICHE_EXPORT MoqtFetchOk { uint64_t request_id; - MoqtDeliveryOrder group_order; bool end_of_track; Location end_location; - VersionSpecificParameters parameters; + MessageParameters parameters; + TrackExtensions extensions; }; struct QUICHE_EXPORT MoqtFetchCancel { @@ -520,21 +518,13 @@ uint64_t request_id; FullTrackName full_track_name; uint64_t track_alias; - MoqtDeliveryOrder group_order; - std::optional<Location> largest_location; - bool forward; - VersionSpecificParameters parameters; + MessageParameters parameters; + TrackExtensions extensions; }; struct QUICHE_EXPORT MoqtPublishOk { uint64_t request_id; - bool forward; - MoqtPriority subscriber_priority; - MoqtDeliveryOrder group_order; - MoqtFilterType filter_type; - std::optional<Location> start; - std::optional<uint64_t> end_group; - VersionSpecificParameters parameters; + MessageParameters parameters; }; // All of the four values in this message are encoded as varints. @@ -552,9 +542,6 @@ MoqtError SetupParametersAllowedByMessage(const SetupParameters& parameters, MoqtMessageType message_type, bool webtrans); -// Returns false if the parameters cannot be in |message type|. -bool VersionSpecificParametersAllowedByMessage( - const VersionSpecificParameters& parameters, MoqtMessageType message_type); std::string MoqtMessageTypeToString(MoqtMessageType message_type); std::string MoqtDataStreamTypeToString(MoqtDataStreamType type);
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc index 3302be2..d03954b 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -13,8 +13,8 @@ #include "absl/algorithm/container.h" #include "absl/status/status.h" -#include "absl/status/statusor.h" #include "quiche/quic/moqt/moqt_fetch_task.h" +#include "quiche/quic/moqt/moqt_key_value_pair.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_priority.h" @@ -137,7 +137,7 @@ } std::unique_ptr<MoqtFetchTask> MoqtOutgoingQueue::StandaloneFetch( - Location start, Location end, std::optional<MoqtDeliveryOrder> order) { + Location start, Location end, MoqtDeliveryOrder order) { if (queue_.empty()) { return std::make_unique<MoqtFailedFetch>( absl::NotFoundError("No objects available on the track")); @@ -168,7 +168,7 @@ } std::unique_ptr<MoqtFetchTask> MoqtOutgoingQueue::RelativeFetch( - uint64_t group_diff, std::optional<MoqtDeliveryOrder> order) { + uint64_t group_diff, MoqtDeliveryOrder order) { if (queue_.empty()) { return std::make_unique<MoqtFailedFetch>( absl::NotFoundError("No objects available on the track")); @@ -189,7 +189,7 @@ } std::unique_ptr<MoqtFetchTask> MoqtOutgoingQueue::AbsoluteFetch( - uint64_t group, std::optional<MoqtDeliveryOrder> order) { + uint64_t group, MoqtDeliveryOrder order) { if (queue_.empty()) { return std::make_unique<MoqtFailedFetch>( absl::NotFoundError("No objects available on the track"));
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h index 590779f..7dbc7ee 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.h +++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -73,12 +73,11 @@ const TrackExtensions& extensions() const override { return extensions_; } std::unique_ptr<MoqtFetchTask> StandaloneFetch( - Location start, Location end, - std::optional<MoqtDeliveryOrder> order) override; + Location start, Location end, MoqtDeliveryOrder order) override; std::unique_ptr<MoqtFetchTask> RelativeFetch( - uint64_t group_diff, std::optional<MoqtDeliveryOrder> order) override; + uint64_t group_diff, MoqtDeliveryOrder order) override; std::unique_ptr<MoqtFetchTask> AbsoluteFetch( - uint64_t group, std::optional<MoqtDeliveryOrder> order) override; + uint64_t group, MoqtDeliveryOrder order) override; bool HasSubscribers() const { return !listeners_.empty(); } @@ -135,10 +134,11 @@ return; } MoqtFetchOk ok; - ok.group_order = MoqtDeliveryOrder::kAscending; ok.end_location = *(objects_.crbegin()); if (objects_.size() > 1 && *(objects_.cbegin()) > ok.end_location) { - ok.group_order = MoqtDeliveryOrder::kDescending; + ok.extensions = TrackExtensions( + std::nullopt, std::nullopt, std::nullopt, + MoqtDeliveryOrder::kDescending, std::nullopt, std::nullopt); ok.end_location = *(objects_.cbegin()); } ok.end_of_track =
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index 21607f6..3e5a446 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -39,23 +39,6 @@ namespace { -bool ParseDeliveryOrder(uint8_t raw_value, - std::optional<MoqtDeliveryOrder>& output) { - switch (raw_value) { - case 0x00: - output = std::nullopt; - return true; - case 0x01: - output = MoqtDeliveryOrder::kAscending; - return true; - case 0x02: - output = MoqtDeliveryOrder::kDescending; - return true; - default: - return false; - } -} - uint64_t SignedVarintUnserializedForm(uint64_t value) { if (value & 0x01) { return -(value >> 1); @@ -433,49 +416,6 @@ return error; } -MoqtError VersionSpecificParameters::FromKeyValuePairList( - const KeyValuePairList& list) { - MoqtError error = MoqtError::kNoError; - if (list.count(static_cast<uint64_t>( - VersionSpecificParameter::kDeliveryTimeout)) > 1 || - list.count(static_cast<uint64_t>( - VersionSpecificParameter::kMaxCacheDuration)) > 1) { - return MoqtError::kProtocolViolation; - } - list.ForEach([&](uint64_t key, - std::variant<uint64_t, absl::string_view> value) { - VersionSpecificParameter parameter = - static_cast<VersionSpecificParameter>(key); - switch (parameter) { - case VersionSpecificParameter::kDeliveryTimeout: - delivery_timeout = - quic::QuicTimeDelta::TryFromMilliseconds(std::get<uint64_t>(value)) - .value_or(quic::QuicTimeDelta::Infinite()); - break; - case VersionSpecificParameter::kMaxCacheDuration: - max_cache_duration = - quic::QuicTimeDelta::TryFromMilliseconds(std::get<uint64_t>(value)) - .value_or(quic::QuicTimeDelta::Infinite()); - break; - case VersionSpecificParameter::kOackWindowSize: - oack_window_size = - quic::QuicTimeDelta::FromMicroseconds(std::get<uint64_t>(value)); - break; - case VersionSpecificParameter::kAuthorizationToken: - error = ParseAuthTokenParameter(std::get<absl::string_view>(value), - authorization_tokens); - if (error != MoqtError::kNoError) { - return false; - } - break; - default: - break; - } - return true; - }); - return error; -} - bool MoqtMessageTypeParser::ReadUntilMessageTypeKnown() { if (message_type_.has_value()) { return true; @@ -903,15 +843,8 @@ size_t MoqtControlParser::ProcessFetch(quic::QuicDataReader& reader) { MoqtFetch fetch; - uint8_t group_order; uint64_t type; - if (!reader.ReadVarInt62(&fetch.request_id) || - !reader.ReadUInt8(&fetch.subscriber_priority) || - !reader.ReadUInt8(&group_order) || !reader.ReadVarInt62(&type)) { - return 0; - } - if (!ParseDeliveryOrder(group_order, fetch.group_order)) { - ParseError("Invalid group order value in FETCH message"); + if (!reader.ReadVarInt62(&fetch.request_id) || !reader.ReadVarInt62(&type)) { return 0; } switch (static_cast<FetchType>(type)) { @@ -961,12 +894,7 @@ ParseError("Invalid FETCH type"); return 0; } - KeyValuePairList parameters; - if (!ParseKeyValuePairList(reader, parameters)) { - return 0; - } - if (!FillAndValidateVersionSpecificParameters(parameters, fetch.parameters, - MoqtMessageType::kFetch)) { + if (!FillAndValidateMessageParameters(reader, fetch.parameters)) { return 0; } visitor_.OnFetchMessage(fetch); @@ -975,17 +903,11 @@ size_t MoqtControlParser::ProcessFetchOk(quic::QuicDataReader& reader) { MoqtFetchOk fetch_ok; - uint8_t group_order, end_of_track; - KeyValuePairList parameters; + uint8_t end_of_track; if (!reader.ReadVarInt62(&fetch_ok.request_id) || - !reader.ReadUInt8(&group_order) || !reader.ReadUInt8(&end_of_track) || + !reader.ReadUInt8(&end_of_track) || !reader.ReadVarInt62(&fetch_ok.end_location.group) || - !reader.ReadVarInt62(&fetch_ok.end_location.object) || - !ParseKeyValuePairList(reader, parameters)) { - return 0; - } - if (group_order != 0x01 && group_order != 0x02) { - ParseError("Invalid group order value in FETCH_OK"); + !reader.ReadVarInt62(&fetch_ok.end_location.object)) { return 0; } if (end_of_track > 0x01) { @@ -997,10 +919,15 @@ } else { --fetch_ok.end_location.object; } - fetch_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order); fetch_ok.end_of_track = end_of_track == 1; - if (!FillAndValidateVersionSpecificParameters(parameters, fetch_ok.parameters, - MoqtMessageType::kFetchOk)) { + if (!FillAndValidateMessageParameters(reader, fetch_ok.parameters)) { + return 0; + } + if (!ParseKeyValuePairListWithNoPrefix(reader, fetch_ok.extensions)) { + return 0; + } + if (!fetch_ok.extensions.Validate()) { + ParseError("Invalid FETCH_OK track extensions"); return 0; } visitor_.OnFetchOkMessage(fetch_ok); @@ -1027,45 +954,20 @@ size_t MoqtControlParser::ProcessPublish(quic::QuicDataReader& reader) { MoqtPublish publish; - uint8_t group_order, content_exists; QUICHE_DCHECK(reader.PreviouslyReadPayload().empty()); if (!reader.ReadVarInt62(&publish.request_id) || !ReadFullTrackName(reader, publish.full_track_name) || - !reader.ReadVarInt62(&publish.track_alias) || - !reader.ReadUInt8(&group_order) || !reader.ReadUInt8(&content_exists)) { + !reader.ReadVarInt62(&publish.track_alias)) { return 0; } - publish.group_order = static_cast<MoqtDeliveryOrder>(group_order); - if (group_order != 0x01 && group_order != 0x02) { - ParseError("Invalid group order value in PUBLISH"); + if (!FillAndValidateMessageParameters(reader, publish.parameters)) { return 0; } - if (content_exists > 1) { - ParseError("PUBLISH ContentExists has invalid value"); + if (!ParseKeyValuePairListWithNoPrefix(reader, publish.extensions)) { return 0; } - if (content_exists == 1) { - uint64_t group, object; - if (!reader.ReadVarInt62(&group) || !reader.ReadVarInt62(&object)) { - return 0; - } - publish.largest_location = Location(group, object); - } - uint8_t forward; - if (!reader.ReadUInt8(&forward)) { - return 0; - } - if (forward > 0x01) { - ParseError("Invalid forward value in PUBLISH"); - return 0; - } - publish.forward = forward == 1; - KeyValuePairList parameters; - if (!ParseKeyValuePairList(reader, parameters)) { - return 0; - } - if (!FillAndValidateVersionSpecificParameters(parameters, publish.parameters, - MoqtMessageType::kPublish)) { + if (!publish.extensions.Validate()) { + ParseError("Invalid PUBLISH track extensions"); return 0; } visitor_.OnPublishMessage(publish); @@ -1074,58 +976,10 @@ size_t MoqtControlParser::ProcessPublishOk(quic::QuicDataReader& reader) { MoqtPublishOk publish_ok; - uint8_t forward, group_order; - uint64_t filter_type; - KeyValuePairList parameters; - if (!reader.ReadVarInt62(&publish_ok.request_id) || - !reader.ReadUInt8(&forward) || - !reader.ReadUInt8(&publish_ok.subscriber_priority) || - !reader.ReadUInt8(&group_order) || !reader.ReadVarInt62(&filter_type)) { + if (!reader.ReadVarInt62(&publish_ok.request_id)) { return 0; } - if (forward > 0x01) { - ParseError("Invalid forward value in PUBLISH_OK"); - return 0; - } - publish_ok.forward = forward == 1; - if (group_order != 0x01 && group_order != 0x02) { - ParseError("Invalid group order value in PUBLISH_OK"); - return 0; - } - publish_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order); - publish_ok.filter_type = static_cast<MoqtFilterType>(filter_type); - uint64_t group, object, end_group; - switch (publish_ok.filter_type) { - case MoqtFilterType::kNextGroupStart: - case MoqtFilterType::kLargestObject: - break; - case MoqtFilterType::kAbsoluteStart: - case MoqtFilterType::kAbsoluteRange: - if (!reader.ReadVarInt62(&group) || !reader.ReadVarInt62(&object)) { - return 0; - } - publish_ok.start = Location(group, object); - if (publish_ok.filter_type == MoqtFilterType::kAbsoluteStart) { - break; - } - if (!reader.ReadVarInt62(&end_group)) { - return 0; - } - publish_ok.end_group = end_group; - if (*publish_ok.end_group < publish_ok.start->group) { - ParseError("End group is less than start group"); - return 0; - } - break; - default: - ParseError("Invalid filter type"); - return 0; - } - if (!ParseKeyValuePairList(reader, parameters)) { - return 0; - } - if (!FillAndValidateVersionSpecificParameters( - parameters, publish_ok.parameters, MoqtMessageType::kPublishOk)) { + if (!FillAndValidateMessageParameters(reader, publish_ok.parameters)) { return 0; } visitor_.OnPublishOkMessage(publish_ok); @@ -1246,26 +1100,6 @@ return true; } -bool MoqtControlParser::FillAndValidateVersionSpecificParameters( - const KeyValuePairList& in, VersionSpecificParameters& out, - MoqtMessageType message_type) { - MoqtError error = out.FromKeyValuePairList(in); - if (error != MoqtError::kNoError) { - absl::string_view error_message = - (error == MoqtError::kProtocolViolation) - ? "Duplicate Version Specific Parameter" - : "Version Specific Parameter parsing error"; - ParseError(error, error_message); - return false; - } - if (!VersionSpecificParametersAllowedByMessage(out, message_type)) { - ParseError(MoqtError::kProtocolViolation, - "Version Specific Parameter not allowed for this message type"); - return false; - } - return true; -} - void MoqtDataParser::ParseError(absl::string_view reason) { if (parsing_error_) { return; // Don't send multiple parse errors.
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h index 9ba1162..79c10a1 100644 --- a/quiche/quic/moqt/moqt_parser.h +++ b/quiche/quic/moqt/moqt_parser.h
@@ -175,9 +175,6 @@ // caller has no need to do so.) bool FillAndValidateMessageParameters(quic::QuicDataReader& reader, MessageParameters& out); - bool FillAndValidateVersionSpecificParameters(const KeyValuePairList& in, - VersionSpecificParameters& out, - MoqtMessageType message_type); MoqtControlParserVisitor& visitor_; quiche::ReadStream& stream_;
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index 7c34e0c..bf0aa9d 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -311,9 +311,10 @@ return; } MoqtMessageType type = std::get<MoqtMessageType>(message_type_); - if (type == MoqtMessageType::kSubscribeOk) { - // SUBSCRIBE_OK has extensions, which use the length field to determine - // the size. It is therefore not processed correctly. + if (type == MoqtMessageType::kSubscribeOk || + type == MoqtMessageType::kFetchOk || type == MoqtMessageType::kPublish) { + // These message types have extensions, which use the length field to + // determine the size. It is therefore not processed correctly. return; } std::unique_ptr<TestMessageBase> message = MakeMessage(); @@ -1348,18 +1349,6 @@ "End object comes before start object in FETCH"); } -TEST_F(MoqtMessageSpecificTest, FetchInvalidGroupOrder) { - webtransport::test::InMemoryStream stream(/*stream_id=*/0); - MoqtControlParser parser(kRawQuic, &stream, visitor_); - FetchMessage fetch; - fetch.SetGroupOrder(3); - stream.Receive(fetch.PacketSample(), false); - parser.ReadAndDispatchMessages(); - EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, - "Invalid group order value in FETCH message"); -} - TEST_F(MoqtMessageSpecificTest, PaddingStream) { webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtDataParser parser(&stream, &visitor_); @@ -1446,166 +1435,6 @@ message.EqualFieldValues(*visitor_.last_message_)); } -TEST_F(MoqtMessageSpecificTest, PublishGroupOrder0) { - webtransport::test::InMemoryStream stream(/*stream_id=*/0); - MoqtControlParser parser(kRawQuic, &stream, visitor_); - char publish[27] = { - 0x1d, 0x00, 0x18, - 0x01, // request_id = 1 - 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" - 0x03, 0x62, 0x61, 0x72, // track_name = "bar" - 0x04, // track_alias = 4 - 0x00, // group_order - 0x01, 0x0a, 0x01, // content exists, largest_location = 10, 1 - 0x01, // forward = true - 0x01, 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x7a, // parameters = "baz" - }; - stream.Receive(absl::string_view(publish, sizeof(publish)), false); - parser.ReadAndDispatchMessages(); - EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, "Invalid group order value in PUBLISH"); -} - -TEST_F(MoqtMessageSpecificTest, PublishContentExists2) { - webtransport::test::InMemoryStream stream(/*stream_id=*/0); - MoqtControlParser parser(kRawQuic, &stream, visitor_); - char publish[27] = { - 0x1d, 0x00, 0x18, - 0x01, // request_id = 1 - 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" - 0x03, 0x62, 0x61, 0x72, // track_name = "bar" - 0x04, // track_alias = 4 - 0x01, // group_order - 0x02, 0x0a, 0x01, // content exists, largest_location = 10, 1 - 0x01, // forward = true - 0x01, 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x7a, // parameters = "baz" - }; - stream.Receive(absl::string_view(publish, sizeof(publish)), false); - parser.ReadAndDispatchMessages(); - EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, "PUBLISH ContentExists has invalid value"); -} - -TEST_F(MoqtMessageSpecificTest, PublishForward2) { - webtransport::test::InMemoryStream stream(/*stream_id=*/0); - MoqtControlParser parser(kRawQuic, &stream, visitor_); - char publish[27] = { - 0x1d, 0x00, 0x18, - 0x01, // request_id = 1 - 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" - 0x03, 0x62, 0x61, 0x72, // track_name = "bar" - 0x04, // track_alias = 4 - 0x01, // group_order - 0x01, 0x0a, 0x01, // content exists, largest_location = 10, 1 - 0x02, // forward = true - 0x01, 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x7a, // parameters = "baz" - }; - stream.Receive(absl::string_view(publish, sizeof(publish)), false); - parser.ReadAndDispatchMessages(); - EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, "Invalid forward value in PUBLISH"); -} - -TEST_F(MoqtMessageSpecificTest, PublishOkForward2) { - webtransport::test::InMemoryStream stream(/*stream_id=*/0); - MoqtControlParser parser(kRawQuic, &stream, visitor_); - char publish_ok[15] = { - 0x1e, 0x00, 0x0c, - 0x01, // request_id = 1 - 0x02, // forward - 0x02, // subscriber_priority = 2 - 0x01, // group_order = kAscending - 0x04, // filter_type = kAbsoluteRange - 0x05, 0x04, // start = 5, 4 - 0x06, // end_group = 6 - 0x01, 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms - }; - stream.Receive(absl::string_view(publish_ok, sizeof(publish_ok)), false); - parser.ReadAndDispatchMessages(); - EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, "Invalid forward value in PUBLISH_OK"); -} - -TEST_F(MoqtMessageSpecificTest, PublishOkGroupOrder0) { - webtransport::test::InMemoryStream stream(/*stream_id=*/0); - MoqtControlParser parser(kRawQuic, &stream, visitor_); - char publish_ok[15] = { - 0x1e, 0x00, 0x0c, - 0x01, // request_id = 1 - 0x01, // forward - 0x02, // subscriber_priority = 2 - 0x00, // group_order - 0x04, // filter_type = kAbsoluteRange - 0x05, 0x04, // start = 5, 4 - 0x06, // end_group = 6 - 0x01, 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms - }; - stream.Receive(absl::string_view(publish_ok, sizeof(publish_ok)), false); - parser.ReadAndDispatchMessages(); - EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, "Invalid group order value in PUBLISH_OK"); -} - -TEST_F(MoqtMessageSpecificTest, PublishOkFilter5) { - webtransport::test::InMemoryStream stream(/*stream_id=*/0); - MoqtControlParser parser(kRawQuic, &stream, visitor_); - char publish_ok[15] = { - 0x1e, 0x00, 0x0c, - 0x01, // request_id = 1 - 0x01, // forward - 0x02, // subscriber_priority = 2 - 0x01, // group_order - 0x05, // filter_type - 0x05, 0x04, // start = 5, 4 - 0x06, // end_group = 6 - 0x01, 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms - }; - stream.Receive(absl::string_view(publish_ok, sizeof(publish_ok)), false); - parser.ReadAndDispatchMessages(); - EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, "Invalid filter type"); -} - -TEST_F(MoqtMessageSpecificTest, PublishOkEndBeforeStart) { - webtransport::test::InMemoryStream stream(/*stream_id=*/0); - MoqtControlParser parser(kRawQuic, &stream, visitor_); - char publish_ok[15] = { - 0x1e, 0x00, 0x0c, - 0x01, // request_id = 1 - 0x01, // forward - 0x02, // subscriber_priority = 2 - 0x01, // group_order - 0x04, // filter_type - 0x05, 0x04, // start = 5, 4 - 0x04, // end_group = 4 - 0x01, 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms - }; - stream.Receive(absl::string_view(publish_ok, sizeof(publish_ok)), false); - parser.ReadAndDispatchMessages(); - EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, "End group is less than start group"); -} - -TEST_F(MoqtMessageSpecificTest, PublishOkHasMaxCacheDuration) { - webtransport::test::InMemoryStream stream(/*stream_id=*/0); - MoqtControlParser parser(kRawQuic, &stream, visitor_); - char publish_ok[15] = { - 0x1e, 0x00, 0x0c, - 0x01, // request_id = 1 - 0x01, // forward - 0x02, // subscriber_priority = 2 - 0x01, // group_order - 0x04, // filter_type - 0x05, 0x04, // start = 5, 4 - 0x06, // end_group = 6 - 0x01, 0x04, 0x67, 0x10, // MaxCacheDuration = 10000 - }; - stream.Receive(absl::string_view(publish_ok, sizeof(publish_ok)), false); - parser.ReadAndDispatchMessages(); - EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); -} - TEST_F(MoqtMessageSpecificTest, InvalidSubscribeNamespaceOption) { webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_);
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h index fe65ad1..66f3d7a 100644 --- a/quiche/quic/moqt/moqt_publisher.h +++ b/quiche/quic/moqt/moqt_publisher.h
@@ -106,11 +106,11 @@ // Performs a fetch for the specified range of objects. virtual std::unique_ptr<MoqtFetchTask> StandaloneFetch( - Location start, Location end, std::optional<MoqtDeliveryOrder> order) = 0; + Location start, Location end, MoqtDeliveryOrder order) = 0; virtual std::unique_ptr<MoqtFetchTask> RelativeFetch( - uint64_t group_diff, std::optional<MoqtDeliveryOrder> order) = 0; + uint64_t group_diff, MoqtDeliveryOrder order) = 0; virtual std::unique_ptr<MoqtFetchTask> AbsoluteFetch( - uint64_t group, std::optional<MoqtDeliveryOrder> order) = 0; + uint64_t group, MoqtDeliveryOrder order) = 0; }; // MoqtPublisher is an interface to a publisher that allows it to publish
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.h b/quiche/quic/moqt/moqt_relay_track_publisher.h index c6158fd..57738c0 100644 --- a/quiche/quic/moqt/moqt_relay_track_publisher.h +++ b/quiche/quic/moqt/moqt_relay_track_publisher.h
@@ -92,20 +92,19 @@ std::optional<Location> largest_location() const override; const TrackExtensions& extensions() const override { return extensions_; } std::optional<quic::QuicTimeDelta> expiration() const override; - std::unique_ptr<MoqtFetchTask> StandaloneFetch( - Location /*start*/, Location /*end*/, - std::optional<MoqtDeliveryOrder> /*order*/) override { + std::unique_ptr<MoqtFetchTask> StandaloneFetch(Location /*start*/, + Location /*end*/, + MoqtDeliveryOrder) override { return std::make_unique<MoqtFailedFetch>( absl::UnimplementedError("Fetch not implemented")); } - std::unique_ptr<MoqtFetchTask> RelativeFetch( - uint64_t /*group_diff*/, - std::optional<MoqtDeliveryOrder> /*order*/) override { + std::unique_ptr<MoqtFetchTask> RelativeFetch(uint64_t /*group_diff*/, + MoqtDeliveryOrder) override { return std::make_unique<MoqtFailedFetch>( absl::UnimplementedError("Fetch not implemented")); } - std::unique_ptr<MoqtFetchTask> AbsoluteFetch( - uint64_t /*group*/, std::optional<MoqtDeliveryOrder> /*order*/) override { + std::unique_ptr<MoqtFetchTask> AbsoluteFetch(uint64_t /*group*/, + MoqtDeliveryOrder) override { return std::make_unique<MoqtFailedFetch>( absl::UnimplementedError("Fetch not implemented")); }
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index e430899..441b52e 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -548,9 +548,7 @@ bool MoqtSession::Fetch(const FullTrackName& name, FetchResponseCallback callback, Location start, uint64_t end_group, std::optional<uint64_t> end_object, - MoqtPriority priority, - std::optional<MoqtDeliveryOrder> delivery_order, - VersionSpecificParameters parameters) { + MessageParameters parameters) { QUICHE_DCHECK(name.IsValid()); if (next_request_id_ >= peer_max_request_id_) { QUIC_DLOG(INFO) << ENDPOINT << "Tried to send FETCH with ID " @@ -570,8 +568,6 @@ message.fetch = StandaloneFetch(name, start, end_location); message.request_id = next_request_id_; next_request_id_ += 2; - message.subscriber_priority = priority; - message.group_order = delivery_order; message.parameters = parameters; SendControlMessage(framer_.SerializeFetch(message)); QUIC_DLOG(INFO) << ENDPOINT << "Sent FETCH message for " << name; @@ -584,7 +580,7 @@ bool MoqtSession::RelativeJoiningFetch(const FullTrackName& name, SubscribeVisitor* visitor, uint64_t num_previous_groups, - VersionSpecificParameters parameters) { + MessageParameters parameters) { QUICHE_DCHECK(name.IsValid()); return RelativeJoiningFetch( name, visitor, @@ -599,15 +595,14 @@ RemoteTrackByName(track->full_track_name()); subscribe->OnJoiningFetchReady(std::move(fetch_task)); }, - num_previous_groups, kDefaultSubscriberPriority, std::nullopt, - parameters); + num_previous_groups, parameters); } -bool MoqtSession::RelativeJoiningFetch( - const FullTrackName& name, SubscribeVisitor* visitor, - FetchResponseCallback callback, uint64_t num_previous_groups, - MoqtPriority priority, std::optional<MoqtDeliveryOrder> delivery_order, - VersionSpecificParameters parameters) { +bool MoqtSession::RelativeJoiningFetch(const FullTrackName& name, + SubscribeVisitor* visitor, + FetchResponseCallback callback, + uint64_t num_previous_groups, + MessageParameters parameters) { QUICHE_DCHECK(name.IsValid()); if ((next_request_id_ + 2) >= peer_max_request_id_) { QUIC_DLOG(INFO) << ENDPOINT << "Tried to send JOINING_FETCH with ID " @@ -616,16 +611,9 @@ << peer_max_request_id_; return false; } - MessageParameters subscribe_parameters(MoqtFilterType::kLargestObject); - if (parameters.delivery_timeout != kDefaultDeliveryTimeout) { - subscribe_parameters.delivery_timeout = parameters.delivery_timeout; - } - if (priority != kDefaultSubscriberPriority) { - subscribe_parameters.subscriber_priority = priority; - } - subscribe_parameters.group_order = delivery_order; - subscribe_parameters.authorization_tokens = parameters.authorization_tokens; - subscribe_parameters.oack_window_size = parameters.oack_window_size; + MessageParameters subscribe_parameters = parameters; + subscribe_parameters.subscription_filter.emplace( + MoqtFilterType::kLargestObject); if (!Subscribe(name, visitor, subscribe_parameters)) { return false; } @@ -633,11 +621,8 @@ MoqtFetch fetch; fetch.request_id = next_request_id_; next_request_id_ += 2; - fetch.subscriber_priority = priority; - fetch.group_order = delivery_order; fetch.fetch = JoiningFetchRelative{fetch.request_id - 2, num_previous_groups}; fetch.parameters = parameters; - fetch.parameters.delivery_timeout = quic::QuicTimeDelta::Infinite(); SendControlMessage(framer_.SerializeFetch(fetch)); QUIC_DLOG(INFO) << ENDPOINT << "Sent Joining FETCH message for " << name; auto upstream_fetch = @@ -1204,8 +1189,7 @@ UpstreamFetch* fetch = absl::down_cast<UpstreamFetch*>(track); absl::Status status = RequestErrorCodeToStatus(message.error_code, message.reason_phrase); - fetch->OnFetchResult(Location(0, 0), MoqtDeliveryOrder::kAscending, - status, nullptr); + fetch->OnFetchResult(Location(0, 0), status, nullptr); } else { SubscribeRemoteTrack* subscribe = absl::down_cast<SubscribeRemoteTrack*>(track); @@ -1472,14 +1456,15 @@ << " rejected by the application: not found"; SendRequestError(message.request_id, RequestErrorCode::kTrackDoesNotExist, std::nullopt, "not found"); + return; } QUIC_DLOG(INFO) << ENDPOINT << "Received a StandaloneFETCH for " << track_name; // The check for end_object < start_object is done in // MoqtTrackPublisher::Fetch(). - fetch = track_publisher->StandaloneFetch(standalone_fetch.start_location, - standalone_fetch.end_location, - message.group_order); + fetch = track_publisher->StandaloneFetch( + standalone_fetch.start_location, standalone_fetch.end_location, + message.parameters.group_order.value_or(MoqtDeliveryOrder::kAscending)); } else { // Joining Fetch processing. uint64_t joining_request_id = @@ -1512,14 +1497,16 @@ QUIC_DLOG(INFO) << ENDPOINT << "Received a Relative Joining FETCH for " << track_name; fetch = it->second->publisher().RelativeFetch( - relative_fetch.joining_start, message.group_order); + relative_fetch.joining_start, message.parameters.group_order.value_or( + MoqtDeliveryOrder::kAscending)); } else { const JoiningFetchAbsolute& absolute_fetch = std::get<JoiningFetchAbsolute>(message.fetch); QUIC_DLOG(INFO) << ENDPOINT << "Received a Absolute Joining FETCH for " << track_name; fetch = it->second->publisher().AbsoluteFetch( - absolute_fetch.joining_start, message.group_order); + absolute_fetch.joining_start, message.parameters.group_order.value_or( + MoqtDeliveryOrder::kAscending)); } } if (!fetch->GetStatus().ok()) { @@ -1560,7 +1547,10 @@ // Set a temporary new-object callback that creates a data stream. When // created, the stream visitor will replace this callback. fetch_task->SetObjectAvailableCallback( - [this, send_order = SendOrderForFetch(message.subscriber_priority), + [this, + send_order = + SendOrderForFetch(message.parameters.subscriber_priority.value_or( + kDefaultSubscriberPriority)), request_id = message.request_id]() { auto it = session_->incoming_fetches_.find(request_id); if (it == session_->incoming_fetches_.end()) { @@ -1597,7 +1587,7 @@ << message.request_id << " " << track->full_track_name(); UpstreamFetch* fetch = absl::down_cast<UpstreamFetch*>(track); fetch->OnFetchResult( - message.end_location, message.group_order, absl::OkStatus(), + message.end_location, absl::OkStatus(), [=, session = session_]() { session->CancelFetch(message.request_id); }); }
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 1abe949..b134910 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -112,19 +112,17 @@ void Unsubscribe(const FullTrackName& name) override; bool Fetch(const FullTrackName& name, FetchResponseCallback callback, Location start, uint64_t end_group, - std::optional<uint64_t> end_object, MoqtPriority priority, - std::optional<MoqtDeliveryOrder> delivery_order, - VersionSpecificParameters parameters) override; + std::optional<uint64_t> end_object, + MessageParameters parameters) override; bool RelativeJoiningFetch(const FullTrackName& name, SubscribeVisitor* visitor, uint64_t num_previous_groups, - VersionSpecificParameters parameters) override; + MessageParameters parameters) override; bool RelativeJoiningFetch(const FullTrackName& name, SubscribeVisitor* visitor, FetchResponseCallback callback, - uint64_t num_previous_groups, MoqtPriority priority, - std::optional<MoqtDeliveryOrder> delivery_order, - VersionSpecificParameters parameters) override; + uint64_t num_previous_groups, + MessageParameters parameters) override; bool PublishNamespace(const TrackNamespace& track_namespace, const MessageParameters& parameters, MoqtResponseCallback response_callback,
diff --git a/quiche/quic/moqt/moqt_session_interface.h b/quiche/quic/moqt/moqt_session_interface.h index cb67da6..91a68c5 100644 --- a/quiche/quic/moqt/moqt_session_interface.h +++ b/quiche/quic/moqt/moqt_session_interface.h
@@ -102,9 +102,8 @@ // the MoqtFetchTask. virtual bool Fetch(const FullTrackName& name, FetchResponseCallback callback, Location start, uint64_t end_group, - std::optional<uint64_t> end_object, MoqtPriority priority, - std::optional<MoqtDeliveryOrder> delivery_order, - VersionSpecificParameters parameters) = 0; + std::optional<uint64_t> end_object, + MessageParameters parameters) = 0; // Sends both a SUBSCRIBE and a joining FETCH, beginning `num_previous_groups` // groups before the current group. The Fetch will not be flow controlled, @@ -115,16 +114,16 @@ virtual bool RelativeJoiningFetch(const FullTrackName& name, SubscribeVisitor* visitor, uint64_t num_previous_groups, - VersionSpecificParameters parameters) = 0; + MessageParameters parameters) = 0; // Sends both a SUBSCRIBE and a joining FETCH, beginning `num_previous_groups` // groups before the current group. `callback` acts the same way as the // callback for the regular Fetch() call. - virtual bool RelativeJoiningFetch( - const FullTrackName& name, SubscribeVisitor* visitor, - FetchResponseCallback callback, uint64_t num_previous_groups, - MoqtPriority priority, std::optional<MoqtDeliveryOrder> delivery_order, - VersionSpecificParameters parameters) = 0; + virtual bool RelativeJoiningFetch(const FullTrackName& name, + SubscribeVisitor* visitor, + FetchResponseCallback callback, + uint64_t num_previous_groups, + MessageParameters parameters) = 0; // Send a PUBLISH_NAMESPACE message for |track_namespace|, and call // |response_callback| when the response arrives. Will fail // immediately if there is already an unresolved PUBLISH_NAMESPACE for that
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 18150d6..2819ab5 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -107,12 +107,9 @@ MoqtFetch DefaultFetch() { MoqtFetch fetch = { kDefaultPeerRequestId, - /*subscriber_priority=*/0x80, - /*group_order=*/std::nullopt, - /*fetch=*/ StandaloneFetch(kDefaultTrackName(), Location(0, 0), Location(1, kMaxObjectId)), - /*parameters=*/VersionSpecificParameters(), + MessageParameters(), }; return fetch; } @@ -419,11 +416,9 @@ .request_id = 1, .full_track_name = FullTrackName("foo", "bar"), .track_alias = 2, - .group_order = MoqtDeliveryOrder::kAscending, - .largest_location = Location(4, 5), - .forward = true, - .parameters = VersionSpecificParameters(), + .parameters = MessageParameters(), }; + publish.parameters.largest_object = Location(4, 5); std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); // Request for track returns REQUEST_ERROR. @@ -2509,7 +2504,6 @@ // Compose and send the FETCH_OK. MoqtFetchOk expected_ok; expected_ok.request_id = fetch.request_id; - expected_ok.group_order = MoqtDeliveryOrder::kAscending; expected_ok.end_of_track = false; expected_ok.end_location = Location(1, 4); EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _)); @@ -2534,7 +2528,6 @@ MoqtFetchOk expected_ok; expected_ok.request_id = fetch.request_id; - expected_ok.group_order = MoqtDeliveryOrder::kAscending; expected_ok.end_of_track = false; expected_ok.end_location = Location(1, 4); auto fetch_task_ptr = @@ -2578,7 +2571,6 @@ MoqtFetchOk expected_ok; expected_ok.request_id = fetch.request_id; - expected_ok.group_order = MoqtDeliveryOrder::kAscending; expected_ok.end_of_track = false; expected_ok.end_location = Location(1, 4); EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _)); @@ -2770,20 +2762,18 @@ MoqtSubscribe expected_subscribe( 0, FullTrackName("foo", "bar"), MessageParameters(MoqtFilterType::kLargestObject)); - expected_subscribe.parameters.group_order = MoqtDeliveryOrder::kAscending; MoqtFetch expected_fetch = { /*request_id=*/2, - /*subscriber_priority=*/0x80, - /*group_order=*/MoqtDeliveryOrder::kAscending, /*fetch=*/JoiningFetchRelative(0, 1), + MessageParameters(), }; EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_subscribe), _)); EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_fetch), _)); - EXPECT_TRUE(session_.RelativeJoiningFetch( - expected_subscribe.full_track_name, &remote_track_visitor_, nullptr, 1, - 0x80, MoqtDeliveryOrder::kAscending, VersionSpecificParameters())); + EXPECT_TRUE(session_.RelativeJoiningFetch(expected_subscribe.full_track_name, + &remote_track_visitor_, nullptr, 1, + MessageParameters())); } TEST_F(MoqtSessionTest, SendJoiningFetchNoFlowControl) { @@ -2797,16 +2787,15 @@ Writev(ControlMessageOfType(MoqtMessageType::kFetch), _)); EXPECT_TRUE(session_.RelativeJoiningFetch(FullTrackName("foo", "bar"), &remote_track_visitor_, 0, - VersionSpecificParameters())); + MessageParameters())); EXPECT_CALL(remote_track_visitor_, OnReply).Times(1); MessageParameters parameters; parameters.largest_object = Location(2, 0); stream_input->OnSubscribeOkMessage( MoqtSubscribeOk(0, 2, parameters, TrackExtensions())); - stream_input->OnFetchOkMessage(MoqtFetchOk(2, MoqtDeliveryOrder::kAscending, - false, Location(2, 0), - VersionSpecificParameters())); + stream_input->OnFetchOkMessage(MoqtFetchOk( + 2, false, Location(2, 0), MessageParameters(), TrackExtensions())); // Packet arrives on FETCH stream. MoqtObject object = { /*request_id=*/2, @@ -2962,14 +2951,11 @@ [&](std::unique_ptr<MoqtFetchTask> task) { fetch_task = std::move(task); }, - Location(0, 0), 4, std::nullopt, kDefaultPublisherPriority, std::nullopt, - VersionSpecificParameters()); + Location(0, 0), 4, std::nullopt, MessageParameters()); MoqtFetchOk ok = { /*request_id=*/0, - /*group_order=*/MoqtDeliveryOrder::kAscending, - /*end_of_track=*/false, - /*end_location=*/Location(3, 25), - VersionSpecificParameters(), + /*end_of_track=*/false, Location(3, 25), + MessageParameters(), TrackExtensions(), }; stream_input->OnFetchOkMessage(ok); ASSERT_NE(fetch_task, nullptr); @@ -2992,8 +2978,7 @@ [&](std::unique_ptr<MoqtFetchTask> task) { fetch_task = std::move(task); }, - Location(0, 0), 4, std::nullopt, kDefaultPublisherPriority, std::nullopt, - VersionSpecificParameters()); + Location(0, 0), 4, std::nullopt, MessageParameters()); MoqtRequestError error = { /*request_id=*/0, RequestErrorCode::kUnauthorized, @@ -3031,8 +3016,7 @@ } while (result != MoqtFetchTask::GetNextObjectResult::kPending); }); }, - Location(0, 0), 4, std::nullopt, kDefaultPublisherPriority, std::nullopt, - VersionSpecificParameters()); + Location(0, 0), 4, std::nullopt, MessageParameters()); // Build queue of packets to arrive. std::queue<quiche::QuicheBuffer> headers; std::queue<std::string> payloads; @@ -3071,10 +3055,10 @@ // FETCH_OK arrives, objects are delivered. MoqtFetchOk ok = { /*request_id=*/0, - /*group_order=*/MoqtDeliveryOrder::kAscending, /*end_of_track=*/false, /*end_location=*/Location(3, 25), - VersionSpecificParameters(), + MessageParameters(), + TrackExtensions(), }; stream_input->OnFetchOkMessage(ok); ASSERT_NE(fetch_task, nullptr); @@ -3103,8 +3087,7 @@ fetch_task->SetObjectAvailableCallback( [&]() { objects_available = true; }); }, - Location(0, 0), 4, std::nullopt, kDefaultPublisherPriority, std::nullopt, - VersionSpecificParameters()); + Location(0, 0), 4, std::nullopt, MessageParameters()); // Build queue of packets to arrive. std::queue<quiche::QuicheBuffer> headers; std::queue<std::string> payloads; @@ -3143,10 +3126,8 @@ // FETCH_OK arrives, objects are available. MoqtFetchOk ok = { /*request_id=*/0, - /*group_order=*/MoqtDeliveryOrder::kAscending, - /*end_of_track=*/false, - /*end_location=*/Location(3, 25), - VersionSpecificParameters(), + /*end_of_track=*/false, Location(3, 25), + MessageParameters(), TrackExtensions(), }; stream_input->OnFetchOkMessage(ok); ASSERT_NE(fetch_task, nullptr); @@ -3520,9 +3501,9 @@ +[](std::optional<MoqtRequestErrorInfo>) {}, +[](MoqtRequestErrorInfo) {}); EXPECT_FALSE(session_.Fetch( - FullTrackName{{"foo"}, "bar"}, +[](std::unique_ptr<MoqtFetchTask>) {}, - Location(0, 0), 5, std::nullopt, 127, std::nullopt, - VersionSpecificParameters())); + FullTrackName{TrackNamespace({"foo"}), "bar"}, + +[](std::unique_ptr<MoqtFetchTask>) {}, Location(0, 0), 5, std::nullopt, + MessageParameters())); // Error on additional GOAWAY. EXPECT_CALL(mock_session_, CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation), @@ -3588,9 +3569,9 @@ +[](std::optional<MoqtRequestErrorInfo>) {}, +[](MoqtRequestErrorInfo) {}); EXPECT_FALSE(session_.Fetch( - FullTrackName({"foo"}, "bar"), +[](std::unique_ptr<MoqtFetchTask>) {}, - Location(0, 0), 5, std::nullopt, 127, std::nullopt, - VersionSpecificParameters())); + FullTrackName(TrackNamespace({"foo"}), "bar"), + +[](std::unique_ptr<MoqtFetchTask>) {}, Location(0, 0), 5, std::nullopt, + MessageParameters())); session_.GoAway(""); // GoAway timer fires. auto* goaway_alarm =
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc index e627528..25bf9b4 100644 --- a/quiche/quic/moqt/moqt_track.cc +++ b/quiche/quic/moqt/moqt_track.cc
@@ -136,23 +136,8 @@ } void UpstreamFetch::OnFetchResult(Location largest_location, - MoqtDeliveryOrder group_order, absl::Status status, TaskDestroyedCallback callback) { - if (group_order_.has_value()) { - // Data stream already implied a group order. - if (*group_order_ != group_order) { - // The track is malformed. Tell the application it failed. - std::move(ok_callback_)( - std::make_unique<MoqtFailedFetch>(MoqtStreamErrorToStatus( - kResetCodeMalformedTrack, "Group order violation"))); - // Tell the session this failed, so it can cancel the FETCH. - std::move(callback)(); - return; - } - } else { - group_order_ = group_order; - } if (!status.ok()) { std::move(ok_callback_)(std::make_unique<MoqtFailedFetch>(status)); // This is called from OnRequestError, which will delete UpstreamFetch. So @@ -215,14 +200,8 @@ return (!last_group_is_finished && location.object > last_location->object); } // Group ID has changed. - if (!group_order_.has_value()) { - group_order_ = location.group > last_location->group - ? MoqtDeliveryOrder::kAscending - : MoqtDeliveryOrder::kDescending; - return true; - } return ((location.group > last_location->group) == - (*group_order_ == MoqtDeliveryOrder::kAscending)); + (group_order_ == MoqtDeliveryOrder::kAscending)); } UpstreamFetch::UpstreamFetchTask::~UpstreamFetchTask() {
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h index 17904c6..7bef001 100644 --- a/quiche/quic/moqt/moqt_track.h +++ b/quiche/quic/moqt/moqt_track.h
@@ -195,24 +195,33 @@ UpstreamFetch(const MoqtFetch& fetch, const StandaloneFetch standalone, FetchResponseCallback callback) : RemoteTrack(standalone.full_track_name, fetch.request_id), + group_order_(fetch.parameters.group_order.value_or( + MoqtDeliveryOrder::kAscending)), window_(SubscribeWindow(standalone.start_location, standalone.end_location)), - subscriber_priority_(fetch.subscriber_priority), + subscriber_priority_(fetch.parameters.subscriber_priority.value_or( + kDefaultSubscriberPriority)), ok_callback_(std::move(callback)) {} // Relative Joining Fetch constructor UpstreamFetch(const MoqtFetch& fetch, FullTrackName full_track_name, FetchResponseCallback callback) : RemoteTrack(full_track_name, fetch.request_id), + group_order_(fetch.parameters.group_order.value_or( + MoqtDeliveryOrder::kAscending)), window_(SubscribeWindow(Location(0, 0))), - subscriber_priority_(fetch.subscriber_priority), + subscriber_priority_(fetch.parameters.subscriber_priority.value_or( + kDefaultSubscriberPriority)), ok_callback_(std::move(callback)) {} // Absolute Joining Fetch constructor UpstreamFetch(const MoqtFetch& fetch, FullTrackName full_track_name, JoiningFetchAbsolute absolute_joining, FetchResponseCallback callback) : RemoteTrack(full_track_name, fetch.request_id), + group_order_(fetch.parameters.group_order.value_or( + MoqtDeliveryOrder::kAscending)), window_(SubscribeWindow(Location(absolute_joining.joining_start, 0))), - subscriber_priority_(fetch.subscriber_priority), + subscriber_priority_(fetch.parameters.subscriber_priority.value_or( + kDefaultSubscriberPriority)), ok_callback_(std::move(callback)) {} UpstreamFetch(const UpstreamFetch&) = delete; ~UpstreamFetch(); @@ -312,8 +321,8 @@ }; // Arrival of FETCH_OK/REQUEST_ERROR. - void OnFetchResult(Location largest_location, MoqtDeliveryOrder group_order, - absl::Status status, TaskDestroyedCallback callback); + void OnFetchResult(Location largest_location, absl::Status status, + TaskDestroyedCallback callback); UpstreamFetchTask* task() { return task_.GetIfAvailable(); } @@ -328,7 +337,7 @@ bool end_of_message); private: - std::optional<MoqtDeliveryOrder> group_order_; // nullopt if not yet known. + MoqtDeliveryOrder group_order_; SubscribeWindow window_; MoqtPriority subscriber_priority_; // The last object received on the stream.
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc index ad99a8a..618739c 100644 --- a/quiche/quic/moqt/moqt_track_test.cc +++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -89,12 +89,9 @@ MoqtFetch fetch_message_ = { /*request_id=*/1, - /*subscriber_priority=*/128, - /*group_order=*/std::nullopt, - /*fetch=*/ StandaloneFetch(FullTrackName("foo", "bar"), Location(1, 1), Location(3, 100)), - VersionSpecificParameters(), + MessageParameters(), }; // The pointer held by the application. UpstreamFetch fetch_; @@ -122,8 +119,7 @@ TEST_F(UpstreamFetchTest, FetchResponse) { EXPECT_EQ(fetch_task_, nullptr); - fetch_.OnFetchResult(Location(3, 50), MoqtDeliveryOrder::kAscending, - absl::OkStatus(), nullptr); + fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr); EXPECT_NE(fetch_task_, nullptr); EXPECT_NE(fetch_.task(), nullptr); EXPECT_TRUE(fetch_task_->GetStatus().ok()); @@ -131,8 +127,8 @@ TEST_F(UpstreamFetchTest, FetchClosedByMoqt) { bool terminated = false; - fetch_.OnFetchResult(Location(3, 50), MoqtDeliveryOrder::kAscending, - absl::OkStatus(), [&]() { terminated = true; }); + fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), + [&]() { terminated = true; }); bool got_eof = false; fetch_task_->SetObjectAvailableCallback([&]() { PublishedObject object; @@ -147,15 +143,14 @@ TEST_F(UpstreamFetchTest, FetchClosedByApplication) { bool terminated = false; - fetch_.OnFetchResult(Location(3, 50), MoqtDeliveryOrder::kAscending, - absl::Status(), [&]() { terminated = true; }); + fetch_.OnFetchResult(Location(3, 50), absl::Status(), + [&]() { terminated = true; }); fetch_task_.reset(); EXPECT_TRUE(terminated); } TEST_F(UpstreamFetchTest, ObjectRetrieval) { - fetch_.OnFetchResult(Location(3, 50), MoqtDeliveryOrder::kAscending, - absl::OkStatus(), nullptr); + fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr); PublishedObject object; EXPECT_EQ(fetch_task_->GetNextObject(object), MoqtFetchTask::GetNextObjectResult::kPending); @@ -192,8 +187,7 @@ } TEST_F(UpstreamFetchTest, LocationIsValidOkFirstObjectIdDeclining) { - fetch_.OnFetchResult(Location(3, 50), MoqtDeliveryOrder::kAscending, - absl::OkStatus(), nullptr); + fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr); EXPECT_TRUE( fetch_.LocationIsValid(Location(1, 1), MoqtObjectStatus::kNormal, true)); EXPECT_TRUE( @@ -203,8 +197,7 @@ } TEST_F(UpstreamFetchTest, LocationIsValidPartialObject) { - fetch_.OnFetchResult(Location(3, 50), MoqtDeliveryOrder::kAscending, - absl::OkStatus(), nullptr); + fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr); EXPECT_TRUE( fetch_.LocationIsValid(Location(1, 1), MoqtObjectStatus::kNormal, true)); EXPECT_TRUE( @@ -214,8 +207,7 @@ } TEST_F(UpstreamFetchTest, LocationIsValidOkGroupDescendingIncorrectly) { - fetch_.OnFetchResult(Location(3, 50), MoqtDeliveryOrder::kAscending, - absl::OkStatus(), nullptr); + fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr); EXPECT_TRUE( fetch_.LocationIsValid(Location(2, 1), MoqtObjectStatus::kNormal, true)); EXPECT_TRUE( @@ -225,12 +217,17 @@ } TEST_F(UpstreamFetchTest, LocationIsValidOkGroupAscendingIncorrectly) { - fetch_.OnFetchResult(Location(3, 50), MoqtDeliveryOrder::kDescending, - absl::OkStatus(), nullptr); + fetch_message_.parameters.group_order = MoqtDeliveryOrder::kDescending; + UpstreamFetch fetch(fetch_message_, + std::get<StandaloneFetch>(fetch_message_.fetch), + [&](std::unique_ptr<MoqtFetchTask> task) { + fetch_task_ = std::move(task); + }); + fetch.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr); EXPECT_TRUE( - fetch_.LocationIsValid(Location(2, 1), MoqtObjectStatus::kNormal, true)); + fetch.LocationIsValid(Location(2, 1), MoqtObjectStatus::kNormal, true)); EXPECT_FALSE( - fetch_.LocationIsValid(Location(3, 1), MoqtObjectStatus::kNormal, true)); + fetch.LocationIsValid(Location(3, 1), MoqtObjectStatus::kNormal, true)); } TEST_F(UpstreamFetchTest, LocationIsValidLearnOrderThenOkSuccess) { @@ -238,25 +235,11 @@ fetch_.LocationIsValid(Location(1, 1), MoqtObjectStatus::kNormal, true)); EXPECT_TRUE( fetch_.LocationIsValid(Location(2, 1), MoqtObjectStatus::kNormal, true)); - fetch_.OnFetchResult(Location(3, 50), MoqtDeliveryOrder::kAscending, - absl::OkStatus(), nullptr); + fetch_.OnFetchResult(Location(3, 50), absl::OkStatus(), nullptr); // Groups arrived in ascending order, but the FETCH_OK reported descending. EXPECT_TRUE(fetch_task_->GetStatus().ok()); } -TEST_F(UpstreamFetchTest, LocationIsValidLearnOrderThenOkFailure) { - EXPECT_TRUE( - fetch_.LocationIsValid(Location(1, 1), MoqtObjectStatus::kNormal, true)); - EXPECT_TRUE( - fetch_.LocationIsValid(Location(2, 1), MoqtObjectStatus::kNormal, true)); - bool termination_callback_called = false; - fetch_.OnFetchResult(Location(3, 50), MoqtDeliveryOrder::kDescending, - absl::OkStatus(), - [&]() { termination_callback_called = true; }); - // Groups arrived in ascending order, but the FETCH_OK reported descending. - EXPECT_TRUE(termination_callback_called); -} - TEST_F(UpstreamFetchTest, LocationIsValidObjectBeyondEndOfGroup) { EXPECT_TRUE(fetch_.LocationIsValid(Location(1, 1), MoqtObjectStatus::kEndOfGroup, true));
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.h b/quiche/quic/moqt/test_tools/mock_moqt_session.h index 48c0553..e3e8e09 100644 --- a/quiche/quic/moqt/test_tools/mock_moqt_session.h +++ b/quiche/quic/moqt/test_tools/mock_moqt_session.h
@@ -41,21 +41,17 @@ MOCK_METHOD(bool, Fetch, (const FullTrackName& name, FetchResponseCallback callback, Location start, uint64_t end_group, - std::optional<uint64_t> end_object, MoqtPriority priority, - std::optional<MoqtDeliveryOrder> delivery_order, - VersionSpecificParameters parameters), + std::optional<uint64_t> end_object, + MessageParameters parameters), (override)); MOCK_METHOD(bool, RelativeJoiningFetch, (const FullTrackName& name, SubscribeVisitor* visitor, - uint64_t num_previous_groups, - VersionSpecificParameters parameters), + uint64_t num_previous_groups, MessageParameters parameters), (override)); MOCK_METHOD(bool, RelativeJoiningFetch, (const FullTrackName& name, SubscribeVisitor* visitor, FetchResponseCallback callback, uint64_t num_previous_groups, - MoqtPriority priority, - std::optional<MoqtDeliveryOrder> delivery_order, - VersionSpecificParameters parameters), + MessageParameters parameters), (override)); MOCK_METHOD( bool, PublishNamespace,
diff --git a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h index b9dc508..810a107 100644 --- a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h +++ b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
@@ -86,12 +86,11 @@ MOCK_METHOD(std::optional<quic::QuicTimeDelta>, expiration, (), (const, override)); MOCK_METHOD(std::unique_ptr<MoqtFetchTask>, StandaloneFetch, - (Location, Location, std::optional<MoqtDeliveryOrder>), - (override)); + (Location, Location, MoqtDeliveryOrder), (override)); MOCK_METHOD(std::unique_ptr<MoqtFetchTask>, RelativeFetch, - (uint64_t, std::optional<MoqtDeliveryOrder>), (override)); + (uint64_t, MoqtDeliveryOrder), (override)); MOCK_METHOD(std::unique_ptr<MoqtFetchTask>, AbsoluteFetch, - (uint64_t, std::optional<MoqtDeliveryOrder>), (override)); + (uint64_t, MoqtDeliveryOrder), (override)); private: FullTrackName track_name_; @@ -129,20 +128,17 @@ } // TODO(martinduke): Support Fetch std::unique_ptr<MoqtFetchTask> StandaloneFetch( - Location start, Location end, - std::optional<MoqtDeliveryOrder> delivery_order) override { + Location start, Location end, MoqtDeliveryOrder delivery_order) override { return std::make_unique<MoqtFailedFetch>( absl::UnimplementedError("Fetch not implemented")); } std::unique_ptr<MoqtFetchTask> RelativeFetch( - uint64_t offset, - std::optional<MoqtDeliveryOrder> delivery_order) override { + uint64_t offset, MoqtDeliveryOrder delivery_order) override { return std::make_unique<MoqtFailedFetch>( absl::UnimplementedError("Fetch not implemented")); } std::unique_ptr<MoqtFetchTask> AbsoluteFetch( - uint64_t offset, - std::optional<MoqtDeliveryOrder> delivery_order) override { + uint64_t offset, MoqtDeliveryOrder delivery_order) override { return std::make_unique<MoqtFailedFetch>( absl::UnimplementedError("Fetch not implemented")); }
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h index 1fa97e5..403a346 100644 --- a/quiche/quic/moqt/test_tools/moqt_session_peer.h +++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -181,15 +181,12 @@ // Adds an upstream fetch and a stream ready to receive data. static std::unique_ptr<MoqtFetchTask> CreateUpstreamFetch( - MoqtSession* session, webtransport::Stream* stream, - MoqtDeliveryOrder order = MoqtDeliveryOrder::kAscending) { + MoqtSession* session, webtransport::Stream* stream) { MoqtFetch fetch_message = { 0, - 128, - std::nullopt, StandaloneFetch(FullTrackName{"foo", "bar"}, Location{0, 0}, Location{4, kMaxObjectId}), - VersionSpecificParameters(), + MessageParameters(), }; std::unique_ptr<MoqtFetchTask> task; auto [it, success] = session->upstream_by_id_.try_emplace( @@ -202,7 +199,7 @@ UpstreamFetch* fetch = absl::down_cast<UpstreamFetch*>(it->second.get()); // Initialize the fetch task fetch->OnFetchResult( - Location{4, 10}, order, absl::OkStatus(), + Location{4, 10}, absl::OkStatus(), [=, session_ptr = session, request_id = fetch_message.request_id]() { session_ptr->CancelFetch(request_id); });
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator.cc b/quiche/quic/moqt/test_tools/moqt_simulator.cc index 1b0d796..2ed2240 100644 --- a/quiche/quic/moqt/test_tools/moqt_simulator.cc +++ b/quiche/quic/moqt/test_tools/moqt_simulator.cc
@@ -315,7 +315,7 @@ // server does not yet have an active subscription, so the client has // some catching up to do. generator_.Start(); - VersionSpecificParameters subscription_parameters; + MessageParameters subscription_parameters; if (parameters_.bitrate_adaptation) { subscription_parameters.oack_window_size = parameters_.deadline; }
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index 308dde5..ec708bf 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -1291,6 +1291,10 @@ public: FetchMessage() : TestMessageBase() { SetWireImage(raw_packet_, sizeof(raw_packet_)); + fetch_.parameters.authorization_tokens.push_back( + AuthToken(AuthTokenType::kOutOfBand, "baz")); + fetch_.parameters.group_order = MoqtDeliveryOrder::kAscending; + fetch_.parameters.subscriber_priority = 2; } bool EqualFieldValues(MessageStructuredData& values) const override { auto cast = std::get<MoqtFetch>(values); @@ -1298,14 +1302,6 @@ QUIC_LOG(INFO) << "FETCH request_id mismatch"; return false; } - if (cast.subscriber_priority != fetch_.subscriber_priority) { - QUIC_LOG(INFO) << "FETCH subscriber_priority mismatch"; - return false; - } - if (cast.group_order != fetch_.group_order) { - QUIC_LOG(INFO) << "FETCH group_order mismatch"; - return false; - } if (cast.fetch != fetch_.fetch) { QUIC_LOG(INFO) << "FETCH mismatch"; return false; @@ -1318,7 +1314,7 @@ } void ExpandVarints() override { - ExpandVarintsImpl("v--vvv---v---vvvvvv-----"); + ExpandVarintsImpl("vvvv---v---vvvvvvv-----vvvv"); } MessageStructuredData structured_data() const override { @@ -1332,41 +1328,39 @@ std::get<StandaloneFetch>(fetch_.fetch).end_location.group = group; std::get<StandaloneFetch>(fetch_.fetch).end_location.object = object.has_value() ? *object : kMaxObjectId; - raw_packet_[18] = group; - raw_packet_[19] = object.has_value() ? (*object + 1) : 0; + raw_packet_[16] = group; + raw_packet_[17] = object.has_value() ? (*object + 1) : 0; SetWireImage(raw_packet_, sizeof(raw_packet_)); } void SetGroupOrder(uint8_t group_order) { - raw_packet_[5] = static_cast<uint8_t>(group_order); + raw_packet_[29] = static_cast<uint8_t>(group_order); SetWireImage(raw_packet_, sizeof(raw_packet_)); } private: - uint8_t raw_packet_[28] = { - 0x16, 0x00, 0x19, - 0x01, // request_id = 1 - 0x02, // priority = kHigh - 0x01, // group_order = kAscending - 0x01, // type = kStandalone - 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" - 0x03, 0x62, 0x61, 0x72, // track_name = "bar" - 0x01, 0x02, // start_location = 1, 2 - 0x05, 0x07, // end_location = 5, 6 - 0x01, 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x7a, // parameters = "baz" + uint8_t raw_packet_[30] = { + 0x16, 0x00, 0x1b, + 0x01, // request_id = 1 + 0x01, // type = kStandalone + 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x03, 0x62, 0x61, 0x72, // track_name = "bar" + 0x01, 0x02, // start_location = 1, 2 + 0x05, 0x07, // end_location = 5, 6 + 0x03, // 3 parameters + 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x7a, // token = "baz" + 0x1d, 0x02, // priority = kHigh + 0x02, 0x01, // group_order = kAscending }; MoqtFetch fetch_ = { /*request_id=*/1, - /*subscriber_priority=*/2, - /*group_order=*/MoqtDeliveryOrder::kAscending, - /*fetch =*/ StandaloneFetch{ FullTrackName("foo", "bar"), /*start_location=*/Location{1, 2}, /*end_location=*/Location{5, 6}, }, - VersionSpecificParameters(AuthTokenType::kOutOfBand, "baz"), + MessageParameters(), // set in constructor. }; }; @@ -1376,6 +1370,10 @@ public: RelativeJoiningFetchMessage() : TestMessageBase() { SetWireImage(raw_packet_, sizeof(raw_packet_)); + fetch_.parameters.authorization_tokens.push_back( + AuthToken(AuthTokenType::kOutOfBand, "baz")); + fetch_.parameters.group_order = MoqtDeliveryOrder::kAscending; + fetch_.parameters.subscriber_priority = 2; } bool EqualFieldValues(MessageStructuredData& values) const override { auto cast = std::get<MoqtFetch>(values); @@ -1383,14 +1381,6 @@ QUIC_LOG(INFO) << "FETCH request_id mismatch"; return false; } - if (cast.subscriber_priority != fetch_.subscriber_priority) { - QUIC_LOG(INFO) << "FETCH subscriber_priority mismatch"; - return false; - } - if (cast.group_order != fetch_.group_order) { - QUIC_LOG(INFO) << "FETCH group_order mismatch"; - return false; - } if (cast.fetch != fetch_.fetch) { QUIC_LOG(INFO) << "FETCH mismatch"; return false; @@ -1402,36 +1392,33 @@ return true; } - void ExpandVarints() override { - ExpandVarintsImpl("v--vvv---v---vvvvvv-----"); - } + void ExpandVarints() override { ExpandVarintsImpl("vvvvvvv-----vvvv"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(fetch_); } void SetGroupOrder(uint8_t group_order) { - raw_packet_[5] = static_cast<uint8_t>(group_order); + raw_packet_[18] = static_cast<uint8_t>(group_order); SetWireImage(raw_packet_, sizeof(raw_packet_)); } private: - uint8_t raw_packet_[17] = { - 0x16, 0x00, 0x0e, + uint8_t raw_packet_[19] = { + 0x16, 0x00, 0x10, 0x01, // request_id = 1 - 0x02, // priority = kHigh - 0x01, // group_order = kAscending 0x02, // type = kRelativeJoining 0x02, 0x02, // joining_request_id = 2, 2 groups - 0x01, 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x7a, // parameters = "baz" + 0x03, // 3 parameters + 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x7a, // token = "baz" + 0x1d, 0x02, // priority = kHigh + 0x02, 0x01, // group_order = kAscending }; MoqtFetch fetch_ = { /*request_id =*/1, - /*subscriber_priority=*/2, - /*group_order=*/MoqtDeliveryOrder::kAscending, - /*fetch=*/JoiningFetchRelative{2, 2}, - VersionSpecificParameters(AuthTokenType::kOutOfBand, "baz"), + JoiningFetchRelative{2, 2}, + MessageParameters(), // set in constructor. }; }; @@ -1441,6 +1428,10 @@ public: AbsoluteJoiningFetchMessage() : TestMessageBase() { SetWireImage(raw_packet_, sizeof(raw_packet_)); + fetch_.parameters.authorization_tokens.push_back( + AuthToken(AuthTokenType::kOutOfBand, "baz")); + fetch_.parameters.group_order = MoqtDeliveryOrder::kAscending; + fetch_.parameters.subscriber_priority = 2; } bool EqualFieldValues(MessageStructuredData& values) const override { auto cast = std::get<MoqtFetch>(values); @@ -1448,14 +1439,6 @@ QUIC_LOG(INFO) << "FETCH request_id mismatch"; return false; } - if (cast.subscriber_priority != fetch_.subscriber_priority) { - QUIC_LOG(INFO) << "FETCH subscriber_priority mismatch"; - return false; - } - if (cast.group_order != fetch_.group_order) { - QUIC_LOG(INFO) << "FETCH group_order mismatch"; - return false; - } if (cast.fetch != fetch_.fetch) { QUIC_LOG(INFO) << "FETCH mismatch"; return false; @@ -1467,9 +1450,7 @@ return true; } - void ExpandVarints() override { - ExpandVarintsImpl("v--vvv---v---vvvvvv-----"); - } + void ExpandVarints() override { ExpandVarintsImpl("vvvvvvv-----vvvv"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(fetch_); @@ -1481,22 +1462,21 @@ } private: - uint8_t raw_packet_[17] = { - 0x16, 0x00, 0x0e, + uint8_t raw_packet_[19] = { + 0x16, 0x00, 0x10, 0x01, // request_id = 1 - 0x02, // priority = kHigh - 0x01, // group_order = kAscending 0x03, // type = kAbsoluteJoining 0x02, 0x02, // joining_request_id = 2, group_id = 2 - 0x01, 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x7a, // parameters = "baz" + 0x03, // 3 parameters + 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x7a, // token = "baz" + 0x1d, 0x02, // priority = kHigh + 0x02, 0x01, // group_order = kAscending }; MoqtFetch fetch_ = { /*request_id=*/1, - /*subscriber_priority=*/2, - /*group_order=*/MoqtDeliveryOrder::kAscending, - /*fetch=*/JoiningFetchAbsolute{2, 2}, - VersionSpecificParameters(AuthTokenType::kOutOfBand, "baz"), + JoiningFetchAbsolute{2, 2}, + MessageParameters(), // set in constructor. }; }; @@ -1511,10 +1491,6 @@ QUIC_LOG(INFO) << "FETCH_OK request_id mismatch"; return false; } - if (cast.group_order != fetch_ok_.group_order) { - QUIC_LOG(INFO) << "FETCH_OK group_order mismatch"; - return false; - } if (cast.end_of_track != fetch_ok_.end_of_track) { QUIC_LOG(INFO) << "FETCH_OK end_of_track mismatch"; return false; @@ -1527,32 +1503,39 @@ QUIC_LOG(INFO) << "FETCH_OK parameters mismatch"; return false; } + if (cast.extensions != fetch_ok_.extensions) { + QUIC_LOG(INFO) << "FETCH_OK extensions mismatch"; + return false; + } return true; } - void ExpandVarints() override { ExpandVarintsImpl("v--vvvvv---"); } + void ExpandVarints() override { ExpandVarintsImpl("v-vvvv--vv"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(fetch_ok_); } private: - uint8_t raw_packet_[12] = { - 0x18, 0x00, 0x09, - 0x01, // request_id = 1 - 0x01, // group_order = kAscending - 0x00, // end_of_track = false - 0x05, 0x04, // end_location = 5, 3 - 0x01, 0x04, 0x67, 0x10, // MaxCacheDuration = 10000 + uint8_t raw_packet_[13] = { + 0x18, 0x00, 0x0a, + 0x01, // request_id = 1 + 0x00, // end_of_track = false + 0x05, 0x04, // end_location = 5, 3 + 0x00, // no parameters + 0x04, 0x67, 0x10, // MaxCacheDuration = 10000 + 0x1e, 0x02, // group_order = kDescending }; MoqtFetchOk fetch_ok_ = { /*request_id =*/1, - /*group_order=*/MoqtDeliveryOrder::kAscending, /*end_of_track=*/false, /*end_location=*/Location{5, 3}, - VersionSpecificParameters(quic::QuicTimeDelta::Infinite(), - quic::QuicTimeDelta::FromMilliseconds(10000)), + MessageParameters(), + TrackExtensions(std::nullopt, + quic::QuicTimeDelta::FromMilliseconds(10000), + std::nullopt, MoqtDeliveryOrder::kDescending, + std::nullopt, std::nullopt), }; }; @@ -1622,6 +1605,10 @@ public: PublishMessage() : TestMessageBase() { SetWireImage(raw_packet_, sizeof(raw_packet_)); + publish_.parameters.authorization_tokens.push_back( + AuthToken(AuthTokenType::kOutOfBand, "baz")); + publish_.parameters.largest_object = Location(10, 1); + publish_.parameters.set_forward(true); } bool EqualFieldValues(MessageStructuredData& values) const override { auto cast = std::get<MoqtPublish>(values); @@ -1637,27 +1624,19 @@ QUIC_LOG(INFO) << "PUBLISH track_alias mismatch"; return false; } - if (cast.group_order != publish_.group_order) { - QUIC_LOG(INFO) << "PUBLISH group_order mismatch"; - return false; - } - if (cast.largest_location != publish_.largest_location) { - QUIC_LOG(INFO) << "PUBLISH largest_location mismatch"; - return false; - } - if (cast.forward != publish_.forward) { - QUIC_LOG(INFO) << "PUBLISH forward mismatch"; - return false; - } if (cast.parameters != publish_.parameters) { QUIC_LOG(INFO) << "PUBLISH parameters mismatch"; return false; } + if (cast.extensions != publish_.extensions) { + QUIC_LOG(INFO) << "PUBLISH extensions mismatch"; + return false; + } return true; } void ExpandVarints() override { - ExpandVarintsImpl("vvv---v---v--vv-vvv-----"); + ExpandVarintsImpl("vvv---v---vvvv-----vv--vvvv"); } MessageStructuredData structured_data() const override { @@ -1665,26 +1644,27 @@ } private: - uint8_t raw_packet_[27] = { - 0x1d, 0x00, 0x18, - 0x01, // request_id = 1 - 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" - 0x03, 0x62, 0x61, 0x72, // track_name = "bar" - 0x04, // track_alias = 4 - 0x01, // group_order = kAscending - 0x01, 0x0a, 0x01, // content exists, largest_location = 10, 1 - 0x01, // forward = true - 0x01, 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x7a, // parameters = "baz" + uint8_t raw_packet_[30] = { + 0x1d, 0x00, 0x1b, + 0x01, // request_id = 1 + 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x03, 0x62, 0x61, 0x72, // track_name = "bar" + 0x04, // track_alias = 4 + 0x03, // 3 parameters + 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x7a, // token = "baz" + 0x06, 0x02, 0x0a, 0x01, // largest_object = 10, 1 + 0x07, 0x01, // forward = 1 + 0x22, 0x02, // group_order = kAscending }; MoqtPublish publish_ = { /*request_id=*/1, FullTrackName("foo", "bar"), /*track_alias=*/4, - MoqtDeliveryOrder::kAscending, - /*largest_location=*/Location(10, 1), - /*forward=*/true, - VersionSpecificParameters(AuthTokenType::kOutOfBand, "baz"), + MessageParameters(), + TrackExtensions(std::nullopt, std::nullopt, std::nullopt, + MoqtDeliveryOrder::kDescending, std::nullopt, + std::nullopt), }; }; @@ -1692,6 +1672,13 @@ public: PublishOkMessage() : TestMessageBase() { SetWireImage(raw_packet_, sizeof(raw_packet_)); + publish_ok_.parameters.delivery_timeout = + quic::QuicTimeDelta::FromMilliseconds(10000); + publish_ok_.parameters.set_forward(true); + publish_ok_.parameters.subscriber_priority = 2; + publish_ok_.parameters.group_order = MoqtDeliveryOrder::kAscending; + publish_ok_.parameters.subscription_filter = + SubscriptionFilter(Location(5, 4), 6); } bool EqualFieldValues(MessageStructuredData& values) const override { auto cast = std::get<MoqtPublishOk>(values); @@ -1699,30 +1686,6 @@ QUIC_LOG(INFO) << "PUBLISH_OK request_id mismatch"; return false; } - if (cast.forward != publish_ok_.forward) { - QUIC_LOG(INFO) << "PUBLISH_OK forward mismatch"; - return false; - } - if (cast.subscriber_priority != publish_ok_.subscriber_priority) { - QUIC_LOG(INFO) << "PUBLISH_OK subscriber_priority mismatch"; - return false; - } - if (cast.group_order != publish_ok_.group_order) { - QUIC_LOG(INFO) << "PUBLISH_OK group_order mismatch"; - return false; - } - if (cast.filter_type != publish_ok_.filter_type) { - QUIC_LOG(INFO) << "PUBLISH_OK filter_type mismatch"; - return false; - } - if (cast.start != publish_ok_.start) { - QUIC_LOG(INFO) << "PUBLISH_OK start mismatch"; - return false; - } - if (cast.end_group != publish_ok_.end_group) { - QUIC_LOG(INFO) << "PUBLISH_OK end_group mismatch"; - return false; - } if (cast.parameters != publish_ok_.parameters) { QUIC_LOG(INFO) << "PUBLISH_OK parameters mismatch"; return false; @@ -1730,34 +1693,26 @@ return true; } - void ExpandVarints() override { ExpandVarintsImpl("v---vvvvvv--"); } + void ExpandVarints() override { ExpandVarintsImpl("vvv--vvvvvv----vv"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(publish_ok_); } private: - uint8_t raw_packet_[15] = { - 0x1e, 0x00, 0x0c, - 0x01, // request_id = 1 - 0x01, // forward = true - 0x02, // subscriber_priority = 2 - 0x01, // group_order = kAscending - 0x04, // filter_type = kAbsoluteRange - 0x05, 0x04, // start = 5, 4 - 0x06, // end_group = 6 - 0x01, 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms + uint8_t raw_packet_[20] = { + 0x1e, 0x00, 0x11, + 0x01, // request_id = 1 + 0x05, // 5 parameters + 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms + 0x0e, 0x01, // forward = true + 0x10, 0x02, // subscriber_priority = 2 + 0x01, 0x04, 0x04, 0x05, 0x04, 0x06, // subscription filter: (5, 4) to 6 + 0x01, 0x01, // group_order = kAscending }; MoqtPublishOk publish_ok_ = { /*request_id=*/1, - /*forward=*/true, - /*subscriber_priority=*/2, - MoqtDeliveryOrder::kAscending, - MoqtFilterType::kAbsoluteRange, - /*start=*/Location(5, 4), - /*end_group=*/6, - VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000), - quic::QuicTimeDelta::Infinite()), + MessageParameters(), // set in constructor. }; };
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc index 1ae59e1..b3fd295 100644 --- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc +++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -176,7 +176,7 @@ return; } session_->RelativeJoiningFetch(*full_track_name, &it->second, 0, - VersionSpecificParameters()); + MessageParameters()); } std::move(callback)(std::nullopt); }