Implement Key-Value-Pair struct from the MoQT spec. Update all messages to match the spec as to what parameters can be included. Parse and frame of parameters is in key-value multimap that can be shared with extensions. Renumber SETUP type to 0x20/0x21. Roll the version to draft-11, although this CL will interop with neither -10 nor -11. PiperOrigin-RevId: 754055947
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index b59bc5f..430616e 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -9,20 +9,16 @@ #include <cstdlib> #include <optional> #include <string> -#include <type_traits> #include <utility> -#include <variant> #include <vector> -#include "absl/container/inlined_vector.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/string_view.h" -#include "quiche/quic/core/quic_data_writer.h" #include "quiche/quic/core/quic_time.h" +#include "quiche/quic/core/quic_types.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_priority.h" -#include "quiche/quic/platform/api/quic_bug_tracker.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/quiche_buffer_allocator.h" @@ -41,135 +37,78 @@ using ::quiche::WireUint8; using ::quiche::WireVarInt62; -// Encoding for string parameters as described in -// https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html#name-parameters -struct StringParameter { - template <typename Enum> - StringParameter(Enum type, absl::string_view data) - : type(static_cast<uint64_t>(type)), data(data) { - static_assert(std::is_enum_v<Enum>); - } - - uint64_t type; - absl::string_view data; -}; -class WireStringParameter { +class WireKeyVarIntPair { public: - using DataType = StringParameter; + explicit WireKeyVarIntPair(uint64_t key, uint64_t value) + : key_(key), value_(value) {} - explicit WireStringParameter(const StringParameter& parameter) - : parameter_(parameter) {} size_t GetLengthOnWire() { - return quiche::ComputeLengthOnWire( - WireVarInt62(parameter_.type), - WireStringWithVarInt62Length(parameter_.data)); + return quiche::ComputeLengthOnWire(WireVarInt62(key_), + WireVarInt62(value_)); } absl::Status SerializeIntoWriter(quiche::QuicheDataWriter& writer) { - return quiche::SerializeIntoWriter( - writer, WireVarInt62(parameter_.type), - WireStringWithVarInt62Length(parameter_.data)); + return quiche::SerializeIntoWriter(writer, WireVarInt62(key_), + WireVarInt62(value_)); } private: - const StringParameter& parameter_; + const uint64_t key_; + const uint64_t value_; }; -// Encoding for integer parameters as described in -// https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html#name-parameters -struct IntParameter { - template <typename Enum, typename Param> - IntParameter(Enum type, Param value) - : type(static_cast<uint64_t>(type)), value(static_cast<uint64_t>(value)) { - static_assert(std::is_enum_v<Enum>); - static_assert(std::is_enum_v<Param> || std::is_unsigned_v<Param>); - } - - uint64_t type; - uint64_t value; -}; -class WireIntParameter { +class WireKeyStringPair { public: - using DataType = IntParameter; - - explicit WireIntParameter(const IntParameter& parameter) - : parameter_(parameter) {} + explicit WireKeyStringPair(uint64_t key, absl::string_view value) + : key_(key), value_(value) {} size_t GetLengthOnWire() { - return quiche::ComputeLengthOnWire( - WireVarInt62(parameter_.type), - WireVarInt62(NeededVarIntLen(parameter_.value)), - WireVarInt62(parameter_.value)); + return quiche::ComputeLengthOnWire(WireVarInt62(key_), + WireStringWithVarInt62Length(value_)); } absl::Status SerializeIntoWriter(quiche::QuicheDataWriter& writer) { - return quiche::SerializeIntoWriter( - writer, WireVarInt62(parameter_.type), - WireVarInt62(NeededVarIntLen(parameter_.value)), - WireVarInt62(parameter_.value)); + return quiche::SerializeIntoWriter(writer, WireVarInt62(key_), + WireStringWithVarInt62Length(value_)); } private: - size_t NeededVarIntLen(const uint64_t value) { - return static_cast<size_t>(quic::QuicDataWriter::GetVarInt62Len(value)); - } - - const IntParameter& parameter_; + const uint64_t key_; + const absl::string_view value_; }; -class WireSubscribeParameterList { +class WireKeyValuePairList { public: - explicit WireSubscribeParameterList(const MoqtSubscribeParameters& list) - : list_(list) {} + explicit WireKeyValuePairList(const KeyValuePairList& list) : list_(list) {} size_t GetLengthOnWire() { - auto string_parameters = StringParameters(); - auto int_parameters = IntParameters(); - return quiche::ComputeLengthOnWire( - WireVarInt62(string_parameters.size() + int_parameters.size()), - WireSpan<WireStringParameter>(string_parameters), - WireSpan<WireIntParameter>(int_parameters)); + size_t total = WireVarInt62(list_.size()).GetLengthOnWire(); + list_.ForEach( + [&](uint64_t key, uint64_t value) { + total += WireKeyVarIntPair(key, value).GetLengthOnWire(); + return true; + }, + [&](uint64_t key, absl::string_view value) { + total += WireKeyStringPair(key, value).GetLengthOnWire(); + return true; + }); + return total; } absl::Status SerializeIntoWriter(quiche::QuicheDataWriter& writer) { - auto string_parameters = StringParameters(); - auto int_parameters = IntParameters(); - return quiche::SerializeIntoWriter( - writer, WireVarInt62(string_parameters.size() + int_parameters.size()), - WireSpan<WireStringParameter>(string_parameters), - WireSpan<WireIntParameter>(int_parameters)); + WireVarInt62(list_.size()).SerializeIntoWriter(writer); + list_.ForEach( + [&](uint64_t key, uint64_t value) { + absl::Status status = + WireKeyVarIntPair(key, value).SerializeIntoWriter(writer); + return quiche::IsWriterStatusOk(status); + }, + [&](uint64_t key, absl::string_view value) { + absl::Status status = + WireKeyStringPair(key, value).SerializeIntoWriter(writer); + return quiche::IsWriterStatusOk(status); + }); + return absl::OkStatus(); } private: - absl::InlinedVector<StringParameter, 1> StringParameters() const { - absl::InlinedVector<StringParameter, 1> result; - if (list_.authorization_info.has_value()) { - result.push_back( - StringParameter(MoqtTrackRequestParameter::kAuthorizationInfo, - *list_.authorization_info)); - } - return result; - } - absl::InlinedVector<IntParameter, 3> IntParameters() const { - absl::InlinedVector<IntParameter, 3> result; - if (list_.delivery_timeout.has_value()) { - QUICHE_DCHECK_GE(*list_.delivery_timeout, quic::QuicTimeDelta::Zero()); - result.push_back(IntParameter( - MoqtTrackRequestParameter::kDeliveryTimeout, - static_cast<uint64_t>(list_.delivery_timeout->ToMilliseconds()))); - } - if (list_.max_cache_duration.has_value()) { - QUICHE_DCHECK_GE(*list_.max_cache_duration, quic::QuicTimeDelta::Zero()); - result.push_back(IntParameter( - MoqtTrackRequestParameter::kMaxCacheDuration, - static_cast<uint64_t>(list_.max_cache_duration->ToMilliseconds()))); - } - if (list_.object_ack_window.has_value()) { - QUICHE_DCHECK_GE(*list_.object_ack_window, quic::QuicTimeDelta::Zero()); - result.push_back(IntParameter( - MoqtTrackRequestParameter::kOackWindowSize, - static_cast<uint64_t>(list_.object_ack_window->ToMicroseconds()))); - } - return result; - } - - const MoqtSubscribeParameters& list_; + const KeyValuePairList& list_; }; class WireFullTrackName { @@ -261,18 +200,63 @@ return value << 1; } +void SessionParametersToKeyValuePairList( + const MoqtSessionParameters& parameters, KeyValuePairList& out) { + if (!parameters.using_webtrans && + parameters.perspective == quic::Perspective::IS_CLIENT) { + out.insert(SetupParameter::kPath, parameters.path); + } + if (parameters.max_subscribe_id > 0) { + out.insert(SetupParameter::kMaxRequestId, parameters.max_subscribe_id); + } + if (parameters.max_auth_token_cache_size > 0) { + out.insert(SetupParameter::kMaxAuthTokenCacheSize, + parameters.max_auth_token_cache_size); + } + if (parameters.support_object_acks) { + out.insert(SetupParameter::kSupportObjectAcks, 1ULL); + } +} + +void VersionSpecificParametersToKeyValuePairList( + const VersionSpecificParameters& parameters, KeyValuePairList& out) { + out.clear(); + for (const auto& it : parameters.authorization_token) { + out.insert(VersionSpecificParameter::kAuthorizationToken, it); + } + if (!parameters.delivery_timeout.IsInfinite()) { + out.insert( + VersionSpecificParameter::kDeliveryTimeout, + static_cast<uint64_t>(parameters.delivery_timeout.ToMilliseconds())); + } + if (parameters.authorization_info.has_value()) { + out.insert(VersionSpecificParameter::kAuthorizationInfo, + *parameters.authorization_info); + } + if (!parameters.max_cache_duration.IsInfinite()) { + out.insert( + VersionSpecificParameter::kMaxCacheDuration, + static_cast<uint64_t>(parameters.max_cache_duration.ToMilliseconds())); + } + if (parameters.oack_window_size.has_value()) { + out.insert( + VersionSpecificParameter::kOackWindowSize, + static_cast<uint64_t>(parameters.oack_window_size->ToMicroseconds())); + } +} + } // namespace quiche::QuicheBuffer MoqtFramer::SerializeObjectHeader( const MoqtObject& message, MoqtDataStreamType message_type, bool is_first_in_stream) { if (!ValidateObjectMetadata(message, /*is_datagram=*/false)) { - QUIC_BUG(quic_bug_serialize_object_header_01) + QUICHE_BUG(QUICHE_BUG_serialize_object_header_01) << "Object metadata is invalid"; return quiche::QuicheBuffer(); } if (!message.subgroup_id.has_value()) { - QUIC_BUG(quic_bug_serialize_object_header_02) + QUICHE_BUG(QUICHE_BUG_serialize_object_header_02) << "Subgroup ID is not set on data stream"; return quiche::QuicheBuffer(); } @@ -365,12 +349,12 @@ quiche::QuicheBuffer MoqtFramer::SerializeObjectDatagram( const MoqtObject& message, absl::string_view payload) { if (!ValidateObjectMetadata(message, /*is_datagram=*/true)) { - QUIC_BUG(quic_bug_serialize_object_datagram_01) + QUICHE_BUG(QUICHE_BUG_serialize_object_datagram_01) << "Object metadata is invalid"; return quiche::QuicheBuffer(); } if (message.payload_length != payload.length()) { - QUIC_BUG(quic_bug_serialize_object_datagram_03) + QUICHE_BUG(QUICHE_BUG_serialize_object_datagram_03) << "Payload length does not match payload"; return quiche::QuicheBuffer(); } @@ -392,44 +376,34 @@ quiche::QuicheBuffer MoqtFramer::SerializeClientSetup( const MoqtClientSetup& message) { - absl::InlinedVector<IntParameter, 1> int_parameters; - absl::InlinedVector<StringParameter, 1> string_parameters; - if (message.max_subscribe_id.has_value()) { - int_parameters.push_back(IntParameter(MoqtSetupParameter::kMaxSubscribeId, - *message.max_subscribe_id)); - } - if (message.supports_object_ack) { - int_parameters.push_back( - IntParameter(MoqtSetupParameter::kSupportObjectAcks, 1u)); - } - if (!using_webtrans_ && message.path.has_value()) { - string_parameters.push_back( - StringParameter(MoqtSetupParameter::kPath, *message.path)); + KeyValuePairList parameters; + SessionParametersToKeyValuePairList(message.parameters, parameters); + if (!ValidateSetupParameters(parameters, using_webtrans_, + quic::Perspective::IS_SERVER)) { + QUICHE_BUG(QUICHE_BUG_invalid_parameters) + << "Serializing invalid MoQT parameters"; + return quiche::QuicheBuffer(); } return SerializeControlMessage( MoqtMessageType::kClientSetup, WireVarInt62(message.supported_versions.size()), WireSpan<WireVarInt62, MoqtVersion>(message.supported_versions), - WireVarInt62(string_parameters.size() + int_parameters.size()), - WireSpan<WireIntParameter>(int_parameters), - WireSpan<WireStringParameter>(string_parameters)); + WireKeyValuePairList(parameters)); } quiche::QuicheBuffer MoqtFramer::SerializeServerSetup( const MoqtServerSetup& message) { - absl::InlinedVector<IntParameter, 1> int_parameters; - if (message.max_subscribe_id.has_value()) { - int_parameters.push_back(IntParameter(MoqtSetupParameter::kMaxSubscribeId, - *message.max_subscribe_id)); - } - if (message.supports_object_ack) { - int_parameters.push_back( - IntParameter(MoqtSetupParameter::kSupportObjectAcks, 1u)); + KeyValuePairList parameters; + SessionParametersToKeyValuePairList(message.parameters, parameters); + if (!ValidateSetupParameters(parameters, using_webtrans_, + quic::Perspective::IS_CLIENT)) { + QUICHE_BUG(QUICHE_BUG_invalid_parameters) + << "Serializing invalid MoQT parameters"; + return quiche::QuicheBuffer(); } return SerializeControlMessage(MoqtMessageType::kServerSetup, WireVarInt62(message.selected_version), - WireVarInt62(int_parameters.size()), - WireSpan<WireIntParameter>(int_parameters)); + WireKeyValuePairList(parameters)); } quiche::QuicheBuffer MoqtFramer::SerializeSubscribe( @@ -439,6 +413,14 @@ QUICHE_BUG(MoqtFramer_invalid_subscribe) << "Invalid object range"; return quiche::QuicheBuffer(); } + KeyValuePairList parameters; + VersionSpecificParametersToKeyValuePairList(message.parameters, parameters); + if (!ValidateVersionSpecificParameters(parameters, + MoqtMessageType::kSubscribe)) { + QUICHE_BUG(QUICHE_BUG_invalid_parameters) + << "Serializing invalid MoQT parameters"; + return quiche::QuicheBuffer(); + } switch (filter_type) { case MoqtFilterType::kLatestObject: return SerializeControlMessage( @@ -447,7 +429,7 @@ WireFullTrackName(message.full_track_name, true), WireUint8(message.subscriber_priority), WireDeliveryOrder(message.group_order), WireVarInt62(filter_type), - WireSubscribeParameterList(message.parameters)); + WireKeyValuePairList(parameters)); case MoqtFilterType::kAbsoluteStart: return SerializeControlMessage( MoqtMessageType::kSubscribe, WireVarInt62(message.subscribe_id), @@ -457,7 +439,7 @@ WireDeliveryOrder(message.group_order), WireVarInt62(filter_type), WireVarInt62(message.start->group), WireVarInt62(message.start->object), - WireSubscribeParameterList(message.parameters)); + WireKeyValuePairList(parameters)); case MoqtFilterType::kAbsoluteRange: return SerializeControlMessage( MoqtMessageType::kSubscribe, WireVarInt62(message.subscribe_id), @@ -467,7 +449,7 @@ WireDeliveryOrder(message.group_order), WireVarInt62(filter_type), WireVarInt62(message.start->group), WireVarInt62(message.start->object), WireVarInt62(*message.end_group), - WireSubscribeParameterList(message.parameters)); + WireKeyValuePairList(parameters)); default: QUICHE_BUG(MoqtFramer_end_group_missing) << "Subscribe framing error."; return quiche::QuicheBuffer(); @@ -476,9 +458,13 @@ quiche::QuicheBuffer MoqtFramer::SerializeSubscribeOk( const MoqtSubscribeOk& message) { - if (message.parameters.authorization_info.has_value()) { - QUICHE_BUG(MoqtFramer_invalid_subscribe_ok) - << "SUBSCRIBE_OK with delivery timeout"; + KeyValuePairList parameters; + VersionSpecificParametersToKeyValuePairList(message.parameters, parameters); + if (!ValidateVersionSpecificParameters(parameters, + MoqtMessageType::kSubscribeOk)) { + QUICHE_BUG(QUICHE_BUG_invalid_parameters) + << "Serializing invalid MoQT parameters"; + return quiche::QuicheBuffer(); } if (message.largest_id.has_value()) { return SerializeControlMessage( @@ -487,13 +473,13 @@ WireDeliveryOrder(message.group_order), WireUint8(1), WireVarInt62(message.largest_id->group), WireVarInt62(message.largest_id->object), - WireSubscribeParameterList(message.parameters)); + WireKeyValuePairList(parameters)); } return SerializeControlMessage( MoqtMessageType::kSubscribeOk, WireVarInt62(message.subscribe_id), WireVarInt62(message.expires.ToMilliseconds()), WireDeliveryOrder(message.group_order), WireUint8(0), - WireSubscribeParameterList(message.parameters)); + WireKeyValuePairList(parameters)); } quiche::QuicheBuffer MoqtFramer::SerializeSubscribeError( @@ -521,9 +507,13 @@ quiche::QuicheBuffer MoqtFramer::SerializeSubscribeUpdate( const MoqtSubscribeUpdate& message) { - if (message.parameters.authorization_info.has_value()) { - QUICHE_BUG(MoqtFramer_invalid_subscribe_update) - << "SUBSCRIBE_UPDATE with authorization info"; + KeyValuePairList parameters; + VersionSpecificParametersToKeyValuePairList(message.parameters, parameters); + if (!ValidateVersionSpecificParameters(parameters, + MoqtMessageType::kSubscribeUpdate)) { + QUICHE_BUG(QUICHE_BUG_invalid_parameters) + << "Serializing invalid MoQT parameters"; + return quiche::QuicheBuffer(); } uint64_t end_group = message.end_group.has_value() ? *message.end_group + 1 : 0; @@ -531,18 +521,23 @@ MoqtMessageType::kSubscribeUpdate, WireVarInt62(message.subscribe_id), WireVarInt62(message.start.group), WireVarInt62(message.start.object), WireVarInt62(end_group), WireUint8(message.subscriber_priority), - WireSubscribeParameterList(message.parameters)); + WireKeyValuePairList(parameters)); } quiche::QuicheBuffer MoqtFramer::SerializeAnnounce( const MoqtAnnounce& message) { - if (message.parameters.delivery_timeout.has_value()) { - QUICHE_BUG(MoqtFramer_invalid_announce) << "ANNOUNCE with delivery timeout"; + KeyValuePairList parameters; + VersionSpecificParametersToKeyValuePairList(message.parameters, parameters); + if (!ValidateVersionSpecificParameters(parameters, + MoqtMessageType::kAnnounce)) { + QUICHE_BUG(QUICHE_BUG_invalid_parameters) + << "Serializing invalid MoQT parameters"; + return quiche::QuicheBuffer(); } return SerializeControlMessage( MoqtMessageType::kAnnounce, WireFullTrackName(message.track_namespace, false), - WireSubscribeParameterList(message.parameters)); + WireKeyValuePairList(parameters)); } quiche::QuicheBuffer MoqtFramer::SerializeAnnounceOk( @@ -572,9 +567,18 @@ quiche::QuicheBuffer MoqtFramer::SerializeTrackStatusRequest( const MoqtTrackStatusRequest& message) { + KeyValuePairList parameters; + VersionSpecificParametersToKeyValuePairList(message.parameters, parameters); + if (!ValidateVersionSpecificParameters( + parameters, MoqtMessageType::kTrackStatusRequest)) { + QUICHE_BUG(QUICHE_BUG_invalid_parameters) + << "Serializing invalid MoQT parameters"; + return quiche::QuicheBuffer(); + } return SerializeControlMessage( MoqtMessageType::kTrackStatusRequest, - WireFullTrackName(message.full_track_name, true)); + WireFullTrackName(message.full_track_name, true), + WireKeyValuePairList(parameters)); } quiche::QuicheBuffer MoqtFramer::SerializeUnannounce( @@ -586,11 +590,19 @@ quiche::QuicheBuffer MoqtFramer::SerializeTrackStatus( const MoqtTrackStatus& message) { + KeyValuePairList parameters; + VersionSpecificParametersToKeyValuePairList(message.parameters, parameters); + if (!ValidateVersionSpecificParameters(parameters, + MoqtMessageType::kTrackStatus)) { + QUICHE_BUG(QUICHE_BUG_invalid_parameters) + << "Serializing invalid MoQT parameters"; + return quiche::QuicheBuffer(); + } return SerializeControlMessage( MoqtMessageType::kTrackStatus, WireFullTrackName(message.full_track_name, true), WireVarInt62(message.status_code), WireVarInt62(message.last_group), - WireVarInt62(message.last_object)); + WireVarInt62(message.last_object), WireKeyValuePairList(parameters)); } quiche::QuicheBuffer MoqtFramer::SerializeGoAway(const MoqtGoAway& message) { @@ -601,10 +613,18 @@ quiche::QuicheBuffer MoqtFramer::SerializeSubscribeAnnounces( const MoqtSubscribeAnnounces& message) { + KeyValuePairList parameters; + VersionSpecificParametersToKeyValuePairList(message.parameters, parameters); + if (!ValidateVersionSpecificParameters( + parameters, MoqtMessageType::kSubscribeAnnounces)) { + QUICHE_BUG(QUICHE_BUG_invalid_parameters) + << "Serializing invalid MoQT parameters"; + return quiche::QuicheBuffer(); + } return SerializeControlMessage( MoqtMessageType::kSubscribeAnnounces, WireFullTrackName(message.track_namespace, false), - WireSubscribeParameterList(message.parameters)); + WireKeyValuePairList(parameters)); } quiche::QuicheBuffer MoqtFramer::SerializeSubscribeAnnouncesOk( @@ -645,6 +665,13 @@ QUICHE_BUG(MoqtFramer_invalid_fetch) << "Invalid FETCH object range"; return quiche::QuicheBuffer(); } + KeyValuePairList parameters; + VersionSpecificParametersToKeyValuePairList(message.parameters, parameters); + if (!ValidateVersionSpecificParameters(parameters, MoqtMessageType::kFetch)) { + QUICHE_BUG(QUICHE_BUG_invalid_parameters) + << "Serializing invalid MoQT parameters"; + return quiche::QuicheBuffer(); + } if (message.joining_fetch.has_value()) { return SerializeControlMessage( MoqtMessageType::kFetch, WireVarInt62(message.fetch_id), @@ -653,7 +680,7 @@ WireVarInt62(FetchType::kJoining), WireVarInt62(message.joining_fetch->joining_subscribe_id), WireVarInt62(message.joining_fetch->preceding_group_offset), - WireSubscribeParameterList(message.parameters)); + WireKeyValuePairList(parameters)); } return SerializeControlMessage( MoqtMessageType::kFetch, WireVarInt62(message.fetch_id), @@ -666,7 +693,7 @@ WireVarInt62(message.end_group), WireVarInt62(message.end_object.has_value() ? *message.end_object + 1 : 0), - WireSubscribeParameterList(message.parameters)); + WireKeyValuePairList(parameters)); } quiche::QuicheBuffer MoqtFramer::SerializeFetchCancel( @@ -676,12 +703,20 @@ } quiche::QuicheBuffer MoqtFramer::SerializeFetchOk(const MoqtFetchOk& message) { - return SerializeControlMessage( - MoqtMessageType::kFetchOk, WireVarInt62(message.subscribe_id), - WireDeliveryOrder(message.group_order), - WireVarInt62(message.largest_id.group), - WireVarInt62(message.largest_id.object), - WireSubscribeParameterList(message.parameters)); + KeyValuePairList parameters; + VersionSpecificParametersToKeyValuePairList(message.parameters, parameters); + if (!ValidateVersionSpecificParameters(parameters, + MoqtMessageType::kFetchOk)) { + QUICHE_BUG(QUICHE_BUG_invalid_parameters) + << "Serializing invalid MoQT parameters"; + return quiche::QuicheBuffer(); + } + return SerializeControlMessage(MoqtMessageType::kFetchOk, + WireVarInt62(message.subscribe_id), + WireDeliveryOrder(message.group_order), + WireVarInt62(message.largest_id.group), + WireVarInt62(message.largest_id.object), + WireKeyValuePairList(parameters)); } quiche::QuicheBuffer MoqtFramer::SerializeFetchError(
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc index 9895416..690418b 100644 --- a/quiche/quic/moqt/moqt_framer_test.cc +++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -415,8 +415,7 @@ /*group_order=*/std::nullopt, start, end_group, - MoqtSubscribeParameters{"bar", std::nullopt, std::nullopt, - std::nullopt}, + VersionSpecificParameters("bar"), }; quiche::QuicheBuffer buffer; MoqtFilterType expected_filter_type = GetFilterType(subscribe); @@ -444,7 +443,7 @@ /*group_order=*/std::nullopt, /*start=*/Location(4, 3), /*end_group=*/3, - MoqtSubscribeParameters{"bar", std::nullopt, std::nullopt, std::nullopt}, + VersionSpecificParameters("bar"), }; quiche::QuicheBuffer buffer; EXPECT_QUIC_BUG(buffer = framer_.SerializeSubscribe(subscribe), @@ -463,7 +462,7 @@ /*end_group=*/1, /*end_object=*/1, /*parameters=*/ - MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt}, + VersionSpecificParameters("baz"), }; quiche::QuicheBuffer buffer; EXPECT_QUIC_BUG(buffer = framer_.SerializeFetch(fetch), @@ -482,8 +481,7 @@ /*start=*/Location(4, 3), /*end_group=*/4, /*subscriber_priority=*/0xaa, - MoqtSubscribeParameters{std::nullopt, std::nullopt, std::nullopt, - std::nullopt}, + VersionSpecificParameters(), }; quiche::QuicheBuffer buffer; buffer = framer_.SerializeSubscribeUpdate(subscribe_update); @@ -498,8 +496,7 @@ /*start=*/Location(4, 3), /*end_group=*/4, /*subscriber_priority=*/0xaa, - MoqtSubscribeParameters{std::nullopt, std::nullopt, std::nullopt, - std::nullopt}, + VersionSpecificParameters(), }; quiche::QuicheBuffer buffer; buffer = framer_.SerializeSubscribeUpdate(subscribe_update);
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc index 96b36f3..73a67d6 100644 --- a/quiche/quic/moqt/moqt_integration_test.cc +++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -78,7 +78,7 @@ std::optional<absl::string_view>())) .WillOnce([&]() { received_ok = true; }); client_->session()->SubscribeCurrentObject(track_name, visitor, - MoqtSubscribeParameters()); + VersionSpecificParameters()); bool success = test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; }); EXPECT_TRUE(success); @@ -144,7 +144,8 @@ std::optional<MoqtAnnounceErrorReason> error_message)> announce_callback; client_->session()->Announce(FullTrackName{"foo"}, - announce_callback.AsStdFunction()); + announce_callback.AsStdFunction(), + VersionSpecificParameters()); bool matches = false; EXPECT_CALL(announce_callback, Call(_, _)) .WillOnce([&](FullTrackName track_namespace, @@ -179,7 +180,8 @@ std::optional<MoqtAnnounceErrorReason> error_message)> announce_callback; client_->session()->Announce(FullTrackName{"foo"}, - announce_callback.AsStdFunction()); + announce_callback.AsStdFunction(), + VersionSpecificParameters()); bool matches = false; EXPECT_CALL(announce_callback, Call(_, _)) .WillOnce([&](FullTrackName track_namespace, @@ -219,7 +221,8 @@ std::optional<MoqtAnnounceErrorReason> error_message)> announce_callback; client_->session()->Announce(FullTrackName{"foo"}, - announce_callback.AsStdFunction()); + announce_callback.AsStdFunction(), + VersionSpecificParameters()); bool matches = false; EXPECT_CALL(announce_callback, Call(_, _)) .WillOnce([&](FullTrackName track_namespace, @@ -229,7 +232,7 @@ track_name.AddElement("/catalog"); EXPECT_FALSE(error.has_value()); server_->session()->SubscribeCurrentObject(track_name, &server_visitor, - MoqtSubscribeParameters()); + VersionSpecificParameters()); }); EXPECT_CALL(server_visitor, OnReply(_, _, _)).WillOnce([&]() { matches = true; @@ -253,7 +256,7 @@ track_name.AddElement("data"); server_->session()->SubscribeAbsolute( track_name, /*start_group=*/0, /*start_object=*/0, &server_visitor, - MoqtSubscribeParameters()); + VersionSpecificParameters()); return std::optional<MoqtAnnounceErrorReason>(); }); @@ -268,7 +271,8 @@ }); client_->session()->Announce( FullTrackName{"test"}, - [](FullTrackName, std::optional<MoqtAnnounceErrorReason>) {}); + [](FullTrackName, std::optional<MoqtAnnounceErrorReason>) {}, + VersionSpecificParameters()); bool success = test_harness_.RunUntilWithDefaultTimeout( [&]() { return received_subscribe_ok; }); EXPECT_TRUE(success); @@ -316,7 +320,7 @@ queue->AddObject(MemSliceFromString("object 3"), /*key=*/false); client_->session()->SubscribeCurrentObject(FullTrackName("test", name), &client_visitor, - MoqtSubscribeParameters()); + VersionSpecificParameters()); std::optional<Location> largest_id; EXPECT_CALL(client_visitor, OnReply) .WillOnce([&](const FullTrackName& /*name*/, std::optional<Location> id, @@ -405,7 +409,7 @@ full_track_name, [&](std::unique_ptr<MoqtFetchTask> task) { fetch = std::move(task); }, Location{0, 0}, 99, std::nullopt, 128, std::nullopt, - MoqtSubscribeParameters())); + VersionSpecificParameters())); // Run until we get FETCH_OK. bool success = test_harness_.RunUntilWithDefaultTimeout( [&]() { return fetch != nullptr; }); @@ -444,7 +448,8 @@ std::optional<MoqtAnnounceErrorReason> error_message)> announce_callback; client_->session()->Announce(FullTrackName{"foo"}, - announce_callback.AsStdFunction()); + announce_callback.AsStdFunction(), + VersionSpecificParameters()); bool matches = false; EXPECT_CALL(announce_callback, Call(_, _)) .WillOnce([&](FullTrackName track_namespace, @@ -479,7 +484,7 @@ EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason)) .WillOnce([&]() { received_ok = true; }); client_->session()->SubscribeAbsolute(full_track_name, 0, 0, &client_visitor, - MoqtSubscribeParameters()); + VersionSpecificParameters()); bool success = test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; }); EXPECT_TRUE(success); @@ -505,7 +510,7 @@ EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason)) .WillOnce([&]() { received_ok = true; }); client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor, - MoqtSubscribeParameters()); + VersionSpecificParameters()); bool success = test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; }); EXPECT_TRUE(success); @@ -531,7 +536,7 @@ EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason)) .WillOnce([&]() { received_ok = true; }); client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor, - MoqtSubscribeParameters()); + VersionSpecificParameters()); bool success = test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; }); EXPECT_TRUE(success); @@ -546,7 +551,7 @@ EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason)) .WillOnce([&]() { received_ok = true; }); client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor, - MoqtSubscribeParameters()); + VersionSpecificParameters()); bool success = test_harness_.RunUntilWithDefaultTimeout([&]() { return received_ok; }); EXPECT_TRUE(success); @@ -579,7 +584,7 @@ // Reject this subscribe because there already is one. EXPECT_FALSE(client_->session()->SubscribeCurrentObject( - full_track_name, &client_visitor, MoqtSubscribeParameters())); + full_track_name, &client_visitor, VersionSpecificParameters())); queue->RemoveAllSubscriptions(); // Induce a SUBSCRIBE_DONE. bool subscribe_done = false; EXPECT_CALL(client_visitor, OnSubscribeDone).WillOnce([&]() { @@ -591,7 +596,7 @@ // Subscription is deleted; the client session should not immediately reject // a new attempt. EXPECT_TRUE(client_->session()->SubscribeCurrentObject( - full_track_name, &client_visitor, MoqtSubscribeParameters())); + full_track_name, &client_visitor, VersionSpecificParameters())); } TEST_F(MoqtIntegrationTest, ObjectAcks) { @@ -629,8 +634,8 @@ ack_function(100, 200, quic::QuicTimeDelta::FromMicroseconds(456)); }); - MoqtSubscribeParameters parameters; - parameters.object_ack_window = quic::QuicTimeDelta::FromMilliseconds(100); + VersionSpecificParameters parameters; + parameters.oack_window_size = quic::QuicTimeDelta::FromMilliseconds(100); client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor, parameters); EXPECT_CALL(monitoring, OnObjectAckSupportKnown(true)); @@ -665,7 +670,7 @@ bool received_ok = false; EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason)) .WillOnce([&]() { received_ok = true; }); - MoqtSubscribeParameters parameters; + VersionSpecificParameters parameters; // Set delivery timeout to ~ 1 RTT: any loss is fatal. parameters.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(100); client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor, @@ -715,7 +720,7 @@ bool received_ok = false; EXPECT_CALL(client_visitor, OnReply(full_track_name, _, expected_reason)) .WillOnce([&]() { received_ok = true; }); - MoqtSubscribeParameters parameters; + VersionSpecificParameters parameters; // Set delivery timeout to ~ 1 RTT: any loss is fatal. parameters.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(100); client_->session()->SubscribeCurrentObject(full_track_name, &client_visitor,
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc index f2e9f7e..eba04a3 100644 --- a/quiche/quic/moqt/moqt_messages.cc +++ b/quiche/quic/moqt/moqt_messages.cc
@@ -4,23 +4,86 @@ #include "quiche/quic/moqt/moqt_messages.h" +#include <array> +#include <cstddef> #include <cstdint> +#include <optional> #include <string> +#include <utility> #include <vector> #include "absl/algorithm/container.h" +#include "absl/container/btree_map.h" #include "absl/status/status.h" #include "absl/strings/escaping.h" #include "absl/strings/str_cat.h" #include "absl/strings/str_join.h" #include "absl/strings/string_view.h" #include "absl/types/span.h" +#include "quiche/quic/core/quic_types.h" #include "quiche/quic/platform/api/quic_bug_tracker.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/web_transport/web_transport.h" namespace moqt { +void KeyValuePairList::insert(uint64_t key, absl::string_view value) { + if (key % 2 == 0) { + QUICHE_BUG(key_value_pair_string_is_even) << "Key value pair of wrong type"; + return; + } + string_map_.emplace(key, value); +} + +void KeyValuePairList::insert(uint64_t key, uint64_t value) { + if (key % 2 == 1) { + QUICHE_BUG(key_value_pair_int_is_odd) << "Key value pair of wrong type"; + return; + } + integer_map_.emplace(key, value); +} + +size_t KeyValuePairList::count(uint64_t key) const { + if (key % 2 == 0) { + return integer_map_.count(key); + } + return string_map_.count(key); +} + +bool KeyValuePairList::contains(uint64_t key) const { + if (key % 2 == 0) { + return integer_map_.contains(key); + } + return string_map_.contains(key); +} + +std::vector<uint64_t> KeyValuePairList::GetIntegers(uint64_t key) const { + if (key % 2 == 1) { + QUICHE_BUG(key_value_pair_int_is_odd) << "Key value pair of wrong type"; + return {}; + } + std::vector<uint64_t> result; + auto [range_start, range_end] = integer_map_.equal_range(key); + for (auto& it = range_start; it != range_end; ++it) { + result.push_back(it->second); + } + return result; +} + +std::vector<absl::string_view> KeyValuePairList::GetStrings( + uint64_t key) const { + if (key % 2 == 0) { + QUICHE_BUG(key_value_pair_string_is_even) << "Key value pair of wrong type"; + return {}; + } + std::vector<absl::string_view> result; + auto [range_start, range_end] = string_map_.equal_range(key); + for (auto& it = range_start; it != range_end; ++it) { + result.push_back(it->second); + } + return result; +} + MoqtObjectStatus IntegerToObjectStatus(uint64_t integer) { if (integer >= static_cast<uint64_t>(MoqtObjectStatus::kInvalidObjectStatus)) { @@ -45,6 +108,71 @@ return MoqtFilterType::kLatestObject; } +bool ValidateSetupParameters(const KeyValuePairList& parameters, bool webtrans, + quic::Perspective perspective) { + if (parameters.count(SetupParameter::kPath) > 1 || + parameters.count(SetupParameter::kMaxRequestId) > 1 || + parameters.count(SetupParameter::kMaxAuthTokenCacheSize) > 1 || + parameters.count(SetupParameter::kSupportObjectAcks) > 1) { + return false; + } + if ((webtrans || perspective == quic::Perspective::IS_CLIENT) == + parameters.contains(SetupParameter::kPath)) { + // Only non-webtrans servers should receive kPath. + return false; + } + if (!parameters.contains(SetupParameter::kSupportObjectAcks)) { + return true; + } + std::vector<uint64_t> support_object_acks = + parameters.GetIntegers(SetupParameter::kSupportObjectAcks); + QUICHE_DCHECK(support_object_acks.size() == 1); + if (support_object_acks.front() > 1) { + return false; + } + return true; +} + +const std::array<MoqtMessageType, 5> kAllowsAuthorization = { + MoqtMessageType::kSubscribe, MoqtMessageType::kTrackStatusRequest, + MoqtMessageType::kFetch, MoqtMessageType::kSubscribeAnnounces, + MoqtMessageType::kAnnounce}; +const std::array<MoqtMessageType, 4> kAllowsDeliveryTimeout = { + MoqtMessageType::kSubscribe, MoqtMessageType::kSubscribeOk, + MoqtMessageType::kSubscribeUpdate, MoqtMessageType::kTrackStatus}; +const std::array<MoqtMessageType, 3> kAllowsMaxCacheDuration = { + MoqtMessageType::kSubscribeOk, MoqtMessageType::kTrackStatus, + MoqtMessageType::kFetchOk}; +bool ValidateVersionSpecificParameters(const KeyValuePairList& parameters, + MoqtMessageType message_type) { + size_t authorization_token = + parameters.count(VersionSpecificParameter::kAuthorizationToken); + size_t delivery_timeout = + parameters.count(VersionSpecificParameter::kDeliveryTimeout); + size_t authorization_info = + parameters.count(VersionSpecificParameter::kAuthorizationInfo); + size_t max_cache_duration = + parameters.count(VersionSpecificParameter::kMaxCacheDuration); + if (delivery_timeout > 1 || authorization_info > 1 || + max_cache_duration > 1) { + // Disallowed duplicate. + return false; + } + if ((authorization_token > 0 || authorization_info > 0) && + !absl::c_linear_search(kAllowsAuthorization, message_type)) { + return false; + } + if (delivery_timeout > 0 && + !absl::c_linear_search(kAllowsDeliveryTimeout, message_type)) { + return false; + } + if (max_cache_duration > 0 && + !absl::c_linear_search(kAllowsMaxCacheDuration, message_type)) { + return false; + } + return true; +} + std::string MoqtMessageTypeToString(const MoqtMessageType message_type) { switch (message_type) { case MoqtMessageType::kClientSetup: @@ -52,7 +180,7 @@ case MoqtMessageType::kServerSetup: return "SERVER_SETUP"; case MoqtMessageType::kSubscribe: - return "SUBSCRIBE_REQUEST"; + return "SUBSCRIBE"; case MoqtMessageType::kSubscribeOk: return "SUBSCRIBE_OK"; case MoqtMessageType::kSubscribeError:
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index cbcd4d6..41809c7 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -13,9 +13,9 @@ #include <optional> #include <string> #include <utility> -#include <variant> #include <vector> +#include "absl/container/btree_map.h" #include "absl/container/inlined_vector.h" #include "absl/status/status.h" #include "absl/strings/str_format.h" @@ -27,6 +27,7 @@ #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" #include "quiche/common/platform/api/quiche_export.h" +#include "quiche/common/quiche_callbacks.h" #include "quiche/web_transport/web_transport.h" namespace moqt { @@ -36,32 +37,52 @@ } enum class MoqtVersion : uint64_t { - kDraft10 = 0xff00000a, + kDraft11 = 0xff00000b, kUnrecognizedVersionForTests = 0xfe0000ff, }; -inline constexpr MoqtVersion kDefaultMoqtVersion = MoqtVersion::kDraft10; +inline constexpr MoqtVersion kDefaultMoqtVersion = MoqtVersion::kDraft11; inline constexpr uint64_t kDefaultInitialMaxSubscribeId = 100; +// TODO(martinduke): Implement an auth token cache. +inline constexpr uint64_t kDefaultMaxAuthTokenCacheSize = 0; inline constexpr uint64_t kMinNamespaceElements = 1; inline constexpr uint64_t kMaxNamespaceElements = 32; struct QUICHE_EXPORT MoqtSessionParameters { // TODO: support multiple versions. - // TODO: support roles other than PubSub. - + MoqtSessionParameters() = default; explicit MoqtSessionParameters(quic::Perspective perspective) : perspective(perspective), using_webtrans(true) {} MoqtSessionParameters(quic::Perspective perspective, std::string path) : perspective(perspective), using_webtrans(false), path(std::move(path)) {} + MoqtSessionParameters(quic::Perspective perspective, std::string path, + uint64_t max_subscribe_id) + : perspective(perspective), + using_webtrans(true), + path(std::move(path)), + max_subscribe_id(max_subscribe_id) {} + MoqtSessionParameters(quic::Perspective perspective, + uint64_t max_subscribe_id) + : perspective(perspective), max_subscribe_id(max_subscribe_id) {} + bool operator==(const MoqtSessionParameters& other) { + return version == other.version && + deliver_partial_objects == other.deliver_partial_objects && + perspective == other.perspective && + using_webtrans == other.using_webtrans && path == other.path && + max_subscribe_id == other.max_subscribe_id && + max_auth_token_cache_size == other.max_auth_token_cache_size && + support_object_acks == other.support_object_acks; + } MoqtVersion version = kDefaultMoqtVersion; - quic::Perspective perspective; - bool using_webtrans; - std::string path; - uint64_t max_subscribe_id = kDefaultInitialMaxSubscribeId; bool deliver_partial_objects = false; + quic::Perspective perspective = quic::Perspective::IS_SERVER; + bool using_webtrans = true; + std::string path = ""; + uint64_t max_subscribe_id = kDefaultInitialMaxSubscribeId; + uint64_t max_auth_token_cache_size = kDefaultMaxAuthTokenCacheSize; bool support_object_acks = false; }; @@ -108,8 +129,8 @@ kFetchOk = 0x18, kFetchError = 0x19, kSubscribesBlocked = 0x1a, - kClientSetup = 0x40, - kServerSetup = 0x41, + kClientSetup = 0x20, + kServerSetup = 0x21, // QUICHE-specific extensions. @@ -139,23 +160,54 @@ 0x01; inline constexpr webtransport::StreamErrorCode kResetCodeTimedOut = 0x02; -enum class QUICHE_EXPORT MoqtSetupParameter : uint64_t { - kRole = 0x0, +enum class QUICHE_EXPORT SetupParameter : uint64_t { kPath = 0x1, - kMaxSubscribeId = 0x2, + kMaxRequestId = 0x2, + kMaxAuthTokenCacheSize = 0x4, // QUICHE-specific extensions. // Indicates support for OACK messages. - kSupportObjectAcks = 0xbbf1439, + kSupportObjectAcks = 0xbbf1438, }; -enum class QUICHE_EXPORT MoqtTrackRequestParameter : uint64_t { - kAuthorizationInfo = 0x2, - kDeliveryTimeout = 0x3, +enum class QUICHE_EXPORT VersionSpecificParameter : uint64_t { + kAuthorizationToken = 0x1, + kDeliveryTimeout = 0x2, + kAuthorizationInfo = 0x3, kMaxCacheDuration = 0x4, // QUICHE-specific extensions. - kOackWindowSize = 0xbbf1439, + kOackWindowSize = 0xbbf1438, +}; + +struct VersionSpecificParameters { + VersionSpecificParameters() = default; + // Likely parameter combinations. + VersionSpecificParameters(quic::QuicTimeDelta delivery_timeout, + quic::QuicTimeDelta max_cache_duration) + : delivery_timeout(delivery_timeout), + max_cache_duration(max_cache_duration) {} + explicit VersionSpecificParameters(std::string authorization_info) + : authorization_info(authorization_info) {} + VersionSpecificParameters(quic::QuicTimeDelta delivery_timeout, + std::string authorization_info) + : delivery_timeout(delivery_timeout), + authorization_info(authorization_info) {} + + // TODO(martinduke): Turn auth_token into structured data. + std::vector<std::string> authorization_token; + quic::QuicTimeDelta delivery_timeout = quic::QuicTimeDelta::Infinite(); + std::optional<std::string> authorization_info; + quic::QuicTimeDelta max_cache_duration = quic::QuicTimeDelta::Infinite(); + std::optional<quic::QuicTimeDelta> oack_window_size; + + bool operator==(const VersionSpecificParameters& other) const { + return authorization_token == other.authorization_token && + delivery_timeout == other.delivery_timeout && + authorization_info == other.authorization_info && + max_cache_duration == other.max_cache_duration && + oack_window_size == other.oack_window_size; + } }; // Used for SUBSCRIBE_ERROR, ANNOUNCE_ERROR, ANNOUNCE_CANCEL, @@ -306,17 +358,97 @@ return H::combine(std::move(h), m.group, m.object); } +// Encodes a list of key-value pairs common to both parameters and extensions. +// If the key is odd, it is a length-prefixed string (which may encode further +// item-specific structure). If the key is even, it is a varint. +// This class does not interpret the semantic meaning of the keys and values, +// although it does accept various uint64_t-based enums to reduce the burden of +// casting on the caller. +class KeyValuePairList { + public: + KeyValuePairList() = default; + size_t size() const { return integer_map_.size() + string_map_.size(); } + void insert(VersionSpecificParameter key, uint64_t value) { + insert(static_cast<uint64_t>(key), value); + } + void insert(SetupParameter key, uint64_t value) { + insert(static_cast<uint64_t>(key), value); + } + void insert(VersionSpecificParameter key, absl::string_view value) { + insert(static_cast<uint64_t>(key), value); + } + void insert(SetupParameter key, absl::string_view value) { + insert(static_cast<uint64_t>(key), value); + } + void insert(uint64_t key, absl::string_view value); + void insert(uint64_t key, uint64_t value); + size_t count(VersionSpecificParameter key) const { + return count(static_cast<uint64_t>(key)); + } + size_t count(SetupParameter key) const { + return count(static_cast<uint64_t>(key)); + } + bool contains(VersionSpecificParameter key) const { + return contains(static_cast<uint64_t>(key)); + } + bool contains(SetupParameter key) const { + return contains(static_cast<uint64_t>(key)); + } + // If either of these callbacks returns false, ForEach will return early. + using IntCallback = quiche::UnretainedCallback<bool(uint64_t, uint64_t)>; + using StringCallback = + quiche::UnretainedCallback<bool(uint64_t, absl::string_view)>; + // Iterates through the whole list, and executes int_callback for each integer + // value and string_callback for each string value. + bool ForEach(IntCallback int_callback, StringCallback string_callback) const { + for (const auto& [key, value] : integer_map_) { + if (!int_callback(key, value)) { + return false; + } + } + for (const auto& [key, value] : string_map_) { + if (!string_callback(key, value)) { + return false; + } + } + return true; + } + std::vector<uint64_t> GetIntegers(VersionSpecificParameter key) const { + return GetIntegers(static_cast<uint64_t>(key)); + } + std::vector<uint64_t> GetIntegers(SetupParameter key) const { + return GetIntegers(static_cast<uint64_t>(key)); + } + std::vector<absl::string_view> GetStrings( + VersionSpecificParameter key) const { + return GetStrings(static_cast<uint64_t>(key)); + } + std::vector<absl::string_view> GetStrings(SetupParameter key) const { + return GetStrings(static_cast<uint64_t>(key)); + } + void clear() { + integer_map_.clear(); + string_map_.clear(); + } + + private: + size_t count(uint64_t key) const; + bool contains(uint64_t key) const; + std::vector<uint64_t> GetIntegers(uint64_t key) const; + std::vector<absl::string_view> GetStrings(uint64_t key) const; + absl::btree_multimap<uint64_t, uint64_t> integer_map_; + absl::btree_multimap<uint64_t, std::string> string_map_; +}; + +// TODO(martinduke): Collapse both Setup messages into MoqtSessionParameters. struct QUICHE_EXPORT MoqtClientSetup { std::vector<MoqtVersion> supported_versions; - std::optional<std::string> path; - std::optional<uint64_t> max_subscribe_id; - bool supports_object_ack = false; + MoqtSessionParameters parameters; }; struct QUICHE_EXPORT MoqtServerSetup { MoqtVersion selected_version; - std::optional<uint64_t> max_subscribe_id; - bool supports_object_ack = false; + MoqtSessionParameters parameters; }; // These codes do not appear on the wire. @@ -357,25 +489,6 @@ kAbsoluteRange = 0x4, }; -struct QUICHE_EXPORT MoqtSubscribeParameters { - std::optional<std::string> authorization_info; - std::optional<quic::QuicTimeDelta> delivery_timeout; - std::optional<quic::QuicTimeDelta> max_cache_duration; - - // If present, indicates that OBJECT_ACK messages will be sent in response to - // the objects on the stream. The actual value is informational, and it - // communicates how many frames the subscriber is willing to buffer, in - // microseconds. - std::optional<quic::QuicTimeDelta> object_ack_window; - - bool operator==(const MoqtSubscribeParameters& other) const { - return authorization_info == other.authorization_info && - delivery_timeout == other.delivery_timeout && - max_cache_duration == other.max_cache_duration && - object_ack_window == other.object_ack_window; - } -}; - struct QUICHE_EXPORT MoqtSubscribe { uint64_t subscribe_id; uint64_t track_alias; @@ -391,8 +504,7 @@ std::optional<Location> start; std::optional<uint64_t> end_group; // If the mode is kNone, the these are std::nullopt. - - MoqtSubscribeParameters parameters; + VersionSpecificParameters parameters; }; // Deduce the filter type from the combination of group and object IDs. Returns @@ -406,7 +518,7 @@ MoqtDeliveryOrder group_order; // If ContextExists on the wire is zero, largest_id has no value. std::optional<Location> largest_id; - MoqtSubscribeParameters parameters; + VersionSpecificParameters parameters; }; struct QUICHE_EXPORT MoqtSubscribeError { @@ -442,12 +554,12 @@ Location start; std::optional<uint64_t> end_group; MoqtPriority subscriber_priority; - MoqtSubscribeParameters parameters; + VersionSpecificParameters parameters; }; struct QUICHE_EXPORT MoqtAnnounce { FullTrackName track_namespace; - MoqtSubscribeParameters parameters; + VersionSpecificParameters parameters; }; struct QUICHE_EXPORT MoqtAnnounceOk { @@ -490,6 +602,7 @@ MoqtTrackStatusCode status_code; uint64_t last_group; uint64_t last_object; + VersionSpecificParameters parameters; }; struct QUICHE_EXPORT MoqtAnnounceCancel { @@ -500,6 +613,7 @@ struct QUICHE_EXPORT MoqtTrackStatusRequest { FullTrackName full_track_name; + VersionSpecificParameters parameters; }; struct QUICHE_EXPORT MoqtGoAway { @@ -508,7 +622,7 @@ struct QUICHE_EXPORT MoqtSubscribeAnnounces { FullTrackName track_namespace; - MoqtSubscribeParameters parameters; + VersionSpecificParameters parameters; }; struct QUICHE_EXPORT MoqtSubscribeAnnouncesOk { @@ -553,7 +667,7 @@ Location start_object; // subgroup is ignored uint64_t end_group; std::optional<uint64_t> end_object; - MoqtSubscribeParameters parameters; + VersionSpecificParameters parameters; }; struct QUICHE_EXPORT MoqtFetchCancel { @@ -564,7 +678,7 @@ uint64_t subscribe_id; MoqtDeliveryOrder group_order; Location largest_id; // subgroup is ignored - MoqtSubscribeParameters parameters; + VersionSpecificParameters parameters; }; struct QUICHE_EXPORT MoqtFetchError { @@ -588,6 +702,16 @@ quic::QuicTimeDelta delta_from_deadline = quic::QuicTimeDelta::Zero(); }; +// Returns false if duplicates are present for a known parameter where the spec +// forbids duplicates. |perspective| is the consumer of the message, not the +// sender. +bool ValidateSetupParameters(const KeyValuePairList& parameters, bool webtrans, + quic::Perspective perspective); +// Returns false if the parameters contain a protocol violation, or a +// parameter cannot be in |message type|. +bool ValidateVersionSpecificParameters(const KeyValuePairList& parameters, + MoqtMessageType message_type); + std::string MoqtMessageTypeToString(MoqtMessageType message_type); std::string MoqtDataStreamTypeToString(MoqtDataStreamType type); std::string MoqtDatagramTypeToString(MoqtDatagramType type);
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index 1dbe07e..390ceec 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -20,6 +20,7 @@ #include "absl/types/span.h" #include "quiche/quic/core/quic_data_reader.h" #include "quiche/quic/core/quic_time.h" +#include "quiche/quic/core/quic_types.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" @@ -101,6 +102,108 @@ return result; } +// Reads from |reader| to list. Returns false if there is a read error. +bool ParseKeyValuePairList(quic::QuicDataReader& reader, + KeyValuePairList& list) { + list.clear(); + uint64_t num_params; + if (!reader.ReadVarInt62(&num_params)) { + return false; + } + for (uint64_t i = 0; i < num_params; ++i) { + uint64_t type; + if (!reader.ReadVarInt62(&type)) { + return false; + } + if (type % 2 == 1) { + absl::string_view bytes; + if (!reader.ReadStringPieceVarInt62(&bytes)) { + return false; + } + list.insert(type, bytes); + continue; + } + uint64_t value; + if (!reader.ReadVarInt62(&value)) { + return false; + } + list.insert(type, value); + } + return true; +} + +void KeyValuePairListToMoqtSessionParameters(const KeyValuePairList& parameters, + MoqtSessionParameters& out) { + parameters.ForEach( + [&](uint64_t key, uint64_t value) { + SetupParameter parameter = static_cast<SetupParameter>(key); + switch (parameter) { + case SetupParameter::kMaxRequestId: + out.max_subscribe_id = value; + break; + case SetupParameter::kMaxAuthTokenCacheSize: + out.max_auth_token_cache_size = value; + break; + case SetupParameter::kSupportObjectAcks: + out.support_object_acks = (value == 1); + break; + default: + break; + } + return true; + }, + [&](uint64_t key, absl::string_view value) { + SetupParameter parameter = static_cast<SetupParameter>(key); + switch (parameter) { + case SetupParameter::kPath: + out.path = value; + break; + default: + break; + } + return true; + }); +} + +void KeyValuePairListToVersionSpecificParameters( + const KeyValuePairList& parameters, VersionSpecificParameters& out) { + parameters.ForEach( + [&](uint64_t key, uint64_t value) { + VersionSpecificParameter parameter = + static_cast<VersionSpecificParameter>(key); + switch (parameter) { + case VersionSpecificParameter::kDeliveryTimeout: + out.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(value); + break; + case VersionSpecificParameter::kMaxCacheDuration: + out.max_cache_duration = + quic::QuicTimeDelta::FromMilliseconds(value); + break; + case VersionSpecificParameter::kOackWindowSize: + out.oack_window_size = quic::QuicTimeDelta::FromMicroseconds(value); + break; + default: + break; + } + return true; + }, + [&](uint64_t key, absl::string_view value) { + VersionSpecificParameter parameter = + static_cast<VersionSpecificParameter>(key); + switch (parameter) { + case VersionSpecificParameter::kAuthorizationToken: + out.authorization_token.push_back(std::string(value)); + break; + case VersionSpecificParameter::kAuthorizationInfo: + out.authorization_info = value; + break; + default: + break; + } + return true; + }); +} + } // namespace void MoqtControlParser::ReadAndDispatchMessages() { @@ -275,6 +378,8 @@ size_t MoqtControlParser::ProcessClientSetup(quic::QuicDataReader& reader) { MoqtClientSetup setup; + setup.parameters.using_webtrans = uses_web_transport_; + setup.parameters.perspective = quic::Perspective::IS_CLIENT; uint64_t number_of_supported_versions; if (!reader.ReadVarInt62(&number_of_supported_versions)) { return 0; @@ -286,131 +391,59 @@ } setup.supported_versions.push_back(static_cast<MoqtVersion>(version)); } - uint64_t num_params; - if (!reader.ReadVarInt62(&num_params)) { + KeyValuePairList parameters; + if (!ParseKeyValuePairList(reader, parameters)) { return 0; } - // Parse parameters - for (uint64_t i = 0; i < num_params; ++i) { - uint64_t type; - absl::string_view value; - if (!ReadParameter(reader, type, value)) { - return 0; - } - auto key = static_cast<MoqtSetupParameter>(type); - switch (key) { - case MoqtSetupParameter::kPath: - if (uses_web_transport_) { - ParseError( - "WebTransport connection is using PATH parameter in SETUP"); - return 0; - } - if (setup.path.has_value()) { - ParseError("PATH parameter appears twice in CLIENT_SETUP"); - return 0; - } - setup.path = value; - break; - case MoqtSetupParameter::kMaxSubscribeId: - if (setup.max_subscribe_id.has_value()) { - ParseError("MAX_SUBSCRIBE_ID parameter appears twice in SETUP"); - return 0; - } - uint64_t max_id; - if (!StringViewToVarInt(value, max_id)) { - ParseError("MAX_SUBSCRIBE_ID parameter is not a valid varint"); - return 0; - } - setup.max_subscribe_id = max_id; - break; - case MoqtSetupParameter::kSupportObjectAcks: - uint64_t flag; - if (!StringViewToVarInt(value, flag) || flag > 1) { - ParseError("Invalid kSupportObjectAcks value"); - return 0; - } - setup.supports_object_ack = static_cast<bool>(flag); - break; - default: - // Skip over the parameter. - break; - } - } - if (!uses_web_transport_ && !setup.path.has_value()) { - ParseError("PATH SETUP parameter missing from Client message over QUIC"); + if (!ValidateSetupParameters(parameters, uses_web_transport_, + quic::Perspective::IS_SERVER)) { + ParseError("Client SETUP contains invalid parameters"); return 0; } + KeyValuePairListToMoqtSessionParameters(parameters, setup.parameters); + // TODO(martinduke): Validate construction of the PATH (Sec 8.3.2.1) visitor_.OnClientSetupMessage(setup); return reader.PreviouslyReadPayload().length(); } size_t MoqtControlParser::ProcessServerSetup(quic::QuicDataReader& reader) { MoqtServerSetup setup; + setup.parameters.using_webtrans = uses_web_transport_; + setup.parameters.perspective = quic::Perspective::IS_SERVER; uint64_t version; if (!reader.ReadVarInt62(&version)) { return 0; } setup.selected_version = static_cast<MoqtVersion>(version); - uint64_t num_params; - if (!reader.ReadVarInt62(&num_params)) { + KeyValuePairList parameters; + if (!ParseKeyValuePairList(reader, parameters)) { return 0; } - // Parse parameters - for (uint64_t i = 0; i < num_params; ++i) { - uint64_t type; - absl::string_view value; - if (!ReadParameter(reader, type, value)) { - return 0; - } - auto key = static_cast<MoqtSetupParameter>(type); - switch (key) { - case MoqtSetupParameter::kPath: - ParseError("PATH parameter in SERVER_SETUP"); - return 0; - case MoqtSetupParameter::kMaxSubscribeId: - if (setup.max_subscribe_id.has_value()) { - ParseError("MAX_SUBSCRIBE_ID parameter appears twice in SETUP"); - return 0; - } - uint64_t max_id; - if (!StringViewToVarInt(value, max_id)) { - ParseError("MAX_SUBSCRIBE_ID parameter is not a valid varint"); - return 0; - } - setup.max_subscribe_id = max_id; - break; - case MoqtSetupParameter::kSupportObjectAcks: - uint64_t flag; - if (!StringViewToVarInt(value, flag) || flag > 1) { - ParseError("Invalid kSupportObjectAcks value"); - return 0; - } - setup.supports_object_ack = static_cast<bool>(flag); - break; - default: - // Skip over the parameter. - break; - } + if (!ValidateSetupParameters(parameters, uses_web_transport_, + quic::Perspective::IS_CLIENT)) { + ParseError("Server SETUP contains invalid parameters"); + return 0; } + KeyValuePairListToMoqtSessionParameters(parameters, setup.parameters); visitor_.OnServerSetupMessage(setup); return reader.PreviouslyReadPayload().length(); } size_t MoqtControlParser::ProcessSubscribe(quic::QuicDataReader& reader) { - MoqtSubscribe subscribe_request; + MoqtSubscribe subscribe; uint64_t filter, group, object; uint8_t group_order; absl::string_view track_name; - if (!reader.ReadVarInt62(&subscribe_request.subscribe_id) || - !reader.ReadVarInt62(&subscribe_request.track_alias) || - !ReadTrackNamespace(reader, subscribe_request.full_track_name) || + if (!reader.ReadVarInt62(&subscribe.subscribe_id) || + !reader.ReadVarInt62(&subscribe.track_alias) || + !ReadTrackNamespace(reader, subscribe.full_track_name) || !reader.ReadStringPieceVarInt62(&track_name) || - !reader.ReadUInt8(&subscribe_request.subscriber_priority) || + !reader.ReadUInt8(&subscribe.subscriber_priority) || !reader.ReadUInt8(&group_order) || !reader.ReadVarInt62(&filter)) { return 0; } - subscribe_request.full_track_name.AddElement(track_name); - if (!ParseDeliveryOrder(group_order, subscribe_request.group_order)) { + subscribe.full_track_name.AddElement(track_name); + if (!ParseDeliveryOrder(group_order, subscribe.group_order)) { ParseError("Invalid group order value in SUBSCRIBE message"); return 0; } @@ -423,15 +456,15 @@ if (!reader.ReadVarInt62(&group) || !reader.ReadVarInt62(&object)) { return 0; } - subscribe_request.start = Location(group, object); + subscribe.start = Location(group, object); if (filter_type == MoqtFilterType::kAbsoluteStart) { break; } if (!reader.ReadVarInt62(&group)) { return 0; } - subscribe_request.end_group = group; - if (*subscribe_request.end_group < subscribe_request.start->group) { + subscribe.end_group = group; + if (*subscribe.end_group < subscribe.start->group) { ParseError("End group is less than start group"); return 0; } @@ -440,10 +473,18 @@ ParseError("Invalid filter type"); return 0; } - if (!ReadSubscribeParameters(reader, subscribe_request.parameters)) { + KeyValuePairList parameters; + if (!ParseKeyValuePairList(reader, parameters)) { return 0; } - visitor_.OnSubscribeMessage(subscribe_request); + // TODO(martinduke): Parse kAuthorizationToken. + if (!ValidateVersionSpecificParameters(parameters, + MoqtMessageType::kSubscribe)) { + ParseError("SUBSCRIBE contains invalid parameters"); + return 0; + } + KeyValuePairListToVersionSpecificParameters(parameters, subscribe.parameters); + visitor_.OnSubscribeMessage(subscribe); return reader.PreviouslyReadPayload().length(); } @@ -474,13 +515,17 @@ return 0; } } - if (!ReadSubscribeParameters(reader, subscribe_ok.parameters)) { + KeyValuePairList parameters; + if (!ParseKeyValuePairList(reader, parameters)) { return 0; } - if (subscribe_ok.parameters.authorization_info.has_value()) { - ParseError("SUBSCRIBE_OK has authorization info"); + if (!ValidateVersionSpecificParameters(parameters, + MoqtMessageType::kSubscribeOk)) { + ParseError("SUBSCRIBE_OK contains invalid parameters"); return 0; } + KeyValuePairListToVersionSpecificParameters(parameters, + subscribe_ok.parameters); visitor_.OnSubscribeOkMessage(subscribe_ok); return reader.PreviouslyReadPayload().length(); } @@ -531,9 +576,17 @@ !reader.ReadUInt8(&subscribe_update.subscriber_priority)) { return 0; } - if (!ReadSubscribeParameters(reader, subscribe_update.parameters)) { + KeyValuePairList parameters; + if (!ParseKeyValuePairList(reader, parameters)) { return 0; } + if (!ValidateVersionSpecificParameters(parameters, + MoqtMessageType::kSubscribeUpdate)) { + ParseError("SUBSCRIBE_UPDATE contains invalid parameters"); + return 0; + } + KeyValuePairListToVersionSpecificParameters(parameters, + subscribe_update.parameters); subscribe_update.start = Location(start_group, start_object); if (end_group > 0) { subscribe_update.end_group = end_group - 1; @@ -542,10 +595,6 @@ return 0; } } - if (subscribe_update.parameters.authorization_info.has_value()) { - ParseError("SUBSCRIBE_UPDATE has authorization info"); - return 0; - } visitor_.OnSubscribeUpdateMessage(subscribe_update); return reader.PreviouslyReadPayload().length(); } @@ -555,13 +604,16 @@ if (!ReadTrackNamespace(reader, announce.track_namespace)) { return 0; } - if (!ReadSubscribeParameters(reader, announce.parameters)) { + KeyValuePairList parameters; + if (!ParseKeyValuePairList(reader, parameters)) { return 0; } - if (announce.parameters.delivery_timeout.has_value()) { - ParseError("ANNOUNCE has delivery timeout"); + if (!ValidateVersionSpecificParameters(parameters, + MoqtMessageType::kAnnounce)) { + ParseError("ANNOUNCE contains invalid parameters"); return 0; } + KeyValuePairListToVersionSpecificParameters(parameters, announce.parameters); visitor_.OnAnnounceMessage(announce); return reader.PreviouslyReadPayload().length(); } @@ -616,6 +668,17 @@ return 0; } track_status_request.full_track_name.AddElement(name); + KeyValuePairList parameters; + if (!ParseKeyValuePairList(reader, parameters)) { + return 0; + } + if (!ValidateVersionSpecificParameters( + parameters, MoqtMessageType::kTrackStatusRequest)) { + ParseError("TRACK_STATUS_REQUEST message contains invalid parameters"); + return 0; + } + KeyValuePairListToVersionSpecificParameters(parameters, + track_status_request.parameters); visitor_.OnTrackStatusRequestMessage(track_status_request); return reader.PreviouslyReadPayload().length(); } @@ -646,6 +709,17 @@ return 0; } track_status.status_code = static_cast<MoqtTrackStatusCode>(value); + KeyValuePairList parameters; + if (!ParseKeyValuePairList(reader, parameters)) { + return 0; + } + if (!ValidateVersionSpecificParameters(parameters, + MoqtMessageType::kTrackStatus)) { + ParseError("TRACK_STATUS message contains invalid parameters"); + return 0; + } + KeyValuePairListToVersionSpecificParameters(parameters, + track_status.parameters); visitor_.OnTrackStatusMessage(track_status); return reader.PreviouslyReadPayload().length(); } @@ -661,14 +735,22 @@ size_t MoqtControlParser::ProcessSubscribeAnnounces( quic::QuicDataReader& reader) { - MoqtSubscribeAnnounces subscribe_namespace; - if (!ReadTrackNamespace(reader, subscribe_namespace.track_namespace)) { + MoqtSubscribeAnnounces subscribe_announces; + if (!ReadTrackNamespace(reader, subscribe_announces.track_namespace)) { return 0; } - if (!ReadSubscribeParameters(reader, subscribe_namespace.parameters)) { + KeyValuePairList parameters; + if (!ParseKeyValuePairList(reader, parameters)) { return 0; } - visitor_.OnSubscribeAnnouncesMessage(subscribe_namespace); + if (!ValidateVersionSpecificParameters( + parameters, MoqtMessageType::kSubscribeAnnounces)) { + ParseError("SUBSCRIBE_ANNOUNCES message contains invalid parameters"); + return 0; + } + KeyValuePairListToVersionSpecificParameters(parameters, + subscribe_announces.parameters); + visitor_.OnSubscribeAnnouncesMessage(subscribe_announces); return reader.PreviouslyReadPayload().length(); } @@ -770,9 +852,15 @@ ParseError("Invalid FETCH type"); return 0; } - if (!ReadSubscribeParameters(reader, fetch.parameters)) { + KeyValuePairList parameters; + if (!ParseKeyValuePairList(reader, parameters)) { return 0; } + if (!ValidateVersionSpecificParameters(parameters, MoqtMessageType::kFetch)) { + ParseError("FETCH message contains invalid parameters"); + return 0; + } + KeyValuePairListToVersionSpecificParameters(parameters, fetch.parameters); visitor_.OnFetchMessage(fetch); return reader.PreviouslyReadPayload().length(); } @@ -789,18 +877,25 @@ size_t MoqtControlParser::ProcessFetchOk(quic::QuicDataReader& reader) { MoqtFetchOk fetch_ok; uint8_t group_order; + KeyValuePairList parameters; if (!reader.ReadVarInt62(&fetch_ok.subscribe_id) || !reader.ReadUInt8(&group_order) || !reader.ReadVarInt62(&fetch_ok.largest_id.group) || !reader.ReadVarInt62(&fetch_ok.largest_id.object) || - !ReadSubscribeParameters(reader, fetch_ok.parameters)) { + !ParseKeyValuePairList(reader, parameters)) { return 0; } if (group_order != 0x01 && group_order != 0x02) { ParseError("Invalid group order value in FETCH_OK"); return 0; } + if (!ValidateVersionSpecificParameters(parameters, + MoqtMessageType::kFetchOk)) { + ParseError("FETCH_OK message contains invalid parameters"); + return 0; + } fetch_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order); + KeyValuePairListToVersionSpecificParameters(parameters, fetch_ok.parameters); visitor_.OnFetchOkMessage(fetch_ok); return reader.PreviouslyReadPayload().length(); } @@ -857,109 +952,6 @@ visitor_.OnParsingError(error_code, reason); } -bool MoqtControlParser::ReadVarIntPieceVarInt62(quic::QuicDataReader& reader, - uint64_t& result) { - uint64_t length; - if (!reader.ReadVarInt62(&length)) { - return false; - } - uint64_t actual_length = static_cast<uint64_t>(reader.PeekVarInt62Length()); - if (length != actual_length) { - ParseError("Parameter VarInt has length field mismatch"); - return false; - } - if (!reader.ReadVarInt62(&result)) { - return false; - } - return true; -} - -bool MoqtControlParser::ReadParameter(quic::QuicDataReader& reader, - uint64_t& type, - absl::string_view& value) { - if (!reader.ReadVarInt62(&type)) { - return false; - } - return reader.ReadStringPieceVarInt62(&value); -} - -bool MoqtControlParser::ReadSubscribeParameters( - quic::QuicDataReader& reader, MoqtSubscribeParameters& params) { - uint64_t num_params; - if (!reader.ReadVarInt62(&num_params)) { - return false; - } - for (uint64_t i = 0; i < num_params; ++i) { - uint64_t type; - absl::string_view value; - if (!ReadParameter(reader, type, value)) { - return false; - } - uint64_t raw_value; - auto key = static_cast<MoqtTrackRequestParameter>(type); - switch (key) { - case MoqtTrackRequestParameter::kAuthorizationInfo: - if (params.authorization_info.has_value()) { - ParseError("AUTHORIZATION_INFO parameter appears twice"); - return false; - } - params.authorization_info = value; - break; - case moqt::MoqtTrackRequestParameter::kDeliveryTimeout: - if (params.delivery_timeout.has_value()) { - ParseError("DELIVERY_TIMEOUT parameter appears twice"); - return false; - } - if (!StringViewToVarInt(value, raw_value)) { - return false; - } - params.delivery_timeout = - quic::QuicTimeDelta::FromMilliseconds(raw_value); - break; - case moqt::MoqtTrackRequestParameter::kMaxCacheDuration: - if (params.max_cache_duration.has_value()) { - ParseError("MAX_CACHE_DURATION parameter appears twice"); - return false; - } - if (!StringViewToVarInt(value, raw_value)) { - return false; - } - params.max_cache_duration = - quic::QuicTimeDelta::FromMilliseconds(raw_value); - break; - case MoqtTrackRequestParameter::kOackWindowSize: { - if (params.object_ack_window.has_value()) { - ParseError("OACK_WINDOW_SIZE parameter appears twice in SUBSCRIBE"); - return false; - } - if (!StringViewToVarInt(value, raw_value)) { - ParseError("OACK_WINDOW_SIZE parameter is not a valid varint"); - return false; - } - params.object_ack_window = - quic::QuicTimeDelta::FromMicroseconds(raw_value); - break; - } - default: - // Skip over the parameter. - break; - } - } - return true; -} - -bool MoqtControlParser::StringViewToVarInt(absl::string_view& sv, - uint64_t& vi) { - quic::QuicDataReader reader(sv); - if (static_cast<size_t>(reader.PeekVarInt62Length()) != sv.length()) { - ParseError(MoqtError::kParameterLengthMismatch, - "Parameter length does not match varint encoding"); - return false; - } - reader.ReadVarInt62(&vi); - return true; -} - bool MoqtControlParser::ReadTrackNamespace(quic::QuicDataReader& reader, FullTrackName& full_track_name) { QUICHE_DCHECK(full_track_name.empty());
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h index 37c759e..137dd87 100644 --- a/quiche/quic/moqt/moqt_parser.h +++ b/quiche/quic/moqt/moqt_parser.h
@@ -138,24 +138,6 @@ void ParseError(absl::string_view reason); void ParseError(MoqtError error, absl::string_view reason); - // Reads an integer whose length is specified by a preceding VarInt62 and - // returns it in |result|. Returns false if parsing fails. - bool ReadVarIntPieceVarInt62(quic::QuicDataReader& reader, uint64_t& result); - // Read a parameter and return the value as a string_view. Returns false if - // |reader| does not have enough data. - bool ReadParameter(quic::QuicDataReader& reader, uint64_t& type, - absl::string_view& value); - // Reads MoqtSubscribeParameter from one of the message types that supports - // it. The cursor in |reader| should point to the "number of parameters" - // field in the message. The cursor will move to the end of the parameters. - // Returns false if it could not parse the full message, in which case the - // cursor in |reader| should not be used. - bool ReadSubscribeParameters(quic::QuicDataReader& reader, - MoqtSubscribeParameters& params); - // Convert a string view to a varint. Throws an error and returns false if the - // string_view is not exactly the right length. - bool StringViewToVarInt(absl::string_view& sv, uint64_t& vi); - // Parses a message that a track namespace but not name. The last element of // |full_track_name| will be set to the empty string. Returns false if it // could not parse the full namespace field.
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index a2270db..6ba8a22 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -559,21 +559,21 @@ EXPECT_FALSE(visitor_.parsing_error_.has_value()); } -TEST_F(MoqtMessageSpecificTest, ClientSetupMaxSubscribeIdAppearsTwice) { +TEST_F(MoqtMessageSpecificTest, ClientSetupMaxRequestIdAppearsTwice) { webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char setup[] = { - 0x40, 0x40, 0x0f, 0x02, 0x01, 0x02, // versions - 0x03, // 3 params - 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo" - 0x02, 0x01, 0x32, // max_subscribe_id = 50 - 0x02, 0x01, 0x32, // max_subscribe_id = 50 + 0x20, 0x0d, 0x02, 0x01, 0x02, // versions + 0x03, // 3 params + 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo" + 0x02, 0x32, // max_request_id = 50 + 0x02, 0x32, // max_request_id = 50 }; stream.Receive(absl::string_view(setup, sizeof(setup)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 0); EXPECT_EQ(visitor_.parsing_error_, - "MAX_SUBSCRIBE_ID parameter appears twice in SETUP"); + "Client SETUP contains invalid parameters"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } @@ -581,7 +581,7 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char setup[] = { - 0x40, 0x41, 0x07, + 0x21, 0x07, 0x01, // version = 1 0x01, // 1 param 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo" @@ -589,7 +589,8 @@ stream.Receive(absl::string_view(setup, sizeof(setup)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, "PATH parameter in SERVER_SETUP"); + EXPECT_EQ(visitor_.parsing_error_, + "Server SETUP contains invalid parameters"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } @@ -597,16 +598,16 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char setup[] = { - 0x40, 0x40, 0x0e, 0x02, 0x01, 0x02, // versions = 1, 2 - 0x02, // 2 params - 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo" - 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo" + 0x20, 0x0e, 0x02, 0x01, 0x02, // versions = 1, 2 + 0x02, // 2 params + 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo" + 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo" }; stream.Receive(absl::string_view(setup, sizeof(setup)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 0); EXPECT_EQ(visitor_.parsing_error_, - "PATH parameter appears twice in CLIENT_SETUP"); + "Client SETUP contains invalid parameters"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } @@ -614,15 +615,15 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kWebTrans, &stream, visitor_); char setup[] = { - 0x40, 0x40, 0x09, 0x02, 0x01, 0x02, // versions = 1, 2 - 0x01, // 1 param - 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo" + 0x20, 0x09, 0x02, 0x01, 0x02, // versions = 1, 2 + 0x01, // 1 param + 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo" }; stream.Receive(absl::string_view(setup, sizeof(setup)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 0); EXPECT_EQ(visitor_.parsing_error_, - "WebTransport connection is using PATH parameter in SETUP"); + "Client SETUP contains invalid parameters"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } @@ -630,14 +631,14 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char setup[] = { - 0x40, 0x40, 0x04, 0x02, 0x01, 0x02, // versions = 1, 2 - 0x00, // no param + 0x20, 0x04, 0x02, 0x01, 0x02, // versions = 1, 2 + 0x00, // no param }; stream.Receive(absl::string_view(setup, sizeof(setup)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 0); EXPECT_EQ(visitor_.parsing_error_, - "PATH SETUP parameter missing from Client message over QUIC"); + "Client SETUP contains invalid parameters"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } @@ -645,17 +646,17 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char setup[] = { - 0x40, 0x40, 0x0f, 0x02, 0x01, 0x02, // versions = 1, 2 - 0x03, // 4 params - 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo" - 0x02, 0x01, 0x32, // max_subscribe_id = 50 - 0x02, 0x01, 0x32, // max_subscribe_id = 50 + 0x20, 0x0d, 0x02, 0x01, 0x02, // versions = 1, 2 + 0x03, // 4 params + 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo" + 0x02, 0x32, // max_subscribe_id = 50 + 0x02, 0x32, // max_subscribe_id = 50 }; stream.Receive(absl::string_view(setup, sizeof(setup)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 0); EXPECT_EQ(visitor_.parsing_error_, - "MAX_SUBSCRIBE_ID parameter appears twice in SETUP"); + "Client SETUP contains invalid parameters"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } @@ -669,14 +670,13 @@ 0x20, 0x02, // priority = 0x20 descending 0x02, // filter_type = kLatestObject 0x02, // two params - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" }; stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, - "AUTHORIZATION_INFO parameter appears twice"); + EXPECT_EQ(visitor_.parsing_error_, "SUBSCRIBE contains invalid parameters"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } @@ -684,99 +684,58 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe[] = { - 0x03, 0x18, 0x01, 0x02, 0x01, + 0x03, 0x16, 0x01, 0x02, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" 0x20, 0x02, // priority = 0x20 descending 0x02, // filter_type = kLatestObject 0x02, // two params - 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000 - 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000 + 0x02, 0x67, 0x10, // delivery_timeout = 10000 + 0x02, 0x67, 0x10, // delivery_timeout = 10000 }; stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, - "DELIVERY_TIMEOUT parameter appears twice"); + EXPECT_EQ(visitor_.parsing_error_, "SUBSCRIBE contains invalid parameters"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } -TEST_F(MoqtMessageSpecificTest, SubscribeDeliveryTimeoutMalformed) { - webtransport::test::InMemoryStream stream(/*stream_id=*/0); - MoqtControlParser parser(kRawQuic, &stream, visitor_); - char subscribe[] = { - 0x03, 0x14, 0x01, 0x02, 0x01, - 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" - 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x02, // priority = 0x20 descending - 0x02, // filter_type = kLatestObject - 0x01, // one param - 0x03, 0x01, 0x67, 0x10, // delivery_timeout = 10000 - }; - stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); - parser.ReadAndDispatchMessages(); - EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, - "Parameter length does not match varint encoding"); - EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kParameterLengthMismatch); -} - TEST_F(MoqtMessageSpecificTest, SubscribeMaxCacheDurationTwice) { webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe[] = { - 0x03, 0x18, 0x01, 0x02, 0x01, + 0x03, 0x16, 0x01, 0x02, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" 0x20, 0x02, // priority = 0x20 descending 0x02, // filter_type = kLatestObject 0x02, // two params - 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000 - 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000 + 0x04, 0x67, 0x10, // max_cache_duration = 10000 + 0x04, 0x67, 0x10, // max_cache_duration = 10000 }; stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, - "MAX_CACHE_DURATION parameter appears twice"); + EXPECT_EQ(visitor_.parsing_error_, "SUBSCRIBE contains invalid parameters"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } -TEST_F(MoqtMessageSpecificTest, SubscribeMaxCacheDurationMalformed) { - webtransport::test::InMemoryStream stream(/*stream_id=*/0); - MoqtControlParser parser(kRawQuic, &stream, visitor_); - char subscribe[] = { - 0x03, 0x14, 0x01, 0x02, 0x01, - 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" - 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x02, // priority = 0x20 descending - 0x02, // filter_type = kLatestObject - 0x01, // one param - 0x04, 0x01, 0x67, 0x10, // max_cache_duration = 10000 - }; - stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); - parser.ReadAndDispatchMessages(); - EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, - "Parameter length does not match varint encoding"); - EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kParameterLengthMismatch); -} - TEST_F(MoqtMessageSpecificTest, SubscribeOkHasAuthorizationInfo) { webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kWebTrans, &stream, visitor_); char subscribe_ok[] = { - 0x04, 0x10, 0x01, 0x03, // subscribe_id = 1, expires = 3 + 0x04, 0x0f, 0x01, 0x03, // subscribe_id = 1, expires = 3 0x02, 0x01, // group_order = 2, content exists 0x0c, 0x14, // largest_group_id = 12, largest_object_id = 20, 0x02, // 2 parameters - 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000 - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x02, 0x67, 0x10, // delivery_timeout = 10000 + 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" }; stream.Receive(absl::string_view(subscribe_ok, sizeof(subscribe_ok)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, "SUBSCRIBE_OK has authorization info"); + EXPECT_EQ(visitor_.parsing_error_, + "SUBSCRIBE_OK contains invalid parameters"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } @@ -787,13 +746,14 @@ 0x02, 0x0b, 0x02, 0x03, 0x01, 0x05, // start and end sequences 0xaa, // priority = 0xaa 0x01, // 1 parameter - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" }; stream.Receive(absl::string_view(subscribe_update, sizeof(subscribe_update)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, "SUBSCRIBE_UPDATE has authorization info"); + EXPECT_EQ(visitor_.parsing_error_, + "SUBSCRIBE_UPDATE contains invalid parameters"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } @@ -803,14 +763,13 @@ char announce[] = { 0x06, 0x10, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x02, // 2 params - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" }; stream.Receive(absl::string_view(announce, sizeof(announce)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, - "AUTHORIZATION_INFO parameter appears twice"); + EXPECT_EQ(visitor_.parsing_error_, "ANNOUNCE contains invalid parameters"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } @@ -818,15 +777,15 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kWebTrans, &stream, visitor_); char announce[] = { - 0x06, 0x0f, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x06, 0x0e, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x02, // 2 params - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" - 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000 + 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x02, 0x67, 0x10, // delivery_timeout = 10000 }; stream.Receive(absl::string_view(announce, sizeof(announce)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, "ANNOUNCE has delivery timeout"); + EXPECT_EQ(visitor_.parsing_error_, "ANNOUNCE contains invalid parameters"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } @@ -931,7 +890,7 @@ 0x20, 0x02, // priority = 0x20, group order descending 0x02, // filter_type = kLatestObject 0x01, // 1 parameter - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" }; stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); parser.ReadAndDispatchMessages(); @@ -953,7 +912,7 @@ 0x20, 0x08, // priority = 0x20 ??? 0x01, // filter_type = kLatestGroup 0x01, // 1 parameter - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" }; stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); parser.ReadAndDispatchMessages(); @@ -973,7 +932,7 @@ 0x04, // start_group = 4 0x01, // start_object = 1 0x01, // 1 parameter - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" }; stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); parser.ReadAndDispatchMessages(); @@ -999,7 +958,7 @@ 0x01, // start_object = 1 0x07, // end_group = 7 0x01, // 1 parameter - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" }; stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); parser.ReadAndDispatchMessages(); @@ -1025,7 +984,7 @@ 0x01, // start_object = 1 0x03, // end_group = 3 0x01, // 1 parameter - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" }; stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); parser.ReadAndDispatchMessages(); @@ -1070,10 +1029,10 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe_update[] = { - 0x02, 0x0b, 0x02, 0x03, 0x01, 0x03, // start and end sequences + 0x02, 0x08, 0x02, 0x03, 0x01, 0x03, // start and end sequences 0x20, // priority 0x01, // 1 parameter - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x02, 0x20, // delivery_timeout = 32 ms }; stream.Receive(absl::string_view(subscribe_update, sizeof(subscribe_update)), false);
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 88bedaf..967d81a 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -34,6 +34,8 @@ #include "quiche/quic/moqt/moqt_parser.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" +#include "quiche/quic/moqt/moqt_session_callbacks.h" +#include "quiche/quic/moqt/moqt_session_interface.h" #include "quiche/quic/moqt/moqt_subscribe_windows.h" #include "quiche/quic/moqt/moqt_track.h" #include "quiche/quic/platform/api/quic_logging.h" @@ -148,12 +150,8 @@ control_stream_ = control_stream->GetStreamId(); MoqtClientSetup setup = MoqtClientSetup{ .supported_versions = std::vector<MoqtVersion>{parameters_.version}, - .max_subscribe_id = parameters_.max_subscribe_id, - .supports_object_ack = parameters_.support_object_acks, + .parameters = parameters_, }; - if (!parameters_.using_webtrans) { - setup.path = parameters_.path; - } SendControlMessage(framer_.SerializeClientSetup(setup)); QUIC_DLOG(INFO) << ENDPOINT << "Send the SETUP message"; } @@ -243,7 +241,7 @@ bool MoqtSession::SubscribeAnnounces( FullTrackName track_namespace, MoqtOutgoingSubscribeAnnouncesCallback callback, - MoqtSubscribeParameters parameters) { + VersionSpecificParameters parameters) { if (received_goaway_ || sent_goaway_) { QUIC_DLOG(INFO) << ENDPOINT << "Tried to send SUBSCRIBE_ANNOUNCES after GOAWAY"; @@ -251,9 +249,7 @@ } MoqtSubscribeAnnounces message; message.track_namespace = track_namespace; - // Cannot contain a delivery timeout. - parameters.delivery_timeout = std::nullopt; - message.parameters = std::move(parameters); + message.parameters = parameters; SendControlMessage(framer_.SerializeSubscribeAnnounces(message)); QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE_ANNOUNCES message for " << message.track_namespace; @@ -275,7 +271,8 @@ } void MoqtSession::Announce(FullTrackName track_namespace, - MoqtOutgoingAnnounceCallback announce_callback) { + MoqtOutgoingAnnounceCallback announce_callback, + VersionSpecificParameters parameters) { if (outgoing_announces_.contains(track_namespace)) { std::move(announce_callback)( track_namespace, @@ -290,6 +287,7 @@ } MoqtAnnounce message; message.track_namespace = track_namespace; + message.parameters = parameters; SendControlMessage(framer_.SerializeAnnounce(message)); QUIC_DLOG(INFO) << ENDPOINT << "Sent ANNOUNCE message for " << message.track_namespace; @@ -323,14 +321,14 @@ bool MoqtSession::SubscribeAbsolute(const FullTrackName& name, uint64_t start_group, uint64_t start_object, SubscribeRemoteTrack::Visitor* visitor, - MoqtSubscribeParameters parameters) { + VersionSpecificParameters parameters) { MoqtSubscribe message; message.full_track_name = name; message.subscriber_priority = kDefaultSubscriberPriority; message.group_order = std::nullopt; message.start = Location(start_group, start_object); message.end_group = std::nullopt; - message.parameters = std::move(parameters); + message.parameters = parameters; return Subscribe(message, visitor); } @@ -338,7 +336,7 @@ uint64_t start_group, uint64_t start_object, uint64_t end_group, SubscribeRemoteTrack::Visitor* visitor, - MoqtSubscribeParameters parameters) { + VersionSpecificParameters parameters) { if (end_group < start_group) { QUIC_DLOG(ERROR) << "Subscription end is before beginning"; return false; @@ -349,20 +347,20 @@ message.group_order = std::nullopt; message.start = Location(start_group, start_object); message.end_group = end_group; - message.parameters = std::move(parameters); + message.parameters = parameters; return Subscribe(message, visitor); } bool MoqtSession::SubscribeCurrentObject(const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor, - MoqtSubscribeParameters parameters) { + VersionSpecificParameters parameters) { MoqtSubscribe message; message.full_track_name = name; message.subscriber_priority = kDefaultSubscriberPriority; message.group_order = std::nullopt; message.start = std::nullopt; message.end_group = std::nullopt; - message.parameters = std::move(parameters); + message.parameters = parameters; return Subscribe(message, visitor); } @@ -383,8 +381,7 @@ uint64_t end_group, std::optional<uint64_t> end_object, MoqtPriority priority, std::optional<MoqtDeliveryOrder> delivery_order, - MoqtSubscribeParameters parameters) { - // TODO(martinduke): support authorization info + VersionSpecificParameters parameters) { if (next_subscribe_id_ >= peer_max_subscribe_id_) { QUIC_DLOG(INFO) << ENDPOINT << "Tried to send FETCH with ID " << next_subscribe_id_ @@ -396,8 +393,6 @@ QUIC_DLOG(INFO) << ENDPOINT << "Tried to send FETCH after GOAWAY"; return false; } - // Cannot have a delivery timeout. - parameters.delivery_timeout = std::nullopt; MoqtFetch message; message.full_track_name = name; message.fetch_id = next_subscribe_id_++; @@ -407,7 +402,6 @@ message.subscriber_priority = priority; message.group_order = delivery_order; message.parameters = parameters; - message.parameters.object_ack_window = std::nullopt; SendControlMessage(framer_.SerializeFetch(message)); QUIC_DLOG(INFO) << ENDPOINT << "Sent FETCH message for " << message.full_track_name; @@ -419,7 +413,7 @@ bool MoqtSession::JoiningFetch(const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor, uint64_t num_previous_groups, - MoqtSubscribeParameters parameters) { + VersionSpecificParameters parameters) { return JoiningFetch( name, visitor, [this, @@ -444,7 +438,7 @@ uint64_t num_previous_groups, MoqtPriority priority, std::optional<MoqtDeliveryOrder> delivery_order, - MoqtSubscribeParameters parameters) { + VersionSpecificParameters parameters) { if ((next_subscribe_id_ + 1) >= peer_max_subscribe_id_) { QUIC_DLOG(INFO) << ENDPOINT << "Tried to send JOINING_FETCH with ID " << (next_subscribe_id_ + 1) @@ -633,23 +627,15 @@ visitor->OnCanAckObjects(absl::bind_front(&MoqtSession::SendObjectAck, this, message.subscribe_id)); } else { - QUICHE_DLOG_IF(WARNING, message.parameters.object_ack_window.has_value()) + QUICHE_DLOG_IF(WARNING, message.parameters.oack_window_size.has_value()) << "Attempting to set object_ack_window on a connection that does not " "support it."; - message.parameters.object_ack_window = std::nullopt; - } - if (message.parameters.delivery_timeout.has_value() && - message.parameters.delivery_timeout->IsInfinite()) { - // Cannot encode an infinite delivery timeout. - message.parameters.delivery_timeout = std::nullopt; + message.parameters.oack_window_size = std::nullopt; } SendControlMessage(framer_.SerializeSubscribe(message)); QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE message for " << message.full_track_name; - auto track = std::make_unique<SubscribeRemoteTrack>( - message, visitor, - message.parameters.delivery_timeout.value_or( - quic::QuicTimeDelta::Infinite())); + auto track = std::make_unique<SubscribeRemoteTrack>(message, visitor); subscribe_by_name_.emplace(message.full_track_name, track.get()); subscribe_by_alias_.emplace(message.track_alias, track.get()); upstream_by_id_.emplace(message.subscribe_id, std::move(track)); @@ -856,20 +842,17 @@ absl::Hex(session_->parameters_.version))); return; } - session_->peer_supports_object_ack_ = message.supports_object_ack; + session_->peer_supports_object_ack_ = message.parameters.support_object_acks; QUICHE_DLOG(INFO) << ENDPOINT << "Received the SETUP message"; if (session_->parameters_.perspective == Perspective::IS_SERVER) { MoqtServerSetup response; + response.parameters = session_->parameters_; response.selected_version = session_->parameters_.version; - response.max_subscribe_id = session_->parameters_.max_subscribe_id; - response.supports_object_ack = session_->parameters_.support_object_acks; SendOrBufferMessage(session_->framer_.SerializeServerSetup(response)); QUIC_DLOG(INFO) << ENDPOINT << "Sent the SETUP message"; } // TODO: handle path. - if (message.max_subscribe_id.has_value()) { - session_->peer_max_subscribe_id_ = *message.max_subscribe_id; - } + session_->peer_max_subscribe_id_ = message.parameters.max_subscribe_id; std::move(session_->callbacks_.session_established_callback)(); } @@ -887,12 +870,10 @@ absl::Hex(session_->parameters_.version))); return; } - session_->peer_supports_object_ack_ = message.supports_object_ack; + session_->peer_supports_object_ack_ = message.parameters.support_object_acks; QUIC_DLOG(INFO) << ENDPOINT << "Received the SETUP message"; // TODO: handle path. - if (message.max_subscribe_id.has_value()) { - session_->peer_max_subscribe_id_ = *message.max_subscribe_id; - } + session_->peer_max_subscribe_id_ = message.parameters.max_subscribe_id; std::move(session_->callbacks_.session_established_callback)(); } @@ -960,9 +941,7 @@ MoqtTrackPublisher* track_publisher_ptr = track_publisher->get(); auto subscription = std::make_unique<MoqtSession::PublishedSubscription>( session_, *std::move(track_publisher), message, monitoring); - subscription->set_delivery_timeout( - message.parameters.delivery_timeout.value_or( - quic::QuicTimeDelta::Infinite())); + subscription->set_delivery_timeout(message.parameters.delivery_timeout); MoqtSession::PublishedSubscription* subscription_ptr = subscription.get(); auto [it, success] = session_->published_subscriptions_.emplace( message.subscribe_id, std::move(subscription)); @@ -1087,9 +1066,7 @@ } it->second->Update(message.start, message.end_group, message.subscriber_priority); - if (message.parameters.delivery_timeout.has_value()) { - it->second->set_delivery_timeout(*message.parameters.delivery_timeout); - } + it->second->set_delivery_timeout(message.parameters.delivery_timeout); } void MoqtSession::ControlStream::OnAnnounceMessage( @@ -1692,7 +1669,7 @@ monitoring_interface_(monitoring_interface) { if (monitoring_interface_ != nullptr) { monitoring_interface_->OnObjectAckSupportKnown( - subscribe.parameters.object_ack_window.has_value()); + subscribe.parameters.oack_window_size.has_value()); } QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for " << subscribe.full_track_name;
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index da5742e..f1e8c81 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -91,17 +91,17 @@ quic::Perspective perspective() const { return parameters_.perspective; } // Returns true if message was sent. - bool SubscribeAnnounces( - FullTrackName track_namespace, - MoqtOutgoingSubscribeAnnouncesCallback callback, - MoqtSubscribeParameters parameters = MoqtSubscribeParameters()); + bool SubscribeAnnounces(FullTrackName track_namespace, + MoqtOutgoingSubscribeAnnouncesCallback callback, + VersionSpecificParameters parameters); bool UnsubscribeAnnounces(FullTrackName track_namespace); // Send an ANNOUNCE message for |track_namespace|, and call // |announce_callback| when the response arrives. Will fail immediately if // there is already an unresolved ANNOUNCE for that namespace. void Announce(FullTrackName track_namespace, - MoqtOutgoingAnnounceCallback announce_callback); + MoqtOutgoingAnnounceCallback announce_callback, + VersionSpecificParameters parameters); // Returns true if message was sent, false if there is no ANNOUNCE to cancel. bool Unannounce(FullTrackName track_namespace); // Allows the subscriber to declare it will not subscribe to |track_namespace| @@ -116,15 +116,15 @@ bool SubscribeAbsolute(const FullTrackName& name, uint64_t start_group, uint64_t start_object, SubscribeRemoteTrack::Visitor* visitor, - MoqtSubscribeParameters parameters) override; + VersionSpecificParameters parameters) override; // Subscribe from (start_group, start_object) to the end of end_group. bool SubscribeAbsolute(const FullTrackName& name, uint64_t start_group, uint64_t start_object, uint64_t end_group, SubscribeRemoteTrack::Visitor* visitor, - MoqtSubscribeParameters parameters) override; + VersionSpecificParameters parameters) override; bool SubscribeCurrentObject(const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor, - MoqtSubscribeParameters parameters) override; + VersionSpecificParameters parameters) override; // Returns false if the subscription is not found. The session immediately // destroys all subscription state. void Unsubscribe(const FullTrackName& name); @@ -136,7 +136,7 @@ Location start, uint64_t end_group, std::optional<uint64_t> end_object, MoqtPriority priority, std::optional<MoqtDeliveryOrder> delivery_order, - MoqtSubscribeParameters parameters) override; + VersionSpecificParameters parameters) override; // Sends both a SUBSCRIBE and a joining FETCH, beginning |num_previous_groups| // groups before the current group. The Fetch will not be flow controlled, // instead using |visitor| to deliver fetched objects when they arrive. Gaps @@ -146,7 +146,7 @@ bool JoiningFetch(const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor, uint64_t num_previous_groups, - MoqtSubscribeParameters parameters) override; + VersionSpecificParameters parameters) override; // Sends both a SUBSCRIBE and a joining FETCH, beginning |num_previous_groups| // groups before the current group. The application provides |callback| to // fully control acceptance of Fetched objects. @@ -155,7 +155,7 @@ FetchResponseCallback callback, uint64_t num_previous_groups, MoqtPriority priority, std::optional<MoqtDeliveryOrder> delivery_order, - MoqtSubscribeParameters parameters) override; + VersionSpecificParameters parameters) override; // Send a GOAWAY message to the peer. |new_session_uri| must be empty if // called by the client.
diff --git a/quiche/quic/moqt/moqt_session_interface.h b/quiche/quic/moqt/moqt_session_interface.h index 1fd77eb..cdd197e 100644 --- a/quiche/quic/moqt/moqt_session_interface.h +++ b/quiche/quic/moqt/moqt_session_interface.h
@@ -49,18 +49,18 @@ virtual bool SubscribeAbsolute(const FullTrackName& name, uint64_t start_group, uint64_t start_object, SubscribeRemoteTrack::Visitor* visitor, - MoqtSubscribeParameters parameters) = 0; + VersionSpecificParameters parameters) = 0; // Subscribe from (start_group, start_object) to the end of end_group. virtual bool SubscribeAbsolute(const FullTrackName& name, uint64_t start_group, uint64_t start_object, uint64_t end_group, SubscribeRemoteTrack::Visitor* visitor, - MoqtSubscribeParameters parameters) = 0; + VersionSpecificParameters parameters) = 0; // Subscribe to all objects that are larger than the current Largest // Group/Object ID. virtual bool SubscribeCurrentObject(const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor, - MoqtSubscribeParameters parameters) = 0; + VersionSpecificParameters parameters) = 0; // Sends an UNSUBSCRIBE message and removes all of the state related to the // subscription. Returns false if the subscription is not found. @@ -74,7 +74,7 @@ Location start, uint64_t end_group, std::optional<uint64_t> end_object, MoqtPriority priority, std::optional<MoqtDeliveryOrder> delivery_order, - MoqtSubscribeParameters parameters) = 0; + VersionSpecificParameters parameters) = 0; // Sends both a SUBSCRIBE and a joining FETCH, beginning `num_previous_groups` // groups before the current group. The Fetch will not be flow controlled, @@ -85,7 +85,7 @@ virtual bool JoiningFetch(const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor, uint64_t num_previous_groups, - MoqtSubscribeParameters parameters) = 0; + VersionSpecificParameters parameters) = 0; // Sends both a SUBSCRIBE and a joining FETCH, beginning `num_previous_groups` // groups before the current group. `callback` acts the same way as the @@ -95,7 +95,13 @@ FetchResponseCallback callback, uint64_t num_previous_groups, MoqtPriority priority, std::optional<MoqtDeliveryOrder> delivery_order, - MoqtSubscribeParameters parameters) = 0; + VersionSpecificParameters parameters) = 0; + + // TODO: Add SubscribeAnnounces, UnsubscribeAnnounces method. + // TODO: Add Announce, Unannounce method. + // TODO: Add AnnounceCancel method. + // TODO: Add TrackStatusRequest method. + // TODO: Add SubscribeUpdate, SubscribeDone method. }; } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 73d5fc8..efe079e 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -66,7 +66,7 @@ /*group_order=*/std::nullopt, /*start=*/Location(0, 0), /*end_group=*/std::nullopt, - /*parameters=*/MoqtSubscribeParameters(), + /*parameters=*/VersionSpecificParameters(), }; return subscribe; } @@ -81,7 +81,7 @@ /*start=*/Location(0, 0), /*end_group=*/1, /*end_object=*/std::nullopt, - /*parameters=*/MoqtSubscribeParameters(), + /*parameters=*/VersionSpecificParameters(), }; return fetch; } @@ -160,7 +160,7 @@ (*track_status == MoqtTrackStatusCode::kInProgress) ? std::make_optional(publisher->GetLargestSequence()) : std::optional<Location>(), - /*parameters=*/MoqtSubscribeParameters(), + /*parameters=*/VersionSpecificParameters(), }; EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _)); control_parser->OnSubscribeMessage(subscribe); @@ -272,7 +272,7 @@ MoqtSessionPeer::CreateControlStream(&server_session, &mock_stream_); MoqtClientSetup setup = { /*supported_versions=*/{kDefaultMoqtVersion}, - /*path=*/std::nullopt, + MoqtSessionParameters(quic::Perspective::IS_CLIENT), }; EXPECT_CALL(mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kServerSetup), _)); @@ -361,7 +361,8 @@ EXPECT_CALL(mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kAnnounce), _)); session_.Announce(FullTrackName{"foo"}, - announce_resolved_callback.AsStdFunction()); + announce_resolved_callback.AsStdFunction(), + VersionSpecificParameters()); MoqtAnnounceOk ok = { /*track_namespace=*/FullTrackName{"foo"}, @@ -403,7 +404,8 @@ EXPECT_CALL(mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kAnnounce), _)); session_.Announce(FullTrackName{"foo"}, - announce_resolved_callback.AsStdFunction()); + announce_resolved_callback.AsStdFunction(), + VersionSpecificParameters()); MoqtAnnounceOk ok = { /*track_namespace=*/FullTrackName{"foo"}, @@ -435,7 +437,8 @@ EXPECT_CALL(mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kAnnounce), _)); session_.Announce(FullTrackName{"foo"}, - announce_resolved_callback.AsStdFunction()); + announce_resolved_callback.AsStdFunction(), + VersionSpecificParameters()); MoqtAnnounceError error = { /*track_namespace=*/FullTrackName{"foo"}, @@ -605,18 +608,18 @@ Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _)); EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"), &remote_track_visitor, - MoqtSubscribeParameters())); + VersionSpecificParameters())); EXPECT_CALL( mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kSubscribesBlocked), _)) .Times(1); EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo2", "bar2"), &remote_track_visitor, - MoqtSubscribeParameters())); + VersionSpecificParameters())); // Second time does not send SUBSCRIBES_BLOCKED. EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo2", "bar2"), &remote_track_visitor, - MoqtSubscribeParameters())); + VersionSpecificParameters())); } TEST_F(MoqtSessionTest, SubscribeDuplicateTrackName) { @@ -629,10 +632,10 @@ Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _)); EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"), &remote_track_visitor, - MoqtSubscribeParameters())); + VersionSpecificParameters())); EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"), &remote_track_visitor, - MoqtSubscribeParameters())); + VersionSpecificParameters())); } TEST_F(MoqtSessionTest, SubscribeWithOk) { @@ -644,7 +647,7 @@ Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _)); session_.SubscribeCurrentObject(FullTrackName("foo", "bar"), &remote_track_visitor, - MoqtSubscribeParameters()); + VersionSpecificParameters()); MoqtSubscribeOk ok = { /*subscribe_id=*/0, @@ -673,7 +676,7 @@ Writev(ControlMessageOfType(MoqtMessageType::kSubscribesBlocked), _)); EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"), &remote_track_visitor, - MoqtSubscribeParameters())); + VersionSpecificParameters())); MoqtMaxSubscribeId max_subscribe_id = { /*max_subscribe_id=*/kDefaultInitialMaxSubscribeId + 1, }; @@ -683,7 +686,7 @@ Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _)); EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"), &remote_track_visitor, - MoqtSubscribeParameters())); + VersionSpecificParameters())); } TEST_F(MoqtSessionTest, LowerMaxSubscribeIdIsAnError) { @@ -723,7 +726,7 @@ Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _)); session_.SubscribeCurrentObject(FullTrackName("foo", "bar"), &remote_track_visitor, - MoqtSubscribeParameters()); + VersionSpecificParameters()); MoqtSubscribeError error = { /*subscribe_id=*/0, @@ -842,7 +845,8 @@ EXPECT_EQ(track_namespace, ftn); EXPECT_FALSE(error.has_value()); EXPECT_EQ(reason, ""); - }); + }, + VersionSpecificParameters()); MoqtSubscribeAnnouncesOk ok = { /*track_namespace=*/track_namespace, }; @@ -873,7 +877,8 @@ ASSERT_TRUE(error.has_value()); EXPECT_EQ(*error, SubscribeErrorCode::kInvalidRange); EXPECT_EQ(reason, "deadbeef"); - }); + }, + VersionSpecificParameters()); MoqtSubscribeAnnouncesError error = { /*track_namespace=*/track_namespace, /*error_code=*/SubscribeErrorCode::kInvalidRange, @@ -1637,7 +1642,7 @@ MoqtSessionPeer::CreateControlStream(&server_session, &mock_stream_); MoqtClientSetup setup = { /*supported_versions*/ {kDefaultMoqtVersion}, - /*path=*/std::nullopt, + MoqtSessionParameters(), }; EXPECT_CALL(mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kServerSetup), _)); @@ -2344,7 +2349,7 @@ Writev(SerializedControlMessage(expected_fetch), _)); EXPECT_TRUE(session_.JoiningFetch( expected_subscribe.full_track_name, &remote_track_visitor, nullptr, 1, - 0x80, MoqtDeliveryOrder::kAscending, MoqtSubscribeParameters())); + 0x80, MoqtDeliveryOrder::kAscending, VersionSpecificParameters())); } TEST_F(MoqtSessionTest, SendJoiningFetchNoFlowControl) { @@ -2359,16 +2364,16 @@ Writev(ControlMessageOfType(MoqtMessageType::kFetch), _)); EXPECT_TRUE(session_.JoiningFetch(FullTrackName("foo", "bar"), &remote_track_visitor, 0, - MoqtSubscribeParameters())); + VersionSpecificParameters())); EXPECT_CALL(remote_track_visitor, OnReply).Times(1); stream_input->OnSubscribeOkMessage( MoqtSubscribeOk(0, quic::QuicTimeDelta::FromMilliseconds(0), MoqtDeliveryOrder::kAscending, Location(2, 0), - MoqtSubscribeParameters())); + VersionSpecificParameters())); stream_input->OnFetchOkMessage(MoqtFetchOk(1, MoqtDeliveryOrder::kAscending, Location(2, 0), - MoqtSubscribeParameters())); + VersionSpecificParameters())); // Packet arrives on FETCH stream. MoqtObject object = { /*fetch_id=*/1, @@ -2397,7 +2402,7 @@ FullTrackName track_namespace = FullTrackName{"foo"}; MoqtSubscribeAnnounces announces = { track_namespace, - /*parameters=*/MoqtSubscribeParameters(), + /*parameters=*/VersionSpecificParameters(), }; webtransport::test::MockStream control_stream; std::unique_ptr<MoqtControlParserVisitor> stream_input = @@ -2422,7 +2427,7 @@ FullTrackName track_namespace = FullTrackName{"foo"}; MoqtSubscribeAnnounces announces = { track_namespace, - /*parameters=*/MoqtSubscribeParameters(), + /*parameters=*/VersionSpecificParameters(), }; webtransport::test::MockStream control_stream; std::unique_ptr<MoqtControlParserVisitor> stream_input = @@ -2448,12 +2453,12 @@ fetch_task = std::move(task); }, Location(0, 0), 4, std::nullopt, 128, std::nullopt, - MoqtSubscribeParameters()); + VersionSpecificParameters()); MoqtFetchOk ok = { /*subscribe_id=*/0, /*group_order=*/MoqtDeliveryOrder::kAscending, /*largest_id=*/Location(3, 25), - MoqtSubscribeParameters(), + VersionSpecificParameters(), }; stream_input->OnFetchOkMessage(ok); ASSERT_NE(fetch_task, nullptr); @@ -2478,7 +2483,7 @@ fetch_task = std::move(task); }, Location(0, 0), 4, std::nullopt, 128, std::nullopt, - MoqtSubscribeParameters()); + VersionSpecificParameters()); MoqtFetchError error = { /*subscribe_id=*/0, /*error_code=*/SubscribeErrorCode::kUnauthorized, @@ -2513,7 +2518,7 @@ }); }, Location(0, 0), 4, std::nullopt, 128, std::nullopt, - MoqtSubscribeParameters()); + VersionSpecificParameters()); // Build queue of packets to arrive. std::queue<quiche::QuicheBuffer> headers; std::queue<std::string> payloads; @@ -2553,7 +2558,7 @@ /*subscribe_id=*/0, /*group_order=*/MoqtDeliveryOrder::kAscending, /*largest_id=*/Location(3, 25), - MoqtSubscribeParameters(), + VersionSpecificParameters(), }; stream_input->OnFetchOkMessage(ok); ASSERT_NE(fetch_task, nullptr); @@ -2583,7 +2588,7 @@ [&]() { objects_available = true; }); }, Location(0, 0), 4, std::nullopt, 128, std::nullopt, - MoqtSubscribeParameters()); + VersionSpecificParameters()); // Build queue of packets to arrive. std::queue<quiche::QuicheBuffer> headers; std::queue<std::string> payloads; @@ -2623,7 +2628,7 @@ /*subscribe_id=*/0, /*group_order=*/MoqtDeliveryOrder::kAscending, /*largest_id=*/Location(3, 25), - MoqtSubscribeParameters(), + VersionSpecificParameters(), }; stream_input->OnFetchOkMessage(ok); ASSERT_NE(fetch_task, nullptr); @@ -2959,19 +2964,22 @@ MockSubscribeRemoteTrackVisitor remote_track_visitor; EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"), &remote_track_visitor, - MoqtSubscribeParameters())); + VersionSpecificParameters())); EXPECT_FALSE(session_.SubscribeAnnounces( - FullTrackName{"foo"}, +[](FullTrackName /*track_namespace*/, - std::optional<SubscribeErrorCode> /*error*/, - absl::string_view /*reason*/) {})); + FullTrackName{"foo"}, + +[](FullTrackName /*track_namespace*/, + std::optional<SubscribeErrorCode> /*error*/, + absl::string_view /*reason*/) {}, + VersionSpecificParameters())); session_.Announce( FullTrackName{"foo"}, +[](FullTrackName /*track_namespace*/, - std::optional<MoqtAnnounceErrorReason> /*error*/) {}); + std::optional<MoqtAnnounceErrorReason> /*error*/) {}, + VersionSpecificParameters()); EXPECT_FALSE(session_.Fetch( FullTrackName{"foo", "bar"}, +[](std::unique_ptr<MoqtFetchTask> /*fetch_task*/) {}, Location(0, 0), 5, - std::nullopt, 127, std::nullopt, MoqtSubscribeParameters())); + std::nullopt, 127, std::nullopt, VersionSpecificParameters())); // Error on additional GOAWAY. EXPECT_CALL(mock_session_, CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation), @@ -3000,7 +3008,7 @@ EXPECT_CALL(mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kAnnounceError), _)); stream_input->OnAnnounceMessage( - MoqtAnnounce(FullTrackName("foo", "bar"), MoqtSubscribeParameters())); + MoqtAnnounce(FullTrackName("foo", "bar"), VersionSpecificParameters())); EXPECT_CALL(mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kFetchError), _)); MoqtFetch fetch = DefaultFetch(); @@ -3016,19 +3024,22 @@ MockSubscribeRemoteTrackVisitor remote_track_visitor; EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"), &remote_track_visitor, - MoqtSubscribeParameters())); + VersionSpecificParameters())); EXPECT_FALSE(session_.SubscribeAnnounces( - FullTrackName{"foo"}, +[](FullTrackName /*track_namespace*/, - std::optional<SubscribeErrorCode> /*error*/, - absl::string_view /*reason*/) {})); + FullTrackName{"foo"}, + +[](FullTrackName /*track_namespace*/, + std::optional<SubscribeErrorCode> /*error*/, + absl::string_view /*reason*/) {}, + VersionSpecificParameters())); session_.Announce( FullTrackName{"foo"}, +[](FullTrackName /*track_namespace*/, - std::optional<MoqtAnnounceErrorReason> /*error*/) {}); + std::optional<MoqtAnnounceErrorReason> /*error*/) {}, + VersionSpecificParameters()); EXPECT_FALSE(session_.Fetch( FullTrackName{"foo", "bar"}, +[](std::unique_ptr<MoqtFetchTask> /*fetch_task*/) {}, Location(0, 0), 5, - std::nullopt, 127, std::nullopt, MoqtSubscribeParameters())); + std::nullopt, 127, std::nullopt, VersionSpecificParameters())); session_.GoAway(""); // GoAway timer fires. auto* goaway_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>( @@ -3085,13 +3096,13 @@ Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _)); EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"), &remote_track_visitor, - MoqtSubscribeParameters())); + VersionSpecificParameters())); MoqtSubscribeOk ok = { /*subscribe_id=*/0, /*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000), /*group_order=*/MoqtDeliveryOrder::kAscending, /*largest_id=*/std::nullopt, - /*parameters=*/MoqtSubscribeParameters(), + /*parameters=*/VersionSpecificParameters(), }; stream_input->OnSubscribeOkMessage(ok); constexpr uint64_t kNumStreams = 3; @@ -3143,13 +3154,13 @@ Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _)); EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"), &remote_track_visitor, - MoqtSubscribeParameters())); + VersionSpecificParameters())); MoqtSubscribeOk ok = { /*subscribe_id=*/0, /*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000), /*group_order=*/MoqtDeliveryOrder::kAscending, /*largest_id=*/std::nullopt, - /*parameters=*/MoqtSubscribeParameters(), + /*parameters=*/VersionSpecificParameters(), }; stream_input->OnSubscribeOkMessage(ok); constexpr uint64_t kNumStreams = 3; @@ -3198,13 +3209,13 @@ Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _)); EXPECT_TRUE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"), &remote_track_visitor, - MoqtSubscribeParameters())); + VersionSpecificParameters())); MoqtSubscribeOk ok = { /*subscribe_id=*/0, /*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000), /*group_order=*/MoqtDeliveryOrder::kAscending, /*largest_id=*/std::nullopt, - /*parameters=*/MoqtSubscribeParameters(), + /*parameters=*/VersionSpecificParameters(), }; stream_input->OnSubscribeOkMessage(ok); constexpr uint64_t kNumStreams = 3;
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h index 2581248..b323979 100644 --- a/quiche/quic/moqt/moqt_track.h +++ b/quiche/quic/moqt/moqt_track.h
@@ -115,15 +115,13 @@ bool end_of_message) = 0; virtual void OnSubscribeDone(FullTrackName full_track_name) = 0; }; - SubscribeRemoteTrack( - const MoqtSubscribe& subscribe, Visitor* visitor, - quic::QuicTimeDelta delivery_timeout = quic::QuicTimeDelta::Infinite()) + SubscribeRemoteTrack(const MoqtSubscribe& subscribe, Visitor* visitor) : RemoteTrack(subscribe.full_track_name, subscribe.subscribe_id, SubscribeWindow(subscribe.start.value_or(Location()), subscribe.end_group)), track_alias_(subscribe.track_alias), visitor_(visitor), - delivery_timeout_(delivery_timeout), + delivery_timeout_(subscribe.parameters.delivery_timeout), subscribe_(std::make_unique<MoqtSubscribe>(subscribe)) {} ~SubscribeRemoteTrack() override { if (subscribe_done_alarm_ != nullptr) {
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc index 910c433..8b9703e 100644 --- a/quiche/quic/moqt/moqt_track_test.cc +++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -56,7 +56,7 @@ /*group_order=*/std::nullopt, /*start=*/Location(2, 0), std::nullopt, - MoqtSubscribeParameters(), + VersionSpecificParameters(), }; SubscribeRemoteTrack track_; }; @@ -107,7 +107,7 @@ /*start_object=*/Location(1, 1), /*end_group=*/3, /*end_object=*/100, - /*parameters=*/MoqtSubscribeParameters(), + VersionSpecificParameters(), }; // The pointer held by the application. UpstreamFetch fetch_;
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.cc b/quiche/quic/moqt/test_tools/mock_moqt_session.cc index 565b1db..5753aa8 100644 --- a/quiche/quic/moqt/test_tools/mock_moqt_session.cc +++ b/quiche/quic/moqt/test_tools/mock_moqt_session.cc
@@ -127,14 +127,14 @@ ON_CALL(*this, SubscribeCurrentObject) .WillByDefault([this](const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor, - MoqtSubscribeParameters) { + VersionSpecificParameters) { return Subscribe(name, visitor, SubscribeWindow()); }); ON_CALL(*this, SubscribeAbsolute(_, _, _, _, _)) .WillByDefault([this](const FullTrackName& name, uint64_t start_group, uint64_t start_object, SubscribeRemoteTrack::Visitor* visitor, - MoqtSubscribeParameters) { + VersionSpecificParameters) { return Subscribe( name, visitor, SubscribeWindow(Location(start_group, start_object))); @@ -143,7 +143,7 @@ .WillByDefault([this](const FullTrackName& name, uint64_t start_group, uint64_t start_object, uint64_t end_group, SubscribeRemoteTrack::Visitor* visitor, - MoqtSubscribeParameters) { + VersionSpecificParameters) { return Subscribe( name, visitor, SubscribeWindow(Location(start_group, start_object), end_group)); @@ -158,7 +158,7 @@ Location start, uint64_t end_group, std::optional<uint64_t> end_object, MoqtPriority priority, std::optional<MoqtDeliveryOrder> delivery_order, - MoqtSubscribeParameters parameters) { + VersionSpecificParameters parameters) { auto track_publisher = publisher_->GetTrack(name); if (!track_publisher.ok()) { std::move(callback)(std::make_unique<MoqtFailedFetch>( @@ -174,7 +174,7 @@ .WillByDefault([this](const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor, uint64_t num_previous_groups, - MoqtSubscribeParameters parameters) { + VersionSpecificParameters parameters) { return JoiningFetch( name, visitor, [name, visitor](std::unique_ptr<MoqtFetchTask> fetch) { @@ -195,7 +195,7 @@ uint64_t num_previous_groups, MoqtPriority priority, std::optional<MoqtDeliveryOrder> delivery_order, - MoqtSubscribeParameters parameters) { + VersionSpecificParameters parameters) { SubscribeCurrentObject(name, visitor, parameters); auto track_publisher = publisher_->GetTrack(name); if (!track_publisher.ok()) {
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.h b/quiche/quic/moqt/test_tools/mock_moqt_session.h index 0deb44d..50405e4 100644 --- a/quiche/quic/moqt/test_tools/mock_moqt_session.h +++ b/quiche/quic/moqt/test_tools/mock_moqt_session.h
@@ -42,18 +42,18 @@ MOCK_METHOD(bool, SubscribeAbsolute, (const FullTrackName& name, uint64_t start_group, uint64_t start_object, SubscribeRemoteTrack::Visitor* visitor, - MoqtSubscribeParameters parameters), + VersionSpecificParameters parameters), (override)); MOCK_METHOD(bool, SubscribeAbsolute, (const FullTrackName& name, uint64_t start_group, uint64_t start_object, uint64_t end_group, SubscribeRemoteTrack::Visitor* visitor, - MoqtSubscribeParameters parameters), + VersionSpecificParameters parameters), (override)); MOCK_METHOD(bool, SubscribeCurrentObject, (const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor, - MoqtSubscribeParameters parameters), + VersionSpecificParameters parameters), (override)); MOCK_METHOD(void, Unsubscribe, (const FullTrackName& name), (override)); MOCK_METHOD(bool, Fetch, @@ -61,13 +61,13 @@ Location start, uint64_t end_group, std::optional<uint64_t> end_object, MoqtPriority priority, std::optional<MoqtDeliveryOrder> delivery_order, - MoqtSubscribeParameters parameters), + VersionSpecificParameters parameters), (override)); MOCK_METHOD(bool, JoiningFetch, (const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor, uint64_t num_previous_groups, - MoqtSubscribeParameters parameters), + VersionSpecificParameters parameters), (override)); MOCK_METHOD(bool, JoiningFetch, (const FullTrackName& name, @@ -75,7 +75,7 @@ FetchResponseCallback callback, uint64_t num_previous_groups, MoqtPriority priority, std::optional<MoqtDeliveryOrder> delivery_order, - MoqtSubscribeParameters parameters), + VersionSpecificParameters parameters), (override)); private:
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc b/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc index f067f31..1df0121 100644 --- a/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc +++ b/quiche/quic/moqt/test_tools/mock_moqt_session_test.cc
@@ -45,7 +45,7 @@ OnReply(FullTrackName("doesn't", "exist"), Eq(std::nullopt), Optional(HasSubstr("not found")))); session_.SubscribeCurrentObject(FullTrackName("doesn't", "exist"), &visitor, - MoqtSubscribeParameters()); + VersionSpecificParameters()); } TEST_F(MockMoqtSessionTest, SubscribeCurrentObject) { @@ -53,7 +53,7 @@ EXPECT_CALL(visitor, OnReply(TrackName(), Eq(std::nullopt), Eq(std::nullopt))); session_.SubscribeCurrentObject(TrackName(), &visitor, - MoqtSubscribeParameters()); + VersionSpecificParameters()); EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(0, 0), _, _, "test", _)); track_->AddObject(quic::test::MemSliceFromString("test"), /*key=*/true); @@ -68,7 +68,7 @@ EXPECT_CALL(visitor, OnReply(TrackName(), Eq(std::nullopt), Eq(std::nullopt))); session_.SubscribeAbsolute(TrackName(), 1, 0, 1, &visitor, - MoqtSubscribeParameters()); + VersionSpecificParameters()); EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(1, 0), _, MoqtObjectStatus::kNormal, "b", _)); EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(1, 1), _, @@ -89,7 +89,7 @@ [&](std::unique_ptr<MoqtFetchTask> new_fetch) { fetch = std::move(new_fetch); }, - Location(0, 1), 0, 2, 0x80, std::nullopt, MoqtSubscribeParameters()); + Location(0, 1), 0, 2, 0x80, std::nullopt, VersionSpecificParameters()); PublishedObject object; ASSERT_EQ(fetch->GetNextObject(object), MoqtFetchTask::kSuccess); EXPECT_EQ(object.payload.AsStringView(), "b"); @@ -113,7 +113,7 @@ MoqtObjectStatus::kEndOfGroup, "", _)); EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(3, 0), _, MoqtObjectStatus::kNormal, "d", _)); - session_.JoiningFetch(TrackName(), &visitor, 2, MoqtSubscribeParameters()); + session_.JoiningFetch(TrackName(), &visitor, 2, VersionSpecificParameters()); EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(3, 1), _, MoqtObjectStatus::kEndOfGroup, "", _)); EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(4, 0), _, @@ -125,7 +125,7 @@ testing::StrictMock<MockSubscribeRemoteTrackVisitor> visitor; EXPECT_CALL(visitor, OnReply(TrackName(), Eq(std::nullopt), Eq(std::nullopt))); - session_.JoiningFetch(TrackName(), &visitor, 0, MoqtSubscribeParameters()); + session_.JoiningFetch(TrackName(), &visitor, 0, VersionSpecificParameters()); EXPECT_CALL(visitor, OnObjectFragment(TrackName(), Location(0, 0), _, _, "test", _)); track_->AddObject(quic::test::MemSliceFromString("test"), /*key=*/true);
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h index a1f4b52..611292a 100644 --- a/quiche/quic/moqt/test_tools/moqt_session_peer.h +++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -192,7 +192,7 @@ Location{0, 0}, 4, std::nullopt, - MoqtSubscribeParameters(), + VersionSpecificParameters(), }; std::unique_ptr<MoqtFetchTask> task; auto [it, success] = session->upstream_by_id_.try_emplace(
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index 180ee7a..63b60ec 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -19,6 +19,7 @@ #include "quiche/quic/core/quic_data_reader.h" #include "quiche/quic/core/quic_data_writer.h" #include "quiche/quic/core/quic_time.h" +#include "quiche/quic/core/quic_types.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/platform/api/quic_logging.h" @@ -373,11 +374,12 @@ class QUICHE_NO_EXPORT ClientSetupMessage : public TestMessageBase { public: explicit ClientSetupMessage(bool webtrans) : TestMessageBase() { + client_setup_.parameters.using_webtrans = webtrans; if (webtrans) { // Should not send PATH. - client_setup_.path = std::nullopt; - raw_packet_[2] = 0x07; // adjust payload length (-5) - raw_packet_[6] = 0x01; // only two parameters + client_setup_.parameters.path = ""; + raw_packet_[1] = 0x06; // adjust payload length (-5) + raw_packet_[5] = 0x01; // only one parameter SetWireImage(raw_packet_, sizeof(raw_packet_) - 5); } else { SetWireImage(raw_packet_, sizeof(raw_packet_)); @@ -398,24 +400,18 @@ return false; } } - if (cast.path != client_setup_.path) { - QUIC_LOG(INFO) << "CLIENT_SETUP path mismatch"; - return false; - } - if (cast.max_subscribe_id != client_setup_.max_subscribe_id) { - QUIC_LOG(INFO) << "CLIENT_SETUP max_subscribe_id mismatch"; + if (cast.parameters != client_setup_.parameters) { + QUIC_LOG(INFO) << "CLIENT_SETUP parameter mismatch"; return false; } return true; } void ExpandVarints() override { - if (client_setup_.path.has_value()) { - ExpandVarintsImpl("--vvvvvvv-vv---"); - // first two bytes are already a 2B varint. Also, don't expand parameter - // varints because that messes up the parameter length field. + if (!client_setup_.parameters.path.empty()) { + ExpandVarintsImpl("vvvvvvvvvv---"); } else { - ExpandVarintsImpl("--vvvvvvv-"); + ExpandVarintsImpl("vvvvvvvv"); } } @@ -424,58 +420,51 @@ } private: - uint8_t raw_packet_[15] = { - 0x40, 0x40, 0x0c, // type + uint8_t raw_packet_[13] = { + 0x20, 0x0b, // type 0x02, 0x01, 0x02, // versions - 0x02, // 3 parameters - 0x02, 0x01, 0x32, // max_subscribe_id = 50 + 0x02, // 2 parameters + 0x02, 0x32, // max_request_id = 50 0x01, 0x03, 0x66, 0x6f, 0x6f, // path = "foo" }; MoqtClientSetup client_setup_ = { /*supported_versions=*/std::vector<MoqtVersion>( {static_cast<MoqtVersion>(1), static_cast<MoqtVersion>(2)}), - /*path=*/"foo", - /*max_subscribe_id=*/50, + MoqtSessionParameters(quic::Perspective::IS_CLIENT, "foo", 50), }; }; class QUICHE_NO_EXPORT ServerSetupMessage : public TestMessageBase { public: - explicit ServerSetupMessage() : TestMessageBase() { + explicit ServerSetupMessage(bool webtrans) : TestMessageBase() { + server_setup_.parameters.using_webtrans = webtrans; SetWireImage(raw_packet_, sizeof(raw_packet_)); } bool EqualFieldValues(MessageStructuredData& values) const override { auto cast = std::get<MoqtServerSetup>(values); - if (cast.selected_version != server_setup_.selected_version) { - QUIC_LOG(INFO) << "SERVER_SETUP selected version mismatch"; - return false; - } - if (cast.max_subscribe_id != server_setup_.max_subscribe_id) { - QUIC_LOG(INFO) << "SERVER_SETUP max_subscribe_id mismatch"; + if (cast.parameters != server_setup_.parameters) { + QUIC_LOG(INFO) << "SERVER_SETUP parameter mismatch"; return false; } return true; } - void ExpandVarints() override { - ExpandVarintsImpl("--vvvvv-"); // first two bytes are already a 2b - // varint - } + void ExpandVarints() override { ExpandVarintsImpl("vvvvvv"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(server_setup_); } private: - uint8_t raw_packet_[8] = { - 0x40, 0x41, 0x05, // type - 0x01, 0x01, // version, two parameters - 0x02, 0x01, 0x32, // max_subscribe_id = 50 + uint8_t raw_packet_[6] = { + 0x21, 0x04, // type + 0x01, 0x01, // version, one parameter + 0x02, 0x32, // max_request_id = 50 }; MoqtServerSetup server_setup_ = { /*selected_version=*/static_cast<MoqtVersion>(1), - /*max_subscribe_id=*/50, + MoqtSessionParameters(quic::Perspective::IS_SERVER, 50), }; }; @@ -523,7 +512,7 @@ } void ExpandVarints() override { - ExpandVarintsImpl("vvvvvv---v------vvvvvv---vv--vv--"); + ExpandVarintsImpl("vvvvvv---v------vvvvvv---v--"); } MessageStructuredData structured_data() const override { @@ -531,8 +520,8 @@ } private: - uint8_t raw_packet_[33] = { - 0x03, 0x1f, 0x01, 0x02, // id and alias + uint8_t raw_packet_[28] = { + 0x03, 0x1a, 0x01, 0x02, // id and alias 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" 0x20, // subscriber priority = 0x20 @@ -541,10 +530,9 @@ 0x04, // start_group = 4 (relative previous) 0x01, // start_object = 1 (absolute) // No EndGroup or EndObject - 0x03, // 3 parameters - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" - 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms - 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000 ms + 0x02, // 2 parameters + 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms + 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" }; MoqtSubscribe subscribe_ = { @@ -555,10 +543,8 @@ /*group_order=*/MoqtDeliveryOrder::kDescending, /*start=*/Location(4, 1), /*end_group=*/std::nullopt, - /*parameters=*/ - MoqtSubscribeParameters{ - "bar", quic::QuicTimeDelta::FromMilliseconds(10000), - quic::QuicTimeDelta::FromMilliseconds(10000), std::nullopt}, + VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000), + "bar"), }; }; @@ -593,7 +579,7 @@ return true; } - void ExpandVarints() override { ExpandVarintsImpl("vvvv--vvvvv--vv--"); } + void ExpandVarints() override { ExpandVarintsImpl("vvvv--vvvv--v--"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(subscribe_ok_); @@ -610,13 +596,13 @@ } private: - uint8_t raw_packet_[17] = { - 0x04, 0x0f, 0x01, 0x03, // subscribe_id = 1, expires = 3 + uint8_t raw_packet_[15] = { + 0x04, 0x0d, 0x01, 0x03, // subscribe_id = 1, expires = 3 0x02, 0x01, // group_order = 2, content exists 0x0c, 0x14, // largest_group_id = 12, largest_object_id = 20, 0x02, // 2 parameters - 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000 - 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000 + 0x02, 0x67, 0x10, // delivery_timeout = 10000 + 0x04, 0x67, 0x10, // max_cache_duration = 10000 }; MoqtSubscribeOk subscribe_ok_ = { @@ -624,10 +610,8 @@ /*expires=*/quic::QuicTimeDelta::FromMilliseconds(3), /*group_order=*/MoqtDeliveryOrder::kDescending, /*largest_id=*/Location(12, 20), - /*parameters=*/ - MoqtSubscribeParameters{ - std::nullopt, quic::QuicTimeDelta::FromMilliseconds(10000), - quic::QuicTimeDelta::FromMilliseconds(10000), std::nullopt}, + VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000), + quic::QuicTimeDelta::FromMilliseconds(10000)), }; }; @@ -792,19 +776,18 @@ return true; } - void ExpandVarints() override { ExpandVarintsImpl("vvvvvv-vvv---"); } + void ExpandVarints() override { ExpandVarintsImpl("vvvvvv-vv--"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(subscribe_update_); } private: - uint8_t raw_packet_[16] = { - 0x02, 0x0e, 0x02, 0x03, 0x01, 0x05, // start and end sequences + uint8_t raw_packet_[11] = { + 0x02, 0x09, 0x02, 0x03, 0x01, 0x05, // start and end sequences 0xaa, // subscriber_priority - 0x02, // 2 parameters - 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000 - 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000 + 0x01, // 1 parameter + 0x02, 0x67, 0x10, // delivery_timeout = 10000 }; MoqtSubscribeUpdate subscribe_update_ = { @@ -812,10 +795,8 @@ /*start=*/Location(3, 1), /*end_group=*/4, /*subscriber_priority=*/0xaa, - /*parameters=*/ - MoqtSubscribeParameters{ - std::nullopt, quic::QuicTimeDelta::FromMilliseconds(10000), - quic::QuicTimeDelta::FromMilliseconds(10000), std::nullopt}, + VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000), + quic::QuicTimeDelta::Infinite()), }; }; @@ -832,32 +813,28 @@ return false; } if (cast.parameters != announce_.parameters) { - QUIC_LOG(INFO) << "ANNOUNCE MESSAGE authorization info mismatch"; + QUIC_LOG(INFO) << "ANNOUNCE MESSAGE parameter mismatch"; return false; } return true; } - void ExpandVarints() override { ExpandVarintsImpl("vvvv---vvv---vv--"); } + void ExpandVarints() override { ExpandVarintsImpl("vvvv---vvv---"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(announce_); } private: - uint8_t raw_packet_[17] = { - 0x06, 0x0f, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" - 0x02, // 2 parameters - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" - 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000ms + uint8_t raw_packet_[13] = { + 0x06, 0x0b, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x01, // 1 parameter + 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" }; MoqtAnnounce announce_ = { /*track_namespace=*/FullTrackName{"foo"}, - /*parameters=*/ - MoqtSubscribeParameters{"bar", std::nullopt, - quic::QuicTimeDelta::FromMilliseconds(10000), - std::nullopt}, + VersionSpecificParameters("bar"), }; }; @@ -990,23 +967,30 @@ QUIC_LOG(INFO) << "TRACK STATUS REQUEST track name mismatch"; return false; } + if (cast.parameters != track_status_request_.parameters) { + QUIC_LOG(INFO) << "TRACK STATUS REQUEST parameter mismatch"; + return false; + } return true; } - void ExpandVarints() override { ExpandVarintsImpl("vvvv---v----"); } + void ExpandVarints() override { ExpandVarintsImpl("vvvv---v----vvv---"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(track_status_request_); } private: - uint8_t raw_packet_[12] = { - 0x0d, 0x0a, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + uint8_t raw_packet_[18] = { + 0x0d, 0x10, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" + 0x01, // 1 parameter + 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" }; MoqtTrackStatusRequest track_status_request_ = { /*full_track_name=*/FullTrackName({"foo", "abcd"}), + VersionSpecificParameters("bar"), }; }; @@ -1065,20 +1049,27 @@ QUIC_LOG(INFO) << "TRACK STATUS last object mismatch"; return false; } + if (cast.parameters != track_status_.parameters) { + QUIC_LOG(INFO) << "TRACK STATUS parameters mismatch"; + return false; + } return true; } - void ExpandVarints() override { ExpandVarintsImpl("vvvv---v----vvv"); } + void ExpandVarints() override { ExpandVarintsImpl("vvvv---v----vvvvv--v--"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(track_status_); } private: - uint8_t raw_packet_[15] = { - 0x0e, 0x0d, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + uint8_t raw_packet_[22] = { + 0x0e, 0x14, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" 0x00, 0x0c, 0x14, // status, last_group, last_object + 0x02, // 2 parameters + 0x02, 0x67, 0x10, // Delivery Timeout = 10000 + 0x04, 0x67, 0x10, // Max Cache Duration = 10000 }; MoqtTrackStatus track_status_ = { @@ -1086,6 +1077,8 @@ /*status_code=*/MoqtTrackStatusCode::kInProgress, /*last_group=*/12, /*last_object=*/20, + VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000), + quic::QuicTimeDelta::FromMilliseconds(10000)), }; }; @@ -1149,13 +1142,12 @@ uint8_t raw_packet_[13] = { 0x11, 0x0b, 0x01, 0x03, 0x66, 0x6f, 0x6f, // namespace = "foo" 0x01, // 1 parameter - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x03, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" }; MoqtSubscribeAnnounces subscribe_namespace_ = { /*track_namespace=*/FullTrackName{"foo"}, - /*parameters=*/ - MoqtSubscribeParameters{"bar", std::nullopt, std::nullopt, std::nullopt}, + VersionSpecificParameters("bar"), }; }; @@ -1391,7 +1383,7 @@ 0x03, 0x62, 0x61, 0x72, // track_name = "bar" 0x01, 0x02, // start_object = 1, 2 0x05, 0x07, // end_object = 5, 6 - 0x01, 0x02, 0x03, 0x62, 0x61, 0x7a, // parameters = "baz" + 0x01, 0x03, 0x03, 0x62, 0x61, 0x7a, // parameters = "baz" }; MoqtFetch fetch_ = { @@ -1403,8 +1395,7 @@ /*start_object=*/Location{1, 2}, /*end_group=*/5, /*end_object=*/6, - /*parameters=*/ - MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt}, + VersionSpecificParameters("baz"), }; }; @@ -1490,7 +1481,7 @@ 0x01, // group_order = kAscending 0x02, // type = kJoining 0x02, 0x02, // joining_subscribe_id = 2, 2 groups - 0x01, 0x02, 0x03, 0x62, 0x61, 0x7a, // parameters = "baz" + 0x01, 0x03, 0x03, 0x62, 0x61, 0x7a, // parameters = "baz" }; MoqtFetch fetch_ = { @@ -1503,8 +1494,7 @@ /*start_object=*/Location{1, 2}, /*end_group=*/5, /*end_object=*/6, - /*parameters=*/ - MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt}, + VersionSpecificParameters("baz"), }; }; @@ -1572,20 +1562,20 @@ } private: - uint8_t raw_packet_[12] = { - 0x18, 0x0a, - 0x01, // subscribe_id = 1 - 0x01, // group_order = kAscending - 0x05, 0x04, // largest_object = 5, 4 - 0x01, 0x02, 0x03, 0x62, 0x61, 0x7a, // parameters = "baz" + uint8_t raw_packet_[10] = { + 0x18, 0x08, + 0x01, // subscribe_id = 1 + 0x01, // group_order = kAscending + 0x05, 0x04, // largest_object = 5, 4 + 0x01, 0x04, 0x67, 0x10, // MaxCacheDuration = 10000 }; MoqtFetchOk fetch_ok_ = { /*subscribe_id =*/1, /*group_order=*/MoqtDeliveryOrder::kAscending, /*start_object=*/Location{5, 4}, - /*parameters=*/ - MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt}, + VersionSpecificParameters(quic::QuicTimeDelta::Infinite(), + quic::QuicTimeDelta::FromMilliseconds(10000)), }; }; @@ -1768,7 +1758,7 @@ case MoqtMessageType::kClientSetup: return std::make_unique<ClientSetupMessage>(is_webtrans); case MoqtMessageType::kServerSetup: - return std::make_unique<ServerSetupMessage>(); + return std::make_unique<ServerSetupMessage>(is_webtrans); default: return nullptr; }
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc index 87d17ca..e10ae33 100644 --- a/quiche/quic/moqt/tools/chat_client.cc +++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -69,10 +69,10 @@ "do not subscribe\n"; return std::nullopt; } - if (session_->SubscribeCurrentObject( - *track_name, &remote_track_visitor_, - MoqtSubscribeParameters{std::string(GetUsername(my_track_name_)), - std::nullopt, std::nullopt, std::nullopt})) { + VersionSpecificParameters parameters( + std::string(GetUsername(my_track_name_))); + if (session_->SubscribeCurrentObject(*track_name, &remote_track_visitor_, + parameters)) { ++subscribes_to_make_; other_users_.emplace(*track_name); } @@ -230,7 +230,7 @@ std::cout << "Announcing " << GetUserNamespace(my_track_name_).ToString() << "\n"; session_->Announce(GetUserNamespace(my_track_name_), - std::move(announce_callback)); + std::move(announce_callback), VersionSpecificParameters()); // Send SUBSCRIBE_ANNOUNCE. Pop 3 levels of namespace to get to {moq-chat, // chat-id} @@ -248,10 +248,11 @@ << " accepted\n"; return; }; - session_->SubscribeAnnounces( - GetChatNamespace(my_track_name_), std::move(subscribe_announces_callback), - MoqtSubscribeParameters{std::string(GetUsername(my_track_name_)), - std::nullopt, std::nullopt, std::nullopt}); + VersionSpecificParameters parameters( + std::string(GetUsername(my_track_name_))); + session_->SubscribeAnnounces(GetChatNamespace(my_track_name_), + std::move(subscribe_announces_callback), + parameters); while (session_is_open_ && is_syncing()) { RunEventLoop();
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc index 3fae683..46a4e89 100644 --- a/quiche/quic/moqt/tools/chat_server.cc +++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -49,8 +49,9 @@ return std::nullopt; } std::cout << "Received ANNOUNCE for " << track_namespace.ToString() << "\n"; - session_->SubscribeCurrentObject( - *track_name_, server_->remote_track_visitor(), MoqtSubscribeParameters()); + session_->SubscribeCurrentObject(*track_name_, + server_->remote_track_visitor(), + moqt::VersionSpecificParameters()); server_->AddUser(*track_name_); return std::nullopt; } @@ -113,7 +114,8 @@ GetUserNamespace(track_name), absl::bind_front(&ChatServer::ChatServerSessionHandler:: OnOutgoingAnnounceReply, - this)); + this), + moqt::VersionSpecificParameters()); } return std::optional<MoqtSubscribeErrorReason>(); };
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h index 6e0dd7e..0cc404e 100644 --- a/quiche/quic/moqt/tools/chat_server.h +++ b/quiche/quic/moqt/tools/chat_server.h
@@ -71,7 +71,8 @@ track_namespace, absl::bind_front(&ChatServer::ChatServerSessionHandler:: OnOutgoingAnnounceReply, - this)); + this), + VersionSpecificParameters()); return; } }
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc index 85de291..8a2a6c3 100644 --- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc +++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -155,7 +155,7 @@ FullTrackName full_track_name = track_namespace; full_track_name.AddElement(track); session_->JoiningFetch(full_track_name, &it->second, 0, - MoqtSubscribeParameters()); + VersionSpecificParameters()); } return std::nullopt;
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc index 59f8ee9..a2f3b67 100644 --- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc +++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -442,7 +442,7 @@ // server does not yet have an active subscription, so the client has // some catching up to do. generator_.Start(); - MoqtSubscribeParameters subscription_parameters; + VersionSpecificParameters subscription_parameters; if (!parameters_.delivery_timeout.IsInfinite()) { subscription_parameters.delivery_timeout = parameters_.delivery_timeout; }