Update existing messages for draft-06. This is mostly turning track_namespace into a tuple. Also generalizes Subscribe parameters. The substantive stuff is in moqt_messages, moqt_framer, and moqt_parser. The rest is just fixing compiler problems, since FullTrackName is visible to applications. New draft-06 messages will follow in another CL, although the codepoints are here. PiperOrigin-RevId: 680678558
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index c4fe1e1..6974ca6 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -112,25 +112,133 @@ const IntParameter& parameter_; }; +class WireSubscribeParameterList { + public: + explicit WireSubscribeParameterList(const MoqtSubscribeParameters& list) + : list_(list) {} + + size_t GetLengthOnWire() { + uint64_t num_params = 0; + size_t length = 0; + if (list_.authorization_info.has_value()) { + ++num_params; + length += + WireStringParameter( + StringParameter(MoqtTrackRequestParameter::kAuthorizationInfo, + *list_.authorization_info)) + .GetLengthOnWire(); + } + if (list_.delivery_timeout.has_value()) { + ++num_params; + length += WireIntParameter( + IntParameter(MoqtTrackRequestParameter::kDeliveryTimeout, + static_cast<uint64_t>( + list_.delivery_timeout->ToMilliseconds()))) + .GetLengthOnWire(); + } + if (list_.max_cache_duration.has_value()) { + ++num_params; + length += + WireIntParameter( + IntParameter(MoqtTrackRequestParameter::kMaxCacheDuration, + static_cast<uint64_t>( + list_.max_cache_duration->ToMilliseconds()))) + .GetLengthOnWire(); + } + if (list_.object_ack_window.has_value()) { + ++num_params; + length += + WireIntParameter( + IntParameter(MoqtTrackRequestParameter::kOackWindowSize, + static_cast<uint64_t>( + list_.object_ack_window->ToMicroseconds()))) + .GetLengthOnWire(); + } + length += WireVarInt62(num_params).GetLengthOnWire(); + return length; + } + absl::Status SerializeIntoWriter(quiche::QuicheDataWriter& writer) { + uint64_t num_params = (list_.authorization_info.has_value() ? 1 : 0) + + (list_.delivery_timeout.has_value() ? 1 : 0) + + (list_.max_cache_duration.has_value() ? 1 : 0) + + (list_.object_ack_window.has_value() ? 1 : 0); + if (!writer.WriteVarInt62(num_params)) { + return absl::InternalError("Failed to serialize the length prefix"); + } + if (list_.authorization_info.has_value() && + WireStringParameter( + StringParameter(MoqtTrackRequestParameter::kAuthorizationInfo, + *list_.authorization_info)) + .SerializeIntoWriter(writer) != absl::OkStatus()) { + return absl::InternalError("Failed to serialize the authorization info"); + } + if (list_.delivery_timeout.has_value() && + WireIntParameter( + IntParameter(MoqtTrackRequestParameter::kDeliveryTimeout, + static_cast<uint64_t>( + list_.delivery_timeout->ToMilliseconds()))) + .SerializeIntoWriter(writer) != absl::OkStatus()) { + return absl::InternalError("Failed to serialize the delivery timeout"); + } + if (list_.max_cache_duration.has_value() && + WireIntParameter( + IntParameter(MoqtTrackRequestParameter::kMaxCacheDuration, + static_cast<uint64_t>( + list_.max_cache_duration->ToMilliseconds()))) + .SerializeIntoWriter(writer) != absl::OkStatus()) { + return absl::InternalError("Failed to serialize the max cache duration"); + } + if (list_.object_ack_window.has_value() && + WireIntParameter( + IntParameter(MoqtTrackRequestParameter::kOackWindowSize, + static_cast<uint64_t>( + list_.object_ack_window->ToMicroseconds()))) + .SerializeIntoWriter(writer) != absl::OkStatus()) { + return absl::InternalError("Failed to serialize the oack window size"); + } + return absl::OkStatus(); + } + + private: + const MoqtSubscribeParameters& list_; +}; + class WireFullTrackName { public: using DataType = FullTrackName; - explicit WireFullTrackName(const FullTrackName& name) : name_(name) {} + // If |includes_name| is true, the last element in the tuple is the track + // name and is therefore not counted in the prefix of the namespace tuple. + WireFullTrackName(const FullTrackName& name, bool includes_name) + : name_(name), includes_name_(includes_name) {} size_t GetLengthOnWire() { - return quiche::ComputeLengthOnWire( - WireStringWithVarInt62Length(name_.track_namespace()), - WireStringWithVarInt62Length(name_.track_name())); + const auto tuple = name_.tuple(); + size_t num_elements = includes_name_ ? (tuple.size() - 1) : tuple.size(); + size_t length = WireVarInt62(num_elements).GetLengthOnWire(); + for (const auto& element : tuple) { + length += WireStringWithVarInt62Length(element).GetLengthOnWire(); + } + return length; } absl::Status SerializeIntoWriter(quiche::QuicheDataWriter& writer) { - return quiche::SerializeIntoWriter( - writer, WireStringWithVarInt62Length(name_.track_namespace()), - WireStringWithVarInt62Length(name_.track_name())); + const auto tuple = name_.tuple(); + size_t num_elements = includes_name_ ? (tuple.size() - 1) : tuple.size(); + if (!writer.WriteVarInt62(num_elements)) { + return absl::InternalError("Failed to serialize the length prefix"); + } + for (const auto& element : tuple) { + if (WireStringWithVarInt62Length(element).SerializeIntoWriter(writer) != + absl::OkStatus()) { + return absl::InternalError("Failed to serialize the element"); + } + } + return absl::OkStatus(); } private: const FullTrackName& name_; + const bool includes_name_; }; // Serializes data into buffer using the default allocator. Invokes QUICHE_BUG @@ -333,58 +441,38 @@ QUICHE_BUG(MoqtFramer_invalid_subscribe) << "Invalid object range"; return quiche::QuicheBuffer(); } - absl::InlinedVector<StringParameter, 1> string_params; - if (message.parameters.authorization_info.has_value()) { - string_params.push_back( - StringParameter(MoqtTrackRequestParameter::kAuthorizationInfo, - *message.parameters.authorization_info)); - } - absl::InlinedVector<IntParameter, 1> int_params; - if (message.parameters.object_ack_window.has_value()) { - QUICHE_DCHECK(message.parameters.object_ack_window->ToMicroseconds() >= 0); - int_params.push_back(IntParameter( - MoqtTrackRequestParameter::kOackWindowSize, - static_cast<uint64_t>( - message.parameters.object_ack_window->ToMicroseconds()))); - } switch (filter_type) { case MoqtFilterType::kLatestGroup: case MoqtFilterType::kLatestObject: return Serialize( WireVarInt62(MoqtMessageType::kSubscribe), WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias), - WireFullTrackName(message.full_track_name), + WireFullTrackName(message.full_track_name, true), WireUint8(message.subscriber_priority), WireDeliveryOrder(message.group_order), WireVarInt62(filter_type), - WireVarInt62(string_params.size() + int_params.size()), - WireSpan<WireStringParameter>(string_params), - WireSpan<WireIntParameter>(int_params)); + WireSubscribeParameterList(message.parameters)); case MoqtFilterType::kAbsoluteStart: return Serialize( WireVarInt62(MoqtMessageType::kSubscribe), WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias), - WireFullTrackName(message.full_track_name), + WireFullTrackName(message.full_track_name, true), WireUint8(message.subscriber_priority), WireDeliveryOrder(message.group_order), WireVarInt62(filter_type), WireVarInt62(*message.start_group), WireVarInt62(*message.start_object), - WireVarInt62(string_params.size() + int_params.size()), - WireSpan<WireStringParameter>(string_params), - WireSpan<WireIntParameter>(int_params)); + WireSubscribeParameterList(message.parameters)); case MoqtFilterType::kAbsoluteRange: return Serialize( WireVarInt62(MoqtMessageType::kSubscribe), WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias), - WireFullTrackName(message.full_track_name), + WireFullTrackName(message.full_track_name, true), WireUint8(message.subscriber_priority), WireDeliveryOrder(message.group_order), WireVarInt62(filter_type), WireVarInt62(*message.start_group), WireVarInt62(*message.start_object), WireVarInt62(*message.end_group), WireVarInt62(message.end_object.has_value() ? *message.end_object + 1 : 0), - WireVarInt62(string_params.size() + int_params.size()), - WireSpan<WireStringParameter>(string_params), - WireSpan<WireIntParameter>(int_params)); + WireSubscribeParameterList(message.parameters)); default: QUICHE_BUG(MoqtFramer_end_group_missing) << "Subscribe framing error."; return quiche::QuicheBuffer(); @@ -393,18 +481,24 @@ quiche::QuicheBuffer MoqtFramer::SerializeSubscribeOk( const MoqtSubscribeOk& message) { + if (message.parameters.authorization_info.has_value()) { + QUICHE_BUG(MoqtFramer_invalid_subscribe_ok) + << "SUBSCRIBE_OK with delivery timeout"; + } if (message.largest_id.has_value()) { return Serialize(WireVarInt62(MoqtMessageType::kSubscribeOk), WireVarInt62(message.subscribe_id), WireVarInt62(message.expires.ToMilliseconds()), WireDeliveryOrder(message.group_order), WireUint8(1), WireVarInt62(message.largest_id->group), - WireVarInt62(message.largest_id->object)); + WireVarInt62(message.largest_id->object), + WireSubscribeParameterList(message.parameters)); } return Serialize(WireVarInt62(MoqtMessageType::kSubscribeOk), WireVarInt62(message.subscribe_id), WireVarInt62(message.expires.ToMilliseconds()), - WireDeliveryOrder(message.group_order), WireUint8(0)); + WireDeliveryOrder(message.group_order), WireUint8(0), + WireSubscribeParameterList(message.parameters)); } quiche::QuicheBuffer MoqtFramer::SerializeSubscribeError( @@ -440,6 +534,10 @@ quiche::QuicheBuffer MoqtFramer::SerializeSubscribeUpdate( const MoqtSubscribeUpdate& message) { + if (message.parameters.authorization_info.has_value()) { + QUICHE_BUG(MoqtFramer_invalid_subscribe_update) + << "SUBSCRIBE_UPDATE with authorization info"; + } uint64_t end_group = message.end_group.has_value() ? *message.end_group + 1 : 0; uint64_t end_object = @@ -448,45 +546,35 @@ QUICHE_BUG(MoqtFramer_invalid_subscribe_update) << "Invalid object range"; return quiche::QuicheBuffer(); } - absl::InlinedVector<StringParameter, 1> string_params; - if (message.authorization_info.has_value()) { - string_params.push_back( - StringParameter(MoqtTrackRequestParameter::kAuthorizationInfo, - *message.authorization_info)); - } return Serialize( WireVarInt62(MoqtMessageType::kSubscribeUpdate), WireVarInt62(message.subscribe_id), WireVarInt62(message.start_group), WireVarInt62(message.start_object), WireVarInt62(end_group), WireVarInt62(end_object), WireUint8(message.subscriber_priority), - WireSpan<WireStringParameter>(string_params)); + WireSubscribeParameterList(message.parameters)); } quiche::QuicheBuffer MoqtFramer::SerializeAnnounce( const MoqtAnnounce& message) { - absl::InlinedVector<StringParameter, 1> string_params; - if (message.authorization_info.has_value()) { - string_params.push_back( - StringParameter(MoqtTrackRequestParameter::kAuthorizationInfo, - *message.authorization_info)); + if (message.parameters.delivery_timeout.has_value()) { + QUICHE_BUG(MoqtFramer_invalid_announce) << "ANNOUNCE with delivery timeout"; } return Serialize( WireVarInt62(static_cast<uint64_t>(MoqtMessageType::kAnnounce)), - WireStringWithVarInt62Length(message.track_namespace), - WireVarInt62(string_params.size()), - WireSpan<WireStringParameter>(string_params)); + WireFullTrackName(message.track_namespace, false), + WireSubscribeParameterList(message.parameters)); } quiche::QuicheBuffer MoqtFramer::SerializeAnnounceOk( const MoqtAnnounceOk& message) { return Serialize(WireVarInt62(MoqtMessageType::kAnnounceOk), - WireStringWithVarInt62Length(message.track_namespace)); + WireFullTrackName(message.track_namespace, false)); } quiche::QuicheBuffer MoqtFramer::SerializeAnnounceError( const MoqtAnnounceError& message) { return Serialize(WireVarInt62(MoqtMessageType::kAnnounceError), - WireStringWithVarInt62Length(message.track_namespace), + WireFullTrackName(message.track_namespace, false), WireVarInt62(message.error_code), WireStringWithVarInt62Length(message.reason_phrase)); } @@ -494,25 +582,27 @@ quiche::QuicheBuffer MoqtFramer::SerializeAnnounceCancel( const MoqtAnnounceCancel& message) { return Serialize(WireVarInt62(MoqtMessageType::kAnnounceCancel), - WireStringWithVarInt62Length(message.track_namespace)); + WireFullTrackName(message.track_namespace, false), + WireVarInt62(message.error_code), + WireStringWithVarInt62Length(message.reason_phrase)); } quiche::QuicheBuffer MoqtFramer::SerializeTrackStatusRequest( const MoqtTrackStatusRequest& message) { return Serialize(WireVarInt62(MoqtMessageType::kTrackStatusRequest), - WireFullTrackName(message.full_track_name)); + WireFullTrackName(message.full_track_name, true)); } quiche::QuicheBuffer MoqtFramer::SerializeUnannounce( const MoqtUnannounce& message) { return Serialize(WireVarInt62(MoqtMessageType::kUnannounce), - WireStringWithVarInt62Length(message.track_namespace)); + WireFullTrackName(message.track_namespace, false)); } quiche::QuicheBuffer MoqtFramer::SerializeTrackStatus( const MoqtTrackStatus& message) { return Serialize(WireVarInt62(MoqtMessageType::kTrackStatus), - WireFullTrackName(message.full_track_name), + WireFullTrackName(message.full_track_name, true), WireVarInt62(message.status_code), WireVarInt62(message.last_group), WireVarInt62(message.last_object));
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc index 55f6944..510dd17 100644 --- a/quiche/quic/moqt/moqt_framer_test.cc +++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -309,7 +309,8 @@ start_object, end_group, end_object, - MoqtSubscribeParameters{"bar"}, + MoqtSubscribeParameters{"bar", std::nullopt, std::nullopt, + std::nullopt}, }; quiche::QuicheBuffer buffer; MoqtFilterType expected_filter_type = MoqtFilterType::kNone; @@ -335,7 +336,7 @@ } buffer = framer_.SerializeSubscribe(subscribe); // Go to the filter type. - const uint8_t* read = BufferAtOffset(buffer, 14); + const uint8_t* read = BufferAtOffset(buffer, 15); EXPECT_EQ(static_cast<MoqtFilterType>(*read), expected_filter_type); EXPECT_GT(buffer.size(), 0); if (expected_filter_type == MoqtFilterType::kAbsoluteRange && @@ -360,7 +361,7 @@ /*start_object=*/std::optional<uint64_t>(3), /*end_group=*/std::optional<uint64_t>(3), /*end_object=*/std::nullopt, - MoqtSubscribeParameters{"bar"}, + MoqtSubscribeParameters{"bar", std::nullopt, std::nullopt, std::nullopt}, }; quiche::QuicheBuffer buffer; EXPECT_QUIC_BUG(buffer = framer_.SerializeSubscribe(subscribe), @@ -384,7 +385,7 @@ /*start_object=*/std::optional<uint64_t>(3), /*end_group=*/std::nullopt, /*end_object=*/std::nullopt, - MoqtSubscribeParameters{"bar"}, + MoqtSubscribeParameters{"bar", std::nullopt, std::nullopt, std::nullopt}, }; quiche::QuicheBuffer buffer; EXPECT_QUIC_BUG(buffer = framer_.SerializeSubscribe(subscribe), @@ -400,7 +401,8 @@ /*end_group=*/4, /*end_object=*/std::nullopt, /*subscriber_priority=*/0xaa, - /*authorization_info=*/"bar", + MoqtSubscribeParameters{std::nullopt, std::nullopt, std::nullopt, + std::nullopt}, }; quiche::QuicheBuffer buffer; buffer = framer_.SerializeSubscribeUpdate(subscribe_update); @@ -419,7 +421,8 @@ /*end_group=*/4, /*end_object=*/6, /*subscriber_priority=*/0xaa, - /*authorization_info=*/"bar", + MoqtSubscribeParameters{std::nullopt, std::nullopt, std::nullopt, + std::nullopt}, }; quiche::QuicheBuffer buffer; buffer = framer_.SerializeSubscribeUpdate(subscribe_update); @@ -438,7 +441,8 @@ /*end_group=*/std::nullopt, /*end_object=*/6, /*subscriber_priority=*/0xaa, - /*authorization_info=*/"bar", + MoqtSubscribeParameters{std::nullopt, std::nullopt, std::nullopt, + std::nullopt}, }; quiche::QuicheBuffer buffer; EXPECT_QUIC_BUG(buffer = framer_.SerializeSubscribeUpdate(subscribe_update),
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc index f7578cd..11f6be4 100644 --- a/quiche/quic/moqt/moqt_integration_test.cc +++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -122,19 +122,21 @@ TEST_F(MoqtIntegrationTest, AnnounceSuccess) { EstablishSession(); - EXPECT_CALL(server_callbacks_.incoming_announce_callback, Call("foo")) + EXPECT_CALL(server_callbacks_.incoming_announce_callback, + Call(FullTrackName{"foo"})) .WillOnce(Return(std::nullopt)); testing::MockFunction<void( - absl::string_view track_namespace, + FullTrackName track_namespace, std::optional<MoqtAnnounceErrorReason> error_message)> announce_callback; - client_->session()->Announce("foo", announce_callback.AsStdFunction()); + client_->session()->Announce(FullTrackName{"foo"}, + announce_callback.AsStdFunction()); bool matches = false; EXPECT_CALL(announce_callback, Call(_, _)) - .WillOnce([&](absl::string_view track_namespace, + .WillOnce([&](FullTrackName track_namespace, std::optional<MoqtAnnounceErrorReason> error) { matches = true; - EXPECT_EQ(track_namespace, "foo"); + EXPECT_EQ(track_namespace, FullTrackName{"foo"}); EXPECT_FALSE(error.has_value()); }); bool success = @@ -144,22 +146,25 @@ TEST_F(MoqtIntegrationTest, AnnounceSuccessSubscribeInResponse) { EstablishSession(); - EXPECT_CALL(server_callbacks_.incoming_announce_callback, Call("foo")) + EXPECT_CALL(server_callbacks_.incoming_announce_callback, + Call(FullTrackName{"foo"})) .WillOnce(Return(std::nullopt)); MockRemoteTrackVisitor server_visitor; testing::MockFunction<void( - absl::string_view track_namespace, + FullTrackName track_namespace, std::optional<MoqtAnnounceErrorReason> error_message)> announce_callback; - client_->session()->Announce("foo", announce_callback.AsStdFunction()); + client_->session()->Announce(FullTrackName{"foo"}, + announce_callback.AsStdFunction()); bool matches = false; EXPECT_CALL(announce_callback, Call(_, _)) - .WillOnce([&](absl::string_view track_namespace, + .WillOnce([&](FullTrackName track_namespace, std::optional<MoqtAnnounceErrorReason> error) { - EXPECT_EQ(track_namespace, "foo"); + EXPECT_EQ(track_namespace, FullTrackName{"foo"}); + FullTrackName track_name = track_namespace; + track_name.AddElement("/catalog"); EXPECT_FALSE(error.has_value()); - server_->session()->SubscribeCurrentGroup( - FullTrackName(track_namespace, "/catalog"), &server_visitor); + server_->session()->SubscribeCurrentGroup(track_name, &server_visitor); }); EXPECT_CALL(server_visitor, OnReply(_, _)).WillOnce([&]() { matches = true; @@ -176,10 +181,11 @@ // it receives. MockRemoteTrackVisitor server_visitor; EXPECT_CALL(server_callbacks_.incoming_announce_callback, Call(_)) - .WillOnce([&](absl::string_view track_namespace) { + .WillOnce([&](FullTrackName track_namespace) { + FullTrackName track_name = track_namespace; + track_name.AddElement("data"); server_->session()->SubscribeAbsolute( - FullTrackName(track_namespace, "data"), /*start_group=*/0, - /*start_object=*/0, &server_visitor); + track_name, /*start_group=*/0, /*start_object=*/0, &server_visitor); return std::optional<MoqtAnnounceErrorReason>(); }); @@ -194,7 +200,8 @@ received_subscribe_ok = true; }); client_->session()->Announce( - "test", [](absl::string_view, std::optional<MoqtAnnounceErrorReason>) {}); + FullTrackName{"test"}, + [](FullTrackName, std::optional<MoqtAnnounceErrorReason>) {}); bool received_object = false; EXPECT_CALL(server_visitor, OnObjectFragment(_, _, _, _, _, _, _, _)) @@ -335,16 +342,17 @@ TEST_F(MoqtIntegrationTest, AnnounceFailure) { EstablishSession(); testing::MockFunction<void( - absl::string_view track_namespace, + FullTrackName track_namespace, std::optional<MoqtAnnounceErrorReason> error_message)> announce_callback; - client_->session()->Announce("foo", announce_callback.AsStdFunction()); + client_->session()->Announce(FullTrackName{"foo"}, + announce_callback.AsStdFunction()); bool matches = false; EXPECT_CALL(announce_callback, Call(_, _)) - .WillOnce([&](absl::string_view track_namespace, + .WillOnce([&](FullTrackName track_namespace, std::optional<MoqtAnnounceErrorReason> error) { matches = true; - EXPECT_EQ(track_namespace, "foo"); + EXPECT_EQ(track_namespace, FullTrackName{"foo"}); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->error_code, MoqtAnnounceErrorCode::kAnnounceNotSupported);
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc index 26acbac..5c2e503 100644 --- a/quiche/quic/moqt/moqt_messages.cc +++ b/quiche/quic/moqt/moqt_messages.cc
@@ -95,6 +95,14 @@ return "UNANNOUNCE"; case MoqtMessageType::kGoAway: return "GOAWAY"; + case MoqtMessageType::kSubscribeNamespace: + return "SUBSCRIBE_NAMESPACE"; + case MoqtMessageType::kSubscribeNamespaceOk: + return "SUBSCRIBE_NAMESPACE_OK"; + case MoqtMessageType::kSubscribeNamespaceError: + return "SUBSCRIBE_NAMESPACE_ERROR"; + case MoqtMessageType::kUnsubscribeNamespace: + return "UNSUBSCRIBE_NAMESPACE"; case MoqtMessageType::kMaxSubscribeId: return "MAX_SUBSCRIBE_ID"; case MoqtMessageType::kObjectAck: @@ -190,16 +198,6 @@ return absl::c_lexicographical_compare(tuple_, other.tuple_); } FullTrackName::FullTrackName(absl::Span<const absl::string_view> elements) - : tuple_(elements.begin(), elements.end()) { - if (tuple_.size() < 2) { - QUICHE_BUG(FullTrackName_too_short) - << "Full track name should be at least two elements long"; - // Failsafe. - while (tuple_.size() < 2) { - tuple_.push_back(""); - } - return; - } -} + : tuple_(elements.begin(), elements.end()) {} } // namespace moqt
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index c522f01..4a08832 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -89,6 +89,10 @@ kTrackStatusRequest = 0x0d, kTrackStatus = 0x0e, kGoAway = 0x10, + kSubscribeNamespace = 0x11, + kSubscribeNamespaceOk = 0x12, + kSubscribeNamespaceError = 0x13, + kUnsubscribeNamespace = 0x14, kMaxSubscribeId = 0x15, kClientSetup = 0x40, kServerSetup = 0x41, @@ -137,6 +141,8 @@ enum class QUICHE_EXPORT MoqtTrackRequestParameter : uint64_t { kAuthorizationInfo = 0x2, + kDeliveryTimeout = 0x3, + kMaxCacheDuration = 0x4, // QUICHE-specific extensions. kOackWindowSize = 0xbbf1439, @@ -154,10 +160,9 @@ std::string reason_phrase; }; -// Full track name represents a tuple of the track namespace and the the track -// name. (TODO) After draft-06, multiple elements in track namespace will be -// supported; if https://github.com/moq-wg/moq-transport/issues/508 goes -// through, the distinction between different parts will disappear. +// Full track name represents a tuple of name elements. All higher order +// elements MUST be present, but lower-order ones (like the name) can be +// omitted. class FullTrackName { public: explicit FullTrackName(absl::Span<const absl::string_view> elements); @@ -167,15 +172,30 @@ std::data(elements), std::size(elements))) {} explicit FullTrackName(absl::string_view ns, absl::string_view name) : FullTrackName({ns, name}) {} - FullTrackName() : FullTrackName({"", ""}) {} + FullTrackName() : FullTrackName({}) {} std::string ToString() const; - absl::string_view track_namespace() const { - // TODO: turn into a tuple for draft-06. - return tuple_[0]; + void AddElement(absl::string_view element) { + tuple_.push_back(std::string(element)); } - absl::string_view track_name() const { return tuple_[tuple_.size() - 1]; } + // Remove the last element to convert a name to a namespace. + void NameToNamespace() { tuple_.pop_back(); } + // returns true is |this| is a subdomain of |other|. + bool InNamespace(const FullTrackName& other) const { + if (tuple_.size() < other.tuple_.size()) { + return false; + } + for (int i = 0; i < other.tuple_.size(); ++i) { + if (tuple_[i] != other.tuple_[i]) { + return false; + } + } + return true; + } + absl::Span<const std::string> tuple() const { + return absl::MakeConstSpan(tuple_); + } bool operator==(const FullTrackName& other) const; bool operator<(const FullTrackName& other) const; @@ -190,6 +210,8 @@ sink.Append(track_name.ToString()); } + bool empty() const { return tuple_.empty(); } + private: absl::InlinedVector<std::string, 2> tuple_; }; @@ -288,12 +310,21 @@ struct QUICHE_EXPORT MoqtSubscribeParameters { std::optional<std::string> authorization_info; + std::optional<quic::QuicTimeDelta> delivery_timeout; + std::optional<quic::QuicTimeDelta> max_cache_duration; // If present, indicates that OBJECT_ACK messages will be sent in response to // the objects on the stream. The actual value is informational, and it // communicates how many frames the subscriber is willing to buffer, in // microseconds. std::optional<quic::QuicTimeDelta> object_ack_window; + + bool operator==(const MoqtSubscribeParameters& other) const { + return authorization_info == other.authorization_info && + delivery_timeout == other.delivery_timeout && + max_cache_duration == other.max_cache_duration && + object_ack_window == other.object_ack_window; + } }; struct QUICHE_EXPORT MoqtSubscribe { @@ -331,12 +362,16 @@ MoqtDeliveryOrder group_order; // If ContextExists on the wire is zero, largest_id has no value. std::optional<FullSequence> largest_id; + MoqtSubscribeParameters parameters; }; enum class QUICHE_EXPORT SubscribeErrorCode : uint64_t { kInternalError = 0x0, kInvalidRange = 0x1, kRetryTrackAlias = 0x2, + kTrackDoesNotExist = 0x3, + kUnauthorized = 0x4, + kTimeout = 0x5, }; struct QUICHE_EXPORT MoqtSubscribeError { @@ -374,26 +409,26 @@ std::optional<uint64_t> end_group; std::optional<uint64_t> end_object; MoqtPriority subscriber_priority; - std::optional<std::string> authorization_info; + MoqtSubscribeParameters parameters; }; struct QUICHE_EXPORT MoqtAnnounce { - std::string track_namespace; - std::optional<std::string> authorization_info; + FullTrackName track_namespace; + MoqtSubscribeParameters parameters; }; struct QUICHE_EXPORT MoqtAnnounceOk { - std::string track_namespace; + FullTrackName track_namespace; }; struct QUICHE_EXPORT MoqtAnnounceError { - std::string track_namespace; + FullTrackName track_namespace; MoqtAnnounceErrorCode error_code; std::string reason_phrase; }; struct QUICHE_EXPORT MoqtUnannounce { - std::string track_namespace; + FullTrackName track_namespace; }; enum class QUICHE_EXPORT MoqtTrackStatusCode : uint64_t { @@ -425,7 +460,10 @@ }; struct QUICHE_EXPORT MoqtAnnounceCancel { - std::string track_namespace; + FullTrackName track_namespace; + // TODO: What namespace is this error code in? + uint64_t error_code; + std::string reason_phrase; }; struct QUICHE_EXPORT MoqtTrackStatusRequest {
diff --git a/quiche/quic/moqt/moqt_messages_test.cc b/quiche/quic/moqt/moqt_messages_test.cc index bec3c4b..0b554f2 100644 --- a/quiche/quic/moqt/moqt_messages_test.cc +++ b/quiche/quic/moqt/moqt_messages_test.cc
@@ -30,6 +30,16 @@ EXPECT_LT(name1, name3); } +TEST(MoqtMessagesTest, FullTrackNameInNamespace) { + FullTrackName name1({"a", "b"}); + FullTrackName name2({"a", "b", "c"}); + FullTrackName name3({"d", "b"}); + EXPECT_TRUE(name2.InNamespace(name1)); + EXPECT_FALSE(name1.InNamespace(name2)); + EXPECT_TRUE(name1.InNamespace(name1)); + EXPECT_FALSE(name2.InNamespace(name3)); +} + TEST(MoqtMessagesTest, FullTrackNameToString) { FullTrackName name1({"a", "b"}); EXPECT_EQ(name1.ToString(), R"({"a", "b"})");
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index a00416b..541c550 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -8,6 +8,7 @@ #include <cstddef> #include <cstdint> #include <cstring> +#include <initializer_list> #include <optional> #include <string> @@ -25,16 +26,6 @@ namespace { -bool ReadFullTrackName(quic::QuicDataReader& reader, FullTrackName& out) { - absl::string_view track_namespace, track_name; - if (!reader.ReadStringPieceVarInt62(&track_namespace) || - !reader.ReadStringPieceVarInt62(&track_name)) { - return false; - } - out = FullTrackName({track_namespace, track_name}); - return true; -} - bool ParseDeliveryOrder(uint8_t raw_value, std::optional<MoqtDeliveryOrder>& output) { switch (raw_value) { @@ -238,8 +229,9 @@ return ProcessMaxSubscribeId(reader); case moqt::MoqtMessageType::kObjectAck: return ProcessObjectAck(reader); + default: + ParseError("Unknown message type"); } - ParseError("Unknown message type"); return 0; } @@ -408,13 +400,16 @@ MoqtSubscribe subscribe_request; uint64_t filter, group, object; uint8_t group_order; + absl::string_view track_name; if (!reader.ReadVarInt62(&subscribe_request.subscribe_id) || !reader.ReadVarInt62(&subscribe_request.track_alias) || - !ReadFullTrackName(reader, subscribe_request.full_track_name) || + !ReadTrackNamespace(reader, subscribe_request.full_track_name) || + !reader.ReadStringPieceVarInt62(&track_name) || !reader.ReadUInt8(&subscribe_request.subscriber_priority) || !reader.ReadUInt8(&group_order) || !reader.ReadVarInt62(&filter)) { return 0; } + subscribe_request.full_track_name.AddElement(track_name); if (!ParseDeliveryOrder(group_order, subscribe_request.group_order)) { ParseError("Invalid group order value in SUBSCRIBE message"); return 0; @@ -459,46 +454,9 @@ ParseError("Invalid filter type"); return 0; } - uint64_t num_params; - if (!reader.ReadVarInt62(&num_params)) { + if (!ReadSubscribeParameters(reader, subscribe_request.parameters)) { return 0; } - for (uint64_t i = 0; i < num_params; ++i) { - uint64_t type; - absl::string_view value; - if (!ReadParameter(reader, type, value)) { - return 0; - } - auto key = static_cast<MoqtTrackRequestParameter>(type); - switch (key) { - case MoqtTrackRequestParameter::kAuthorizationInfo: - if (subscribe_request.parameters.authorization_info.has_value()) { - ParseError( - "AUTHORIZATION_INFO parameter appears twice in " - "SUBSCRIBE"); - return 0; - } - subscribe_request.parameters.authorization_info = value; - break; - case MoqtTrackRequestParameter::kOackWindowSize: { - if (subscribe_request.parameters.object_ack_window.has_value()) { - ParseError("OACK_WINDOW_SIZE parameter appears twice in SUBSCRIBE"); - return 0; - } - uint64_t raw_value; - if (!StringViewToVarInt(value, raw_value)) { - ParseError("OACK_WINDOW_SIZE parameter is not a valid varint"); - return 0; - } - subscribe_request.parameters.object_ack_window = - quic::QuicTimeDelta::FromMicroseconds(raw_value); - break; - } - default: - // Skip over the parameter. - break; - } - } visitor_.OnSubscribeMessage(subscribe_request); return reader.PreviouslyReadPayload().length(); } @@ -530,6 +488,13 @@ return 0; } } + if (!ReadSubscribeParameters(reader, subscribe_ok.parameters)) { + return 0; + } + if (subscribe_ok.parameters.authorization_info.has_value()) { + ParseError("SUBSCRIBE_OK has authorization info"); + return 0; + } visitor_.OnSubscribeOkMessage(subscribe_ok); return reader.PreviouslyReadPayload().length(); } @@ -585,13 +550,15 @@ size_t MoqtControlParser::ProcessSubscribeUpdate(quic::QuicDataReader& reader) { MoqtSubscribeUpdate subscribe_update; - uint64_t end_group, end_object, num_params; + uint64_t end_group, end_object; if (!reader.ReadVarInt62(&subscribe_update.subscribe_id) || !reader.ReadVarInt62(&subscribe_update.start_group) || !reader.ReadVarInt62(&subscribe_update.start_object) || !reader.ReadVarInt62(&end_group) || !reader.ReadVarInt62(&end_object) || - !reader.ReadUInt8(&subscribe_update.subscriber_priority) || - !reader.ReadVarInt62(&num_params)) { + !reader.ReadUInt8(&subscribe_update.subscriber_priority)) { + return 0; + } + if (!ReadSubscribeParameters(reader, subscribe_update.parameters)) { return 0; } if (end_group == 0) { @@ -618,27 +585,9 @@ } else { subscribe_update.end_object = std::nullopt; } - for (uint64_t i = 0; i < num_params; ++i) { - uint64_t type; - absl::string_view value; - if (!ReadParameter(reader, type, value)) { - return 0; - } - auto key = static_cast<MoqtTrackRequestParameter>(type); - switch (key) { - case MoqtTrackRequestParameter::kAuthorizationInfo: - if (subscribe_update.authorization_info.has_value()) { - ParseError( - "AUTHORIZATION_INFO parameter appears twice in " - "SUBSCRIBE_UPDATE"); - return 0; - } - subscribe_update.authorization_info = value; - break; - default: - // Skip over the parameter. - break; - } + if (subscribe_update.parameters.authorization_info.has_value()) { + ParseError("SUBSCRIBE_UPDATE has authorization info"); + return 0; } visitor_.OnSubscribeUpdateMessage(subscribe_update); return reader.PreviouslyReadPayload().length(); @@ -646,32 +595,15 @@ size_t MoqtControlParser::ProcessAnnounce(quic::QuicDataReader& reader) { MoqtAnnounce announce; - if (!reader.ReadStringVarInt62(announce.track_namespace)) { + if (!ReadTrackNamespace(reader, announce.track_namespace)) { return 0; } - uint64_t num_params; - if (!reader.ReadVarInt62(&num_params)) { + if (!ReadSubscribeParameters(reader, announce.parameters)) { return 0; } - for (uint64_t i = 0; i < num_params; ++i) { - uint64_t type; - absl::string_view value; - if (!ReadParameter(reader, type, value)) { - return 0; - } - auto key = static_cast<MoqtTrackRequestParameter>(type); - switch (key) { - case MoqtTrackRequestParameter::kAuthorizationInfo: - if (announce.authorization_info.has_value()) { - ParseError("AUTHORIZATION_INFO parameter appears twice in ANNOUNCE"); - return 0; - } - announce.authorization_info = value; - break; - default: - // Skip over the parameter. - break; - } + if (announce.parameters.delivery_timeout.has_value()) { + ParseError("ANNOUNCE has delivery timeout"); + return 0; } visitor_.OnAnnounceMessage(announce); return reader.PreviouslyReadPayload().length(); @@ -679,7 +611,7 @@ size_t MoqtControlParser::ProcessAnnounceOk(quic::QuicDataReader& reader) { MoqtAnnounceOk announce_ok; - if (!reader.ReadStringVarInt62(announce_ok.track_namespace)) { + if (!ReadTrackNamespace(reader, announce_ok.track_namespace)) { return 0; } visitor_.OnAnnounceOkMessage(announce_ok); @@ -688,7 +620,7 @@ size_t MoqtControlParser::ProcessAnnounceError(quic::QuicDataReader& reader) { MoqtAnnounceError announce_error; - if (!reader.ReadStringVarInt62(announce_error.track_namespace)) { + if (!ReadTrackNamespace(reader, announce_error.track_namespace)) { return 0; } uint64_t error_code; @@ -705,7 +637,11 @@ size_t MoqtControlParser::ProcessAnnounceCancel(quic::QuicDataReader& reader) { MoqtAnnounceCancel announce_cancel; - if (!reader.ReadStringVarInt62(announce_cancel.track_namespace)) { + if (!ReadTrackNamespace(reader, announce_cancel.track_namespace)) { + return 0; + } + if (!reader.ReadVarInt62(&announce_cancel.error_code) || + !reader.ReadStringVarInt62(announce_cancel.reason_phrase)) { return 0; } visitor_.OnAnnounceCancelMessage(announce_cancel); @@ -715,16 +651,21 @@ size_t MoqtControlParser::ProcessTrackStatusRequest( quic::QuicDataReader& reader) { MoqtTrackStatusRequest track_status_request; - if (!ReadFullTrackName(reader, track_status_request.full_track_name)) { + if (!ReadTrackNamespace(reader, track_status_request.full_track_name)) { return 0; } + absl::string_view name; + if (!reader.ReadStringPieceVarInt62(&name)) { + return 0; + } + track_status_request.full_track_name.AddElement(name); visitor_.OnTrackStatusRequestMessage(track_status_request); return reader.PreviouslyReadPayload().length(); } size_t MoqtControlParser::ProcessUnannounce(quic::QuicDataReader& reader) { MoqtUnannounce unannounce; - if (!reader.ReadStringVarInt62(unannounce.track_namespace)) { + if (!ReadTrackNamespace(reader, unannounce.track_namespace)) { return 0; } visitor_.OnUnannounceMessage(unannounce); @@ -733,9 +674,16 @@ size_t MoqtControlParser::ProcessTrackStatus(quic::QuicDataReader& reader) { MoqtTrackStatus track_status; + if (!ReadTrackNamespace(reader, track_status.full_track_name)) { + return 0; + } + absl::string_view name; + if (!reader.ReadStringPieceVarInt62(&name)) { + return 0; + } + track_status.full_track_name.AddElement(name); uint64_t value; - if (!ReadFullTrackName(reader, track_status.full_track_name) || - !reader.ReadVarInt62(&value) || + if (!reader.ReadVarInt62(&value) || !reader.ReadVarInt62(&track_status.last_group) || !reader.ReadVarInt62(&track_status.last_object)) { return 0; @@ -818,6 +766,71 @@ return reader.ReadStringPieceVarInt62(&value); } +bool MoqtControlParser::ReadSubscribeParameters( + quic::QuicDataReader& reader, MoqtSubscribeParameters& params) { + uint64_t num_params; + if (!reader.ReadVarInt62(&num_params)) { + return false; + } + for (uint64_t i = 0; i < num_params; ++i) { + uint64_t type; + absl::string_view value; + if (!ReadParameter(reader, type, value)) { + return false; + } + uint64_t raw_value; + auto key = static_cast<MoqtTrackRequestParameter>(type); + switch (key) { + case MoqtTrackRequestParameter::kAuthorizationInfo: + if (params.authorization_info.has_value()) { + ParseError("AUTHORIZATION_INFO parameter appears twice"); + return false; + } + params.authorization_info = value; + break; + case moqt::MoqtTrackRequestParameter::kDeliveryTimeout: + if (params.delivery_timeout.has_value()) { + ParseError("DELIVERY_TIMEOUT parameter appears twice"); + return false; + } + if (!StringViewToVarInt(value, raw_value)) { + return false; + } + params.delivery_timeout = + quic::QuicTimeDelta::FromMilliseconds(raw_value); + break; + case moqt::MoqtTrackRequestParameter::kMaxCacheDuration: + if (params.max_cache_duration.has_value()) { + ParseError("MAX_CACHE_DURATION parameter appears twice"); + return false; + } + if (!StringViewToVarInt(value, raw_value)) { + return false; + } + params.max_cache_duration = + quic::QuicTimeDelta::FromMilliseconds(raw_value); + break; + case MoqtTrackRequestParameter::kOackWindowSize: { + if (params.object_ack_window.has_value()) { + ParseError("OACK_WINDOW_SIZE parameter appears twice in SUBSCRIBE"); + return false; + } + if (!StringViewToVarInt(value, raw_value)) { + ParseError("OACK_WINDOW_SIZE parameter is not a valid varint"); + return false; + } + params.object_ack_window = + quic::QuicTimeDelta::FromMicroseconds(raw_value); + break; + } + default: + // Skip over the parameter. + break; + } + } + return true; +} + bool MoqtControlParser::StringViewToVarInt(absl::string_view& sv, uint64_t& vi) { quic::QuicDataReader reader(sv); @@ -830,6 +843,23 @@ return true; } +bool MoqtControlParser::ReadTrackNamespace(quic::QuicDataReader& reader, + FullTrackName& full_track_name) { + QUICHE_DCHECK(full_track_name.empty()); + uint64_t num_elements; + if (!reader.ReadVarInt62(&num_elements)) { + return 0; + } + for (uint64_t i = 0; i < num_elements; ++i) { + absl::string_view element; + if (!reader.ReadStringPieceVarInt62(&element)) { + return false; + } + full_track_name.AddElement(element); + } + return true; +} + void MoqtDataParser::ParseError(absl::string_view reason) { if (parsing_error_) { return; // Don't send multiple parse errors.
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h index f19a0a6..918573a 100644 --- a/quiche/quic/moqt/moqt_parser.h +++ b/quiche/quic/moqt/moqt_parser.h
@@ -122,10 +122,23 @@ // |reader| does not have enough data. bool ReadParameter(quic::QuicDataReader& reader, uint64_t& type, absl::string_view& value); + // Reads MoqtSubscribeParameter from one of the message types that supports + // it. The cursor in |reader| should point to the "number of parameters" + // field in the message. The cursor will move to the end of the parameters. + // Returns false if it could not parse the full message, in which case the + // cursor in |reader| should not be used. + bool ReadSubscribeParameters(quic::QuicDataReader& reader, + MoqtSubscribeParameters& params); // Convert a string view to a varint. Throws an error and returns false if the // string_view is not exactly the right length. bool StringViewToVarInt(absl::string_view& sv, uint64_t& vi); + // Parses a message that a track namespace but not name. The last element of + // |full_track_name| will be set to the empty string. Returns false if it + // could not parse the full namespace field. + bool ReadTrackNamespace(quic::QuicDataReader& reader, + FullTrackName& full_track_name); + MoqtControlParserVisitor& visitor_; bool uses_web_transport_; bool no_more_data_ = false; // Fatal error or fin. No more parsing.
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index f69ef96..633f83f 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -677,51 +677,156 @@ TEST_F(MoqtMessageSpecificTest, SubscribeAuthorizationInfoTwice) { MoqtControlParser parser(kWebTrans, visitor_); char subscribe[] = { - 0x03, 0x01, 0x02, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" - 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x20, 0x02, // priority = 0x20 descending - 0x02, // filter_type = kLatestObject - 0x02, // two params - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x03, 0x01, 0x02, 0x01, 0x03, + 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" + 0x20, 0x02, // priority = 0x20 descending + 0x02, // filter_type = kLatestObject + 0x02, // two params + 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" }; parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false); EXPECT_EQ(visitor_.messages_received_, 0); EXPECT_EQ(visitor_.parsing_error_, - "AUTHORIZATION_INFO parameter appears twice in SUBSCRIBE"); + "AUTHORIZATION_INFO parameter appears twice"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } -TEST_F(MoqtMessageSpecificTest, SubscribeUpdateAuthorizationInfoTwice) { +TEST_F(MoqtMessageSpecificTest, SubscribeDeliveryTimeoutTwice) { + MoqtControlParser parser(kRawQuic, visitor_); + char subscribe[] = { + 0x03, 0x01, 0x02, 0x01, 0x03, + 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" + 0x20, 0x02, // priority = 0x20 descending + 0x02, // filter_type = kLatestObject + 0x02, // two params + 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000 + 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000 + }; + parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false); + EXPECT_EQ(visitor_.messages_received_, 0); + EXPECT_EQ(visitor_.parsing_error_, + "DELIVERY_TIMEOUT parameter appears twice"); + EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); +} + +TEST_F(MoqtMessageSpecificTest, SubscribeDeliveryTimeoutMalformed) { + MoqtControlParser parser(kRawQuic, visitor_); + char subscribe[] = { + 0x03, 0x01, 0x02, 0x01, 0x03, + 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" + 0x20, 0x02, // priority = 0x20 descending + 0x02, // filter_type = kLatestObject + 0x01, // one param + 0x03, 0x01, 0x67, 0x10, // delivery_timeout = 10000 + }; + parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false); + EXPECT_EQ(visitor_.messages_received_, 0); + EXPECT_EQ(visitor_.parsing_error_, + "Parameter length does not match varint encoding"); + EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kParameterLengthMismatch); +} + +TEST_F(MoqtMessageSpecificTest, SubscribeMaxCacheDurationTwice) { + MoqtControlParser parser(kRawQuic, visitor_); + char subscribe[] = { + 0x03, 0x01, 0x02, 0x01, 0x03, + 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" + 0x20, 0x02, // priority = 0x20 descending + 0x02, // filter_type = kLatestObject + 0x02, // two params + 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000 + 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000 + }; + parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false); + EXPECT_EQ(visitor_.messages_received_, 0); + EXPECT_EQ(visitor_.parsing_error_, + "MAX_CACHE_DURATION parameter appears twice"); + EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); +} + +TEST_F(MoqtMessageSpecificTest, SubscribeMaxCacheDurationMalformed) { + MoqtControlParser parser(kRawQuic, visitor_); + char subscribe[] = { + 0x03, 0x01, 0x02, 0x01, 0x03, + 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" + 0x20, 0x02, // priority = 0x20 descending + 0x02, // filter_type = kLatestObject + 0x01, // one param + 0x04, 0x01, 0x67, 0x10, // max_cache_duration = 10000 + }; + parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false); + EXPECT_EQ(visitor_.messages_received_, 0); + EXPECT_EQ(visitor_.parsing_error_, + "Parameter length does not match varint encoding"); + EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kParameterLengthMismatch); +} + +TEST_F(MoqtMessageSpecificTest, SubscribeOkHasAuthorizationInfo) { + MoqtControlParser parser(kWebTrans, visitor_); + char subscribe_ok[] = { + 0x04, 0x01, 0x03, // subscribe_id = 1, expires = 3 + 0x02, 0x01, // group_order = 2, content exists + 0x0c, 0x14, // largest_group_id = 12, largest_object_id = 20, + 0x02, // 2 parameters + 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000 + 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + }; + parser.ProcessData(absl::string_view(subscribe_ok, sizeof(subscribe_ok)), + false); + EXPECT_EQ(visitor_.messages_received_, 0); + EXPECT_EQ(visitor_.parsing_error_, "SUBSCRIBE_OK has authorization info"); + EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); +} + +TEST_F(MoqtMessageSpecificTest, SubscribeUpdateHasAuthorizationInfo) { MoqtControlParser parser(kWebTrans, visitor_); char subscribe_update[] = { 0x02, 0x02, 0x03, 0x01, 0x05, 0x06, // start and end sequences 0xaa, // priority = 0xaa - 0x02, // 2 parameters - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x01, // 1 parameter 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" }; parser.ProcessData( absl::string_view(subscribe_update, sizeof(subscribe_update)), false); EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, - "AUTHORIZATION_INFO parameter appears twice in SUBSCRIBE_UPDATE"); + EXPECT_EQ(visitor_.parsing_error_, "SUBSCRIBE_UPDATE has authorization info"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } TEST_F(MoqtMessageSpecificTest, AnnounceAuthorizationInfoTwice) { MoqtControlParser parser(kWebTrans, visitor_); char announce[] = { - 0x06, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" - 0x02, // 2 params - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x06, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x02, // 2 params + 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" }; parser.ProcessData(absl::string_view(announce, sizeof(announce)), false); EXPECT_EQ(visitor_.messages_received_, 0); EXPECT_TRUE(visitor_.parsing_error_.has_value()); EXPECT_EQ(*visitor_.parsing_error_, - "AUTHORIZATION_INFO parameter appears twice in ANNOUNCE"); + "AUTHORIZATION_INFO parameter appears twice"); + EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); +} + +TEST_F(MoqtMessageSpecificTest, AnnounceHasDeliveryTimeout) { + MoqtControlParser parser(kWebTrans, visitor_); + char announce[] = { + 0x06, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x02, // 2 params + 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000 + }; + parser.ProcessData(absl::string_view(announce, sizeof(announce)), false); + EXPECT_EQ(visitor_.messages_received_, 0); + EXPECT_TRUE(visitor_.parsing_error_.has_value()); + EXPECT_EQ(*visitor_.parsing_error_, "ANNOUNCE has delivery timeout"); EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } @@ -817,7 +922,7 @@ MoqtControlParser parser(kRawQuic, visitor_); char subscribe[] = { 0x03, 0x01, 0x02, // id and alias - 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" 0x20, 0x02, // priority = 0x20 descending 0x01, // filter_type = kLatestGroup @@ -839,7 +944,7 @@ MoqtControlParser parser(kRawQuic, visitor_); char subscribe[] = { 0x03, 0x01, 0x02, // id and alias - 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" 0x20, 0x02, // priority = 0x20 descending 0x02, // filter_type = kLatestObject @@ -861,7 +966,7 @@ MoqtControlParser parser(kRawQuic, visitor_); char subscribe[] = { 0x03, 0x01, 0x02, // id and alias - 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" 0x20, 0x08, // priority = 0x20 ??? 0x01, // filter_type = kLatestGroup @@ -877,7 +982,7 @@ MoqtControlParser parser(kRawQuic, visitor_); char subscribe[] = { 0x03, 0x01, 0x02, // id and alias - 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" 0x20, 0x02, // priority = 0x20 descending 0x03, // filter_type = kAbsoluteStart @@ -901,7 +1006,7 @@ MoqtControlParser parser(kRawQuic, visitor_); char subscribe[] = { 0x03, 0x01, 0x02, // id and alias - 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" 0x20, 0x02, // priority = 0x20 descending 0x04, // filter_type = kAbsoluteStart @@ -927,7 +1032,7 @@ MoqtControlParser parser(kRawQuic, visitor_); char subscribe[] = { 0x03, 0x01, 0x02, // id and alias - 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" 0x20, 0x02, // priority = 0x20 descending 0x04, // filter_type = kAbsoluteRange @@ -953,7 +1058,7 @@ MoqtControlParser parser(kRawQuic, visitor_); char subscribe[] = { 0x03, 0x01, 0x02, // id and alias - 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" 0x20, 0x02, // priority = 0x20 descending 0x04, // filter_type = kAbsoluteRange @@ -974,7 +1079,7 @@ MoqtControlParser parser(kRawQuic, visitor_); char subscribe[] = { 0x03, 0x01, 0x02, // id and alias - 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" 0x20, 0x02, // priority = 0x20 descending 0x04, // filter_type = kAbsoluteRange @@ -1019,7 +1124,7 @@ MoqtControlParser parser(kRawQuic, visitor_); char subscribe[] = { 0x03, 0x01, 0x02, // id and alias - 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" 0x20, 0x02, // priority = 0x20 descending 0x04, // filter_type = kAbsoluteRange @@ -1040,8 +1145,7 @@ MoqtControlParser parser(kRawQuic, visitor_); char subscribe_update[] = { 0x02, 0x02, 0x03, 0x02, 0x04, 0x01, // start and end sequences - 0x01, // 1 parameter - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0xf0, 0x00, // priority, no parameter }; parser.ProcessData( absl::string_view(subscribe_update, sizeof(subscribe_update)), false); @@ -1055,8 +1159,7 @@ char subscribe_update[] = { 0x02, 0x02, 0x03, 0x02, 0x00, 0x01, // start and end sequences 0x20, // priority - 0x01, // 1 parameter - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x00, // No parameter }; parser.ProcessData( absl::string_view(subscribe_update, sizeof(subscribe_update)), false);
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index a943ba5..723162c 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -225,7 +225,7 @@ // TODO: Create state that allows ANNOUNCE_OK/ERROR on spurious namespaces to // trigger session errors. -void MoqtSession::Announce(absl::string_view track_namespace, +void MoqtSession::Announce(FullTrackName track_namespace, MoqtOutgoingAnnounceCallback announce_callback) { if (peer_role_ == MoqtRole::kPublisher) { std::move(announce_callback)( @@ -505,7 +505,7 @@ auto subscribe_it = active_subscribes_.find(message.subscribe_id); if (subscribe_it == active_subscribes_.end()) { return std::pair<FullTrackName, RemoteTrack::Visitor*>( - {FullTrackName{"", ""}, nullptr}); + {FullTrackName{}, nullptr}); } ActiveSubscribe& subscribe = subscribe_it->second; visitor = subscribe.visitor; @@ -515,7 +515,7 @@ Error(MoqtError::kProtocolViolation, "Forwarding preference changes mid-track"); return std::pair<FullTrackName, RemoteTrack::Visitor*>( - {FullTrackName{"", ""}, nullptr}); + {FullTrackName{}, nullptr}); } } else { subscribe.forwarding_preference = message.forwarding_preference; @@ -528,7 +528,7 @@ Error(MoqtError::kProtocolViolation, "Forwarding preference changes mid-track"); return std::pair<FullTrackName, RemoteTrack::Visitor*>( - {FullTrackName{"", ""}, nullptr}); + {FullTrackName{}, nullptr}); } return std::make_pair(track.full_track_name(), track.visitor()); } @@ -669,7 +669,7 @@ QUIC_DLOG(INFO) << ENDPOINT << "SUBSCRIBE for " << track_name << " rejected by the application: " << track_publisher.status(); - SendSubscribeError(message, SubscribeErrorCode::kInternalError, + SendSubscribeError(message, SubscribeErrorCode::kTrackDoesNotExist, track_publisher.status().message(), message.track_alias); return; }
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index b3124a9..ecfe8d0 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -43,14 +43,14 @@ using MoqtSessionDeletedCallback = quiche::SingleUseCallback<void()>; // If |error_message| is nullopt, the ANNOUNCE was successful. using MoqtOutgoingAnnounceCallback = quiche::SingleUseCallback<void( - absl::string_view track_namespace, + FullTrackName track_namespace, std::optional<MoqtAnnounceErrorReason> error)>; using MoqtIncomingAnnounceCallback = quiche::MultiUseCallback<std::optional<MoqtAnnounceErrorReason>( - absl::string_view track_namespace)>; + FullTrackName track_namespace)>; inline std::optional<MoqtAnnounceErrorReason> DefaultIncomingAnnounceCallback( - absl::string_view /*track_namespace*/) { + FullTrackName /*track_namespace*/) { return std::optional(MoqtAnnounceErrorReason{ MoqtAnnounceErrorCode::kAnnounceNotSupported, "This endpoint does not accept incoming ANNOUNCE messages"}); @@ -108,7 +108,7 @@ // Send an ANNOUNCE message for |track_namespace|, and call // |announce_callback| when the response arrives. Will fail immediately if // there is already an unresolved ANNOUNCE for that namespace. - void Announce(absl::string_view track_namespace, + void Announce(FullTrackName track_namespace, MoqtOutgoingAnnounceCallback announce_callback); // Returns true if SUBSCRIBE was sent. If there is already a subscription to @@ -437,7 +437,7 @@ FullSequence first_object); // Get FullTrackName and visitor for a subscribe_id and track_alias. Returns - // nullptr if not present. + // an empty FullTrackName tuple and nullptr if not present. std::pair<FullTrackName, RemoteTrack::Visitor*> TrackPropertiesFromAlias( const MoqtObject& message); @@ -511,7 +511,7 @@ monitoring_interfaces_for_published_tracks_; // Indexed by track namespace. - absl::flat_hash_map<std::string, MoqtOutgoingAnnounceCallback> + absl::flat_hash_map<FullTrackName, MoqtOutgoingAnnounceCallback> pending_outgoing_announces_; // The role the peer advertised in its SETUP message. Initialize it to avoid
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index f1d572f..994e85a 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -358,7 +358,7 @@ TEST_F(MoqtSessionTest, AnnounceWithOk) { testing::MockFunction<void( - absl::string_view track_namespace, + FullTrackName track_namespace, std::optional<MoqtAnnounceErrorReason> error_message)> announce_resolved_callback; webtransport::test::MockStream mock_stream; @@ -373,18 +373,19 @@ EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kAnnounce); return absl::OkStatus(); }); - session_.Announce("foo", announce_resolved_callback.AsStdFunction()); + session_.Announce(FullTrackName{"foo"}, + announce_resolved_callback.AsStdFunction()); EXPECT_TRUE(correct_message); MoqtAnnounceOk ok = { - /*track_namespace=*/"foo", + /*track_namespace=*/FullTrackName{"foo"}, }; correct_message = false; EXPECT_CALL(announce_resolved_callback, Call(_, _)) - .WillOnce([&](absl::string_view track_namespace, + .WillOnce([&](FullTrackName track_namespace, std::optional<MoqtAnnounceErrorReason> error) { correct_message = true; - EXPECT_EQ(track_namespace, "foo"); + EXPECT_EQ(track_namespace, FullTrackName{"foo"}); EXPECT_FALSE(error.has_value()); }); stream_input->OnAnnounceOkMessage(ok); @@ -393,7 +394,7 @@ TEST_F(MoqtSessionTest, AnnounceWithError) { testing::MockFunction<void( - absl::string_view track_namespace, + FullTrackName track_namespace, std::optional<MoqtAnnounceErrorReason> error_message)> announce_resolved_callback; webtransport::test::MockStream mock_stream; @@ -408,20 +409,21 @@ EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kAnnounce); return absl::OkStatus(); }); - session_.Announce("foo", announce_resolved_callback.AsStdFunction()); + session_.Announce(FullTrackName{"foo"}, + announce_resolved_callback.AsStdFunction()); EXPECT_TRUE(correct_message); MoqtAnnounceError error = { - /*track_namespace=*/"foo", + /*track_namespace=*/FullTrackName{"foo"}, /*error_code=*/MoqtAnnounceErrorCode::kInternalError, /*reason_phrase=*/"Test error", }; correct_message = false; EXPECT_CALL(announce_resolved_callback, Call(_, _)) - .WillOnce([&](absl::string_view track_namespace, + .WillOnce([&](FullTrackName track_namespace, std::optional<MoqtAnnounceErrorReason> error) { correct_message = true; - EXPECT_EQ(track_namespace, "foo"); + EXPECT_EQ(track_namespace, FullTrackName{"foo"}); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->error_code, MoqtAnnounceErrorCode::kInternalError); EXPECT_EQ(error->reason_phrase, "Test error"); @@ -686,10 +688,11 @@ std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream); MoqtAnnounce announce = { - /*track_namespace=*/"foo", + /*track_namespace=*/FullTrackName{"foo"}, }; bool correct_message = false; - EXPECT_CALL(session_callbacks_.incoming_announce_callback, Call("foo")) + EXPECT_CALL(session_callbacks_.incoming_announce_callback, + Call(FullTrackName{"foo"})) .WillOnce(Return(std::nullopt)); EXPECT_CALL(mock_stream, Writev(_, _)) .WillOnce([&](absl::Span<const absl::string_view> data, @@ -1355,11 +1358,12 @@ TEST_F(MoqtSessionTest, AnnounceToPublisher) { MoqtSessionPeer::set_peer_role(&session_, MoqtRole::kPublisher); testing::MockFunction<void( - absl::string_view track_namespace, + FullTrackName track_namespace, std::optional<MoqtAnnounceErrorReason> error_message)> announce_resolved_callback; EXPECT_CALL(announce_resolved_callback, Call(_, _)).Times(1); - session_.Announce("foo", announce_resolved_callback.AsStdFunction()); + session_.Announce(FullTrackName{"foo"}, + announce_resolved_callback.AsStdFunction()); } TEST_F(MoqtSessionTest, SubscribeFromPublisher) { @@ -1394,7 +1398,7 @@ std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream); MoqtAnnounce announce = { - /*track_namespace=*/"foo", + /*track_namespace=*/FullTrackName{"foo"}, }; EXPECT_CALL(mock_session_, CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index 1be7b08..7e4fafe 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -439,16 +439,15 @@ QUIC_LOG(INFO) << "SUBSCRIBE end object mismatch"; return false; } - if (cast.parameters.authorization_info != - subscribe_.parameters.authorization_info) { - QUIC_LOG(INFO) << "SUBSCRIBE authorization info mismatch"; + if (cast.parameters != subscribe_.parameters) { + QUIC_LOG(INFO) << "SUBSCRIBE parameter mismatch"; return false; } return true; } void ExpandVarints() override { - ExpandVarintsImpl("vvvv---v------vvvvvv---"); + ExpandVarintsImpl("vvvvv---v------vvvvvv---vv--vv--"); } MessageStructuredData structured_data() const override { @@ -456,22 +455,20 @@ } private: - uint8_t raw_packet_[23] = { - 0x03, 0x01, - 0x02, // id and alias - 0x03, 0x66, 0x6f, - 0x6f, // track_namespace = "foo" - 0x04, 0x61, 0x62, 0x63, - 0x64, // track_name = "abcd" - 0x20, // subscriber priority = 0x20 - 0x02, // group order = descending - 0x03, // Filter type: Absolute Start - 0x04, // start_group = 4 (relative previous) - 0x01, // start_object = 1 (absolute) + uint8_t raw_packet_[32] = { + 0x03, 0x01, 0x02, // id and alias + 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" + 0x20, // subscriber priority = 0x20 + 0x02, // group order = descending + 0x03, // Filter type: Absolute Start + 0x04, // start_group = 4 (relative previous) + 0x01, // start_object = 1 (absolute) // No EndGroup or EndObject - 0x01, // 1 parameter - 0x02, 0x03, 0x62, 0x61, - 0x72, // authorization_info = "bar" + 0x03, // 3 parameters + 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms + 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000 ms }; MoqtSubscribe subscribe_ = { @@ -484,7 +481,10 @@ /*start_object=*/1, /*end_group=*/std::nullopt, /*end_object=*/std::nullopt, - /*authorization_info=*/MoqtSubscribeParameters{"bar", std::nullopt}, + /*parameters=*/ + MoqtSubscribeParameters{ + "bar", quic::QuicTimeDelta::FromMilliseconds(10000), + quic::QuicTimeDelta::FromMilliseconds(10000), std::nullopt}, }; }; @@ -512,10 +512,14 @@ QUIC_LOG(INFO) << "SUBSCRIBE OK largest ID mismatch"; return false; } + if (cast.parameters != subscribe_ok_.parameters) { + QUIC_LOG(INFO) << "SUBSCRIBE OK parameter mismatch"; + return false; + } return true; } - void ExpandVarints() override { ExpandVarintsImpl("vvv--vv"); } + void ExpandVarints() override { ExpandVarintsImpl("vvv--vvvvv--vv--"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(subscribe_ok_); @@ -532,10 +536,13 @@ } private: - uint8_t raw_packet_[7] = { - 0x04, 0x01, 0x03, // subscribe_id = 1, expires = 3 - 0x02, // delivery_order = 2, - 0x01, 0x0c, 0x14, // largest_group_id = 12, largest_object_id = 20, + uint8_t raw_packet_[16] = { + 0x04, 0x01, 0x03, // subscribe_id = 1, expires = 3 + 0x02, 0x01, // group_order = 2, content exists + 0x0c, 0x14, // largest_group_id = 12, largest_object_id = 20, + 0x02, // 2 parameters + 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000 + 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000 }; MoqtSubscribeOk subscribe_ok_ = { @@ -543,6 +550,10 @@ /*expires=*/quic::QuicTimeDelta::FromMilliseconds(3), /*group_order=*/MoqtDeliveryOrder::kDescending, /*largest_id=*/FullSequence(12, 20), + /*parameters=*/ + MoqtSubscribeParameters{ + std::nullopt, quic::QuicTimeDelta::FromMilliseconds(10000), + quic::QuicTimeDelta::FromMilliseconds(10000), std::nullopt}, }; }; @@ -711,8 +722,8 @@ QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE subscriber priority mismatch"; return false; } - if (cast.authorization_info != subscribe_update_.authorization_info) { - QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE authorization info mismatch"; + if (cast.parameters != subscribe_update_.parameters) { + QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE parameter mismatch"; return false; } return true; @@ -725,11 +736,12 @@ } private: - uint8_t raw_packet_[13] = { + uint8_t raw_packet_[16] = { 0x02, 0x02, 0x03, 0x01, 0x05, 0x06, // start and end sequences 0xaa, // subscriber_priority - 0x01, // 1 parameter - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x02, // 1 parameter + 0x03, 0x02, 0x67, 0x10, // delivery_timeout = 10000 + 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000 }; MoqtSubscribeUpdate subscribe_update_ = { @@ -739,7 +751,10 @@ /*end_group=*/4, /*end_object=*/5, /*subscriber_priority=*/0xaa, - /*authorization_info=*/"bar", + /*parameters=*/ + MoqtSubscribeParameters{ + std::nullopt, quic::QuicTimeDelta::FromMilliseconds(10000), + quic::QuicTimeDelta::FromMilliseconds(10000), std::nullopt}, }; }; @@ -755,29 +770,33 @@ QUIC_LOG(INFO) << "ANNOUNCE MESSAGE track namespace mismatch"; return false; } - if (cast.authorization_info != announce_.authorization_info) { + if (cast.parameters != announce_.parameters) { QUIC_LOG(INFO) << "ANNOUNCE MESSAGE authorization info mismatch"; return false; } return true; } - void ExpandVarints() override { ExpandVarintsImpl("vv---vvv---"); } + void ExpandVarints() override { ExpandVarintsImpl("vvv---vvv---vv--"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(announce_); } private: - uint8_t raw_packet_[11] = { - 0x06, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" - 0x01, // 1 parameter - 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + uint8_t raw_packet_[16] = { + 0x06, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x02, // 2 parameters + 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + 0x04, 0x02, 0x67, 0x10, // max_cache_duration = 10000ms }; MoqtAnnounce announce_ = { - /*track_namespace=*/"foo", - /*authorization_info=*/"bar", + /*track_namespace=*/FullTrackName{"foo"}, + /*parameters=*/ + MoqtSubscribeParameters{"bar", std::nullopt, + quic::QuicTimeDelta::FromMilliseconds(10000), + std::nullopt}, }; }; @@ -796,19 +815,19 @@ return true; } - void ExpandVarints() override { ExpandVarintsImpl("vv---"); } + void ExpandVarints() override { ExpandVarintsImpl("vvv---"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(announce_ok_); } private: - uint8_t raw_packet_[5] = { - 0x07, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + uint8_t raw_packet_[6] = { + 0x07, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" }; MoqtAnnounceOk announce_ok_ = { - /*track_namespace=*/"foo", + /*track_namespace=*/FullTrackName{"foo"}, }; }; @@ -835,21 +854,21 @@ return true; } - void ExpandVarints() override { ExpandVarintsImpl("vv---vv---"); } + void ExpandVarints() override { ExpandVarintsImpl("vvv---vv---"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(announce_error_); } private: - uint8_t raw_packet_[10] = { - 0x08, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" - 0x01, // error_code = 1 - 0x03, 0x62, 0x61, 0x72, // reason_phrase = "bar" + uint8_t raw_packet_[11] = { + 0x08, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x01, // error_code = 1 + 0x03, 0x62, 0x61, 0x72, // reason_phrase = "bar" }; MoqtAnnounceError announce_error_ = { - /*track_namespace=*/"foo", + /*track_namespace=*/FullTrackName{"foo"}, /*error_code=*/MoqtAnnounceErrorCode::kAnnounceNotSupported, /*reason_phrase=*/"bar", }; @@ -867,22 +886,34 @@ QUIC_LOG(INFO) << "ANNOUNCE CANCEL track namespace mismatch"; return false; } + if (cast.error_code != announce_cancel_.error_code) { + QUIC_LOG(INFO) << "ANNOUNCE CANCEL error code mismatch"; + return false; + } + if (cast.reason_phrase != announce_cancel_.reason_phrase) { + QUIC_LOG(INFO) << "ANNOUNCE CANCEL reason phrase mismatch"; + return false; + } return true; } - void ExpandVarints() override { ExpandVarintsImpl("vv---"); } + void ExpandVarints() override { ExpandVarintsImpl("vvv---vv---"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(announce_cancel_); } private: - uint8_t raw_packet_[5] = { - 0x0c, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + uint8_t raw_packet_[11] = { + 0x0c, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x01, // error_code = 1 + 0x03, 0x62, 0x61, 0x72, // reason_phrase = "bar" }; MoqtAnnounceCancel announce_cancel_ = { - /*track_namespace=*/"foo", + /*track_namespace=*/FullTrackName{"foo"}, + /*error_code=*/1, + /*reason_phrase=*/"bar", }; }; @@ -901,16 +932,16 @@ return true; } - void ExpandVarints() override { ExpandVarintsImpl("vv---v----"); } + void ExpandVarints() override { ExpandVarintsImpl("vvv---v----"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(track_status_request_); } private: - uint8_t raw_packet_[10] = { - 0x0d, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" - 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" + uint8_t raw_packet_[11] = { + 0x0d, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" }; MoqtTrackStatusRequest track_status_request_ = { @@ -933,19 +964,19 @@ return true; } - void ExpandVarints() override { ExpandVarintsImpl("vv---"); } + void ExpandVarints() override { ExpandVarintsImpl("vvv---"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(unannounce_); } private: - uint8_t raw_packet_[5] = { - 0x09, 0x03, 0x66, 0x6f, 0x6f, // track_namespace + uint8_t raw_packet_[6] = { + 0x09, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace }; MoqtUnannounce unannounce_ = { - /*track_namespace=*/"foo", + /*track_namespace=*/FullTrackName{"foo"}, }; }; @@ -976,17 +1007,17 @@ return true; } - void ExpandVarints() override { ExpandVarintsImpl("vv---v----vvv"); } + void ExpandVarints() override { ExpandVarintsImpl("vvv---v----vvv"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(track_status_); } private: - uint8_t raw_packet_[13] = { - 0x0e, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" - 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" - 0x00, 0x0c, 0x14, // status, last_group, last_object + uint8_t raw_packet_[14] = { + 0x0e, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x04, 0x61, 0x62, 0x63, 0x64, // track_name = "abcd" + 0x00, 0x0c, 0x14, // status, last_group, last_object }; MoqtTrackStatus track_status_ = {
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc index 9e43485..b706469 100644 --- a/quiche/quic/moqt/tools/chat_client.cc +++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -114,8 +114,7 @@ if (full_track_name == client_->chat_strings_->GetCatalogName()) { std::cout << "Subscription to catalog "; } else { - std::cout << "Subscription to user " << full_track_name.track_namespace() - << " "; + std::cout << "Subscription to user " << full_track_name.ToString() << " "; } if (reason_phrase.has_value()) { std::cout << "REJECTED, reason = " << *reason_phrase << "\n"; @@ -142,8 +141,8 @@ client_->ProcessCatalog(object, this, group_sequence, object_sequence); return; } - std::string username(full_track_name.track_namespace()); - username = username.substr(username.find_last_of('/') + 1); + std::string username( + client_->chat_strings_->GetUsernameFromFullTrackName(full_track_name)); if (!client_->other_users_.contains(username)) { std::cout << "Username " << username << "doesn't exist\n"; return; @@ -170,7 +169,7 @@ publisher_.Add(queue_); session_->set_publisher(&publisher_); MoqtOutgoingAnnounceCallback announce_callback = - [this](absl::string_view track_namespace, + [this](FullTrackName track_namespace, std::optional<MoqtAnnounceErrorReason> reason) { if (reason.has_value()) { std::cout << "ANNOUNCE rejected, " << reason->reason_phrase << "\n"; @@ -178,18 +177,21 @@ "Local ANNOUNCE rejected"); return; } - std::cout << "ANNOUNCE for " << track_namespace << " accepted\n"; + std::cout << "ANNOUNCE for " << track_namespace.ToString() + << " accepted\n"; return; }; - std::cout << "Announcing " << my_track_name.track_namespace() << "\n"; - session_->Announce(my_track_name.track_namespace(), - std::move(announce_callback)); + FullTrackName my_track_namespace = my_track_name; + my_track_namespace.NameToNamespace(); + std::cout << "Announcing " << my_track_namespace.ToString() << "\n"; + session_->Announce(my_track_namespace, std::move(announce_callback)); } remote_track_visitor_ = std::make_unique<RemoteTrackVisitor>(this); FullTrackName catalog_name = chat_strings_->GetCatalogName(); if (!session_->SubscribeCurrentGroup( catalog_name, remote_track_visitor_.get(), - MoqtSubscribeParameters{username_, std::nullopt})) { + MoqtSubscribeParameters{username_, std::nullopt, std::nullopt, + std::nullopt})) { std::cout << "Failed to get catalog\n"; return false; }
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc index c7c6ce5..29d0842 100644 --- a/quiche/quic/moqt/tools/chat_server.cc +++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -33,15 +33,17 @@ MoqtSession* session, ChatServer* server) : session_(session), server_(server) { session_->callbacks().incoming_announce_callback = - [&](absl::string_view track_namespace) { - std::cout << "Received ANNOUNCE for " << track_namespace << "\n"; - username_ = - server_->strings().GetUsernameFromTrackNamespace(track_namespace); + [&](FullTrackName track_namespace) { + FullTrackName track_name = track_namespace; + track_name.AddElement(""); + std::cout << "Received ANNOUNCE for " << track_namespace.ToString() + << "\n"; + username_ = server_->strings().GetUsernameFromFullTrackName(track_name); if (username_->empty()) { std::cout << "Malformed ANNOUNCE namespace\n"; return std::nullopt; } - session_->SubscribeCurrentGroup(FullTrackName({track_namespace, ""}), + session_->SubscribeCurrentGroup(track_name, server_->remote_track_visitor()); server_->AddUser(*username_); return std::nullopt; @@ -69,7 +71,8 @@ void ChatServer::RemoteTrackVisitor::OnReply( const moqt::FullTrackName& full_track_name, std::optional<absl::string_view> reason_phrase) { - std::cout << "Subscription to user " << full_track_name.track_namespace() + std::cout << "Subscription to user " + << server_->strings().GetUsernameFromFullTrackName(full_track_name) << " "; if (reason_phrase.has_value()) { std::cout << "REJECTED, reason = " << *reason_phrase << "\n";
diff --git a/quiche/quic/moqt/tools/moq_chat.h b/quiche/quic/moqt/tools/moq_chat.h index 1e6ac66..6789f82 100644 --- a/quiche/quic/moqt/tools/moq_chat.h +++ b/quiche/quic/moqt/tools/moq_chat.h
@@ -32,10 +32,16 @@ } // Returns "" if the track namespace is not a participant track. - std::string GetUsernameFromTrackNamespace( - absl::string_view track_namespace) const { + std::string GetUsernameFromFullTrackName( + FullTrackName full_track_name) const { + if (full_track_name.tuple().size() != 2) { + return ""; + } + if (!full_track_name.tuple()[1].empty()) { + return ""; + } std::vector<absl::string_view> elements = - absl::StrSplit(track_namespace, '/'); + absl::StrSplit(full_track_name.tuple()[0], '/'); if (elements.size() != 4 || elements[0] != kBasePath || elements[1] != chat_id_ || elements[2] != kParticipantPath) { return ""; @@ -43,16 +49,6 @@ return std::string(elements[3]); } - // Returns "" if the full track name is not a participant track. - std::string GetUsernameFromFullTrackName( - FullTrackName full_track_name) const { - // Check the full path - if (!full_track_name.track_name().empty()) { - return ""; - } - return GetUsernameFromTrackNamespace(full_track_name.track_namespace()); - } - FullTrackName GetFullTrackNameFromUsername(absl::string_view username) const { return FullTrackName{absl::StrCat(kBasePath, "/", chat_id_, "/", kParticipantPath, "/", username),
diff --git a/quiche/quic/moqt/tools/moq_chat_test.cc b/quiche/quic/moqt/tools/moq_chat_test.cc index 1c93d17..f2c8d09 100644 --- a/quiche/quic/moqt/tools/moq_chat_test.cc +++ b/quiche/quic/moqt/tools/moq_chat_test.cc
@@ -23,38 +23,42 @@ EXPECT_FALSE(strings_.IsValidPath("/moq-chat/")); } -TEST_F(MoqChatStringsTest, GetUsernameFromTrackNamespace) { - EXPECT_EQ(strings_.GetUsernameFromTrackNamespace( - "moq-chat/chat-id/participant/user"), - "user"); - EXPECT_EQ(strings_.GetUsernameFromTrackNamespace( - "/moq-chat/chat-id/participant/user"), - ""); - EXPECT_EQ(strings_.GetUsernameFromTrackNamespace( - "moq-chat/chat-id/participant/user/"), - ""); - EXPECT_EQ(strings_.GetUsernameFromTrackNamespace( - "moq-cha/chat-id/participant/user"), - ""); - EXPECT_EQ(strings_.GetUsernameFromTrackNamespace( - "moq-chat/chat-i/participant/user"), - ""); - EXPECT_EQ(strings_.GetUsernameFromTrackNamespace( - "moq-chat/chat-id/participan/user"), - ""); - EXPECT_EQ(strings_.GetUsernameFromTrackNamespace("moq-chat/chat-id/user"), - ""); - EXPECT_EQ(strings_.GetUsernameFromTrackNamespace( - "moq-chat/chat-id/participant/foo/user"), - ""); -} - TEST_F(MoqChatStringsTest, GetUsernameFromFullTrackName) { EXPECT_EQ(strings_.GetUsernameFromFullTrackName( - FullTrackName("moq-chat/chat-id/participant/user", "")), + FullTrackName{"moq-chat/chat-id/participant/user", ""}), "user"); +} + +TEST_F(MoqChatStringsTest, GetUsernameFromFullTrackNameInvalidInput) { EXPECT_EQ(strings_.GetUsernameFromFullTrackName( - FullTrackName("moq-chat/chat-id/participant/user", "foo")), + FullTrackName{"/moq-chat/chat-id/participant/user", ""}), + ""); + EXPECT_EQ(strings_.GetUsernameFromFullTrackName( + FullTrackName{"moq-chat/chat-id/participant/user/", ""}), + ""); + EXPECT_EQ(strings_.GetUsernameFromFullTrackName( + FullTrackName{"moq-cha/chat-id/participant/user", ""}), + ""); + EXPECT_EQ(strings_.GetUsernameFromFullTrackName( + FullTrackName{"moq-chat/chat-i/participant/user", ""}), + ""); + EXPECT_EQ(strings_.GetUsernameFromFullTrackName( + FullTrackName{"moq-chat/chat-id/participan/user", ""}), + ""); + EXPECT_EQ(strings_.GetUsernameFromFullTrackName( + FullTrackName{"moq-chat/chat-id/user", ""}), + ""); + EXPECT_EQ(strings_.GetUsernameFromFullTrackName( + FullTrackName{"moq-chat/chat-id/participant/foo/user", ""}), + ""); + EXPECT_EQ(strings_.GetUsernameFromFullTrackName( + FullTrackName{"moq-chat/chat-id/participant/user", "foo"}), + ""); + EXPECT_EQ(strings_.GetUsernameFromFullTrackName( + FullTrackName{"moq-chat/chat-id/participant/user"}), + ""); + EXPECT_EQ(strings_.GetUsernameFromFullTrackName( + FullTrackName{"foo", "moq-chat/chat-id/participant/user", ""}), ""); }
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc index 4e4ac10..bd0f72f 100644 --- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc +++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -82,16 +82,25 @@ return absl::ascii_isalnum(c) || c == '-' || c == '_'; } -bool IsValidTrackNamespace(absl::string_view track_namespace) { - return absl::c_all_of(track_namespace, IsValidTrackNamespaceChar); +bool IsValidTrackNamespace(FullTrackName track_namespace) { + for (const auto& element : track_namespace.tuple()) { + if (!absl::c_all_of(element, IsValidTrackNamespaceChar)) { + return false; + } + } + return true; } -std::string CleanUpTrackNamespace(absl::string_view track_namespace) { - std::string output(track_namespace); - for (char& c : output) { - if (!IsValidTrackNamespaceChar(c)) { - c = '_'; +FullTrackName CleanUpTrackNamespace(FullTrackName track_namespace) { + FullTrackName output; + for (auto& it : track_namespace.tuple()) { + std::string element = it; + for (char& c : element) { + if (!IsValidTrackNamespaceChar(c)) { + c = '_'; + } } + output.AddElement(element); } return output; } @@ -107,7 +116,7 @@ } std::optional<MoqtAnnounceErrorReason> OnAnnounceReceived( - absl::string_view track_namespace) { + FullTrackName track_namespace) { if (!IsValidTrackNamespace(track_namespace) && !quiche::GetQuicheCommandLineFlag( FLAGS_allow_invalid_track_namespaces)) { @@ -142,8 +151,9 @@ std::vector<absl::string_view> tracks_to_subscribe = absl::StrSplit(track_list, ',', absl::AllowEmpty()); for (absl::string_view track : tracks_to_subscribe) { - session_->SubscribeCurrentGroup(FullTrackName({track_namespace, track}), - &it->second); + FullTrackName full_track_name = track_namespace; + full_track_name.AddElement(track); + session_->SubscribeCurrentGroup(full_track_name, &it->second); } return std::nullopt; @@ -174,7 +184,7 @@ absl::string_view object, bool /*end_of_message*/) override { std::string file_name = absl::StrCat(group_sequence, "-", object_sequence, - ".", full_track_name.track_name()); + ".", full_track_name.tuple().back()); std::string file_path = quiche::JoinPath(directory_, file_name); std::ofstream output(file_path, std::ios::binary | std::ios::ate); output.write(object.data(), object.size()); @@ -187,7 +197,7 @@ MoqtSession* session_; // Not owned. std::string output_root_; - absl::node_hash_map<std::string, NamespaceHandler> subscribed_namespaces_; + absl::node_hash_map<FullTrackName, NamespaceHandler> subscribed_namespaces_; }; absl::StatusOr<MoqtConfigureSessionCallback> IncomingSessionHandler(
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h index 1e4aebf..371a0c9 100644 --- a/quiche/quic/moqt/tools/moqt_mock_visitor.h +++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -26,8 +26,7 @@ testing::MockFunction<void()> session_established_callback; testing::MockFunction<void(absl::string_view)> session_terminated_callback; testing::MockFunction<void()> session_deleted_callback; - testing::MockFunction<std::optional<MoqtAnnounceErrorReason>( - absl::string_view)> + testing::MockFunction<std::optional<MoqtAnnounceErrorReason>(FullTrackName)> incoming_announce_callback; MockSessionCallbacks() {