Update PUBLISH_NAMESPACE to MOQT draft-16. PiperOrigin-RevId: 868705687
diff --git a/quiche/quic/moqt/moqt_error.h b/quiche/quic/moqt/moqt_error.h index a9d6d86..085af18 100644 --- a/quiche/quic/moqt/moqt_error.h +++ b/quiche/quic/moqt/moqt_error.h
@@ -71,6 +71,7 @@ kMalformedTrack = 0x9, kMalformedAuthToken = 0x10, kExpiredAuthToken = 0x12, + kDuplicateSubscription = 0x19, kPrefixOverlap = 0x30, }; @@ -78,6 +79,7 @@ RequestErrorCode error_code; std::optional<quic::QuicTimeDelta> retry_interval; std::string reason_phrase; + bool operator==(const MoqtRequestErrorInfo& other) const = default; }; RequestErrorCode StatusToRequestErrorCode(absl::Status status);
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index 59aa538..8bb83a8 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -573,21 +573,16 @@ quiche::QuicheBuffer MoqtFramer::SerializePublishNamespace( const MoqtPublishNamespace& message) { - KeyValuePairList parameters; - if (!FillAndValidateVersionSpecificParameters( - MoqtMessageType::kPublishNamespace, message.parameters, parameters)) { - return quiche::QuicheBuffer(); - }; - return SerializeControlMessage(MoqtMessageType::kPublishNamespace, - WireVarInt62(message.request_id), - WireTrackNamespace(message.track_namespace), - WireKeyValuePairList(parameters)); + return SerializeControlMessage( + MoqtMessageType::kPublishNamespace, WireVarInt62(message.request_id), + WireTrackNamespace(message.track_namespace), + WireKeyValuePairList(message.parameters.ToKeyValuePairList())); } quiche::QuicheBuffer MoqtFramer::SerializePublishNamespaceDone( const MoqtPublishNamespaceDone& message) { return SerializeControlMessage(MoqtMessageType::kPublishNamespaceDone, - WireTrackNamespace(message.track_namespace)); + WireVarInt62(message.request_id)); } quiche::QuicheBuffer MoqtFramer::SerializeNamespace( @@ -608,8 +603,7 @@ const MoqtPublishNamespaceCancel& message) { return SerializeControlMessage( MoqtMessageType::kPublishNamespaceCancel, - WireTrackNamespace(message.track_namespace), - WireVarInt62(message.error_code), + WireVarInt62(message.request_id), WireVarInt62(message.error_code), WireStringWithVarInt62Length(message.error_reason)); }
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc index 5fc8956..1262b49 100644 --- a/quiche/quic/moqt/moqt_integration_test.cc +++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -18,13 +18,13 @@ #include "quiche/quic/core/quic_time.h" #include "quiche/quic/core/quic_types.h" #include "quiche/quic/moqt/moqt_error.h" +#include "quiche/quic/moqt/moqt_fetch_task.h" #include "quiche/quic/moqt/moqt_key_value_pair.h" #include "quiche/quic/moqt/moqt_known_track_publisher.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_names.h" #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_outgoing_queue.h" -#include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_probe_manager.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_session.h" @@ -172,86 +172,73 @@ TEST_F(MoqtIntegrationTest, PublishNamespaceSuccessThenPublishNamespaceDone) { EstablishSession(); - auto parameters = std::make_optional<VersionSpecificParameters>( - AuthTokenType::kOutOfBand, "foo"); + MessageParameters parameters; + parameters.authorization_tokens.emplace_back(AuthTokenType::kOutOfBand, + "foo"); EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback, - Call(TrackNamespace{"foo"}, parameters, _)) + Call(TrackNamespace{"foo"}, std::make_optional(parameters), _)) .WillOnce([](const TrackNamespace&, - const std::optional<VersionSpecificParameters>&, + const std::optional<MessageParameters>&, MoqtResponseCallback callback) { std::move(callback)(std::nullopt); }); - testing::MockFunction<void(TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> error_message)> - publish_namespace_callback; - client_->session()->PublishNamespace( - TrackNamespace{"foo"}, publish_namespace_callback.AsStdFunction(), - *parameters); + testing::MockFunction<void(std::optional<MoqtRequestErrorInfo>)> + response_callback; + client_->session()->PublishNamespace(TrackNamespace{"foo"}, parameters, + response_callback.AsStdFunction(), + [](MoqtRequestErrorInfo) {}); bool matches = false; - EXPECT_CALL(publish_namespace_callback, Call(_, _)) - .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> error) { + EXPECT_CALL(response_callback, Call) + .WillOnce([&](std::optional<MoqtRequestErrorInfo> error) { matches = true; - EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); EXPECT_FALSE(error.has_value()); }); bool success = test_harness_.RunUntilWithDefaultTimeout([&]() { return matches; }); EXPECT_TRUE(success); matches = false; - EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback, - Call(TrackNamespace{"foo"}, - std::optional<VersionSpecificParameters>(), _)) - .WillOnce([&](const TrackNamespace& name, - const std::optional<VersionSpecificParameters>& parameters, - MoqtResponseCallback callback) { - matches = true; - EXPECT_EQ(callback, nullptr); - }); - client_->session()->PublishNamespaceDone(TrackNamespace{"foo"}); + EXPECT_CALL( + server_callbacks_.incoming_publish_namespace_callback, + Call(TrackNamespace{"foo"}, std::optional<MessageParameters>(), nullptr)) + .WillOnce([&](const TrackNamespace&, + const std::optional<MessageParameters>&, + MoqtResponseCallback) { matches = true; }); + EXPECT_TRUE(client_->session()->PublishNamespaceDone(TrackNamespace{"foo"})); success = test_harness_.RunUntilWithDefaultTimeout([&]() { return matches; }); EXPECT_TRUE(success); } TEST_F(MoqtIntegrationTest, PublishNamespaceSuccessThenCancel) { EstablishSession(); - auto parameters = std::make_optional<VersionSpecificParameters>( - AuthTokenType::kOutOfBand, "foo"); + MessageParameters parameters; + parameters.authorization_tokens.emplace_back(AuthTokenType::kOutOfBand, + "foo"); EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback, - Call(TrackNamespace{"foo"}, parameters, _)) + Call(TrackNamespace{"foo"}, std::make_optional(parameters), _)) .WillOnce([](const TrackNamespace&, - const std::optional<VersionSpecificParameters>&, + const std::optional<MessageParameters>&, MoqtResponseCallback callback) { std::move(callback)(std::nullopt); }); - testing::MockFunction<void(TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> error_message)> - publish_namespace_callback; - client_->session()->PublishNamespace( - TrackNamespace{"foo"}, publish_namespace_callback.AsStdFunction(), - *parameters); + testing::MockFunction<void(std::optional<MoqtRequestErrorInfo>)> + response_callback; + testing::MockFunction<void(MoqtRequestErrorInfo)> cancel_callback; + client_->session()->PublishNamespace(TrackNamespace{"foo"}, parameters, + response_callback.AsStdFunction(), + cancel_callback.AsStdFunction()); bool matches = false; - EXPECT_CALL(publish_namespace_callback, Call(_, _)) - .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> error) { - matches = true; - EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); - EXPECT_FALSE(error.has_value()); - }); + EXPECT_CALL(response_callback, Call(std::optional<MoqtRequestErrorInfo>())) + .WillOnce( + [&](std::optional<MoqtRequestErrorInfo> error) { matches = true; }); bool success = test_harness_.RunUntilWithDefaultTimeout([&]() { return matches; }); EXPECT_TRUE(success); matches = false; - EXPECT_CALL(publish_namespace_callback, Call(_, _)) - .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> error) { - matches = true; - EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->error_code, RequestErrorCode::kInternalError); - EXPECT_EQ(error->reason_phrase, "internal error"); - }); - server_->session()->CancelPublishNamespace(TrackNamespace{"foo"}, + EXPECT_CALL(cancel_callback, + Call(MoqtRequestErrorInfo{RequestErrorCode::kInternalError, + std::nullopt, "internal error"})) + .WillOnce([&](std::optional<MoqtRequestErrorInfo>) { matches = true; }); + server_->session()->PublishNamespaceCancel(TrackNamespace{"foo"}, RequestErrorCode::kInternalError, "internal error"); success = test_harness_.RunUntilWithDefaultTimeout([&]() { return matches; }); @@ -260,33 +247,30 @@ TEST_F(MoqtIntegrationTest, PublishNamespaceSuccessSubscribeInResponse) { EstablishSession(); - auto parameters = std::make_optional<VersionSpecificParameters>( - AuthTokenType::kOutOfBand, "foo"); + TrackNamespace prefix{"foo"}; + MessageParameters parameters; + parameters.authorization_tokens.emplace_back(AuthTokenType::kOutOfBand, + "foo"); EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback, - Call(TrackNamespace{"foo"}, parameters, _)) - .WillOnce([](const TrackNamespace&, - const std::optional<VersionSpecificParameters>&, - MoqtResponseCallback callback) { + Call(TrackNamespace{"foo"}, std::make_optional(parameters), _)) + .WillOnce([&](const TrackNamespace& track_namespace, + const std::optional<MessageParameters>&, + MoqtResponseCallback callback) { std::move(callback)(std::nullopt); - }); - client_->session()->PublishNamespace( - TrackNamespace{"foo"}, - outgoing_publish_namespace_callback_.AsStdFunction(), *parameters); - bool matches = false; - EXPECT_CALL(outgoing_publish_namespace_callback_, Call) - .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> error) { - EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); FullTrackName track_name(track_namespace, "/catalog"); - EXPECT_FALSE(error.has_value()); MessageParameters parameters(MoqtFilterType::kLargestObject); server_->session()->Subscribe(track_name, &subscribe_visitor_, parameters); - }) - .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> error) { - EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); - EXPECT_TRUE(error.has_value()); + }); + testing::MockFunction<void(std::optional<MoqtRequestErrorInfo>)> + response_callback; + client_->session()->PublishNamespace(prefix, parameters, + response_callback.AsStdFunction(), + [](MoqtRequestErrorInfo) {}); + bool matches = false; + EXPECT_CALL(response_callback, Call) + .WillOnce([&](std::optional<MoqtRequestErrorInfo> error) { + EXPECT_FALSE(error.has_value()); }); EXPECT_CALL(subscribe_visitor_, OnReply).WillOnce([&]() { matches = true; }); bool success = @@ -294,12 +278,7 @@ EXPECT_TRUE(success); // Session tears down PUBLISH_NAMESPACE. EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback, - Call(TrackNamespace{"foo"}, - std::optional<VersionSpecificParameters>(), _)) - .WillOnce( - [](const TrackNamespace&, - const std::optional<VersionSpecificParameters>&, - MoqtResponseCallback callback) { EXPECT_EQ(callback, nullptr); }); + Call(prefix, std::optional<MessageParameters>(), nullptr)); } TEST_F(MoqtIntegrationTest, PublishNamespaceSuccessSendDataInResponse) { @@ -307,12 +286,13 @@ // Set up the server to subscribe to "data" track for the namespace // publish_namespace it receives. - auto parameters = std::make_optional<VersionSpecificParameters>( - AuthTokenType::kOutOfBand, "foo"); + MessageParameters parameters; + parameters.authorization_tokens.emplace_back(AuthTokenType::kOutOfBand, + "foo"); EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback, - Call(TrackNamespace{"test"}, parameters, _)) + Call(TrackNamespace{"test"}, std::make_optional(parameters), _)) .WillOnce([&](const TrackNamespace& track_namespace, - const std::optional<VersionSpecificParameters>&, + const std::optional<MessageParameters>&, MoqtResponseCallback callback) { FullTrackName track_name(track_namespace, "data"); std::move(callback)(std::nullopt); @@ -331,8 +311,8 @@ received_subscribe_ok = true; }); client_->session()->PublishNamespace( - TrackNamespace{"test"}, - [](TrackNamespace, std::optional<MoqtRequestErrorInfo>) {}, *parameters); + TrackNamespace{"test"}, parameters, + [](std::optional<MoqtRequestErrorInfo>) {}, [](MoqtRequestErrorInfo) {}); bool success = test_harness_.RunUntilWithDefaultTimeout( [&]() { return received_subscribe_ok; }); EXPECT_TRUE(success); @@ -357,12 +337,8 @@ EXPECT_TRUE(success); // Session tears down PUBLISH_NAMESPACE. EXPECT_CALL(server_callbacks_.incoming_publish_namespace_callback, - Call(TrackNamespace{"test"}, - std::optional<VersionSpecificParameters>(), _)) - .WillOnce( - [](const TrackNamespace&, - const std::optional<VersionSpecificParameters>&, - MoqtResponseCallback callback) { EXPECT_EQ(callback, nullptr); }); + Call(TrackNamespace{"test"}, std::optional<MessageParameters>(), + nullptr)); } TEST_F(MoqtIntegrationTest, SendMultipleGroups) { @@ -531,18 +507,16 @@ TEST_F(MoqtIntegrationTest, PublishNamespaceFailure) { EstablishSession(); - testing::MockFunction<void(TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> error_message)> - publish_namespace_callback; - client_->session()->PublishNamespace( - TrackNamespace{"foo"}, publish_namespace_callback.AsStdFunction(), - VersionSpecificParameters()); + testing::MockFunction<void(std::optional<MoqtRequestErrorInfo>)> + response_callback; + client_->session()->PublishNamespace(TrackNamespace{"foo"}, + MessageParameters(), + response_callback.AsStdFunction(), + [](MoqtRequestErrorInfo error_info) {}); bool matches = false; - EXPECT_CALL(publish_namespace_callback, Call(_, _)) - .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> error) { + EXPECT_CALL(response_callback, Call) + .WillOnce([&](std::optional<MoqtRequestErrorInfo> error) { matches = true; - EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->error_code, RequestErrorCode::kNotSupported); });
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index 4f99112..0d8580f 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -401,7 +401,7 @@ struct QUICHE_EXPORT MoqtPublishNamespace { uint64_t request_id; TrackNamespace track_namespace; - VersionSpecificParameters parameters; + MessageParameters parameters; }; struct QUICHE_EXPORT MoqtRequestOk { @@ -410,11 +410,11 @@ }; struct QUICHE_EXPORT MoqtPublishNamespaceDone { - TrackNamespace track_namespace; + uint64_t request_id; }; struct QUICHE_EXPORT MoqtPublishNamespaceCancel { - TrackNamespace track_namespace; + uint64_t request_id; RequestErrorCode error_code; std::string error_reason; };
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index 0aeb42c..174c153 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -789,13 +789,7 @@ !ReadTrackNamespace(reader, publish_namespace.track_namespace)) { return 0; } - KeyValuePairList parameters; - if (!ParseKeyValuePairList(reader, parameters)) { - return 0; - } - if (!FillAndValidateVersionSpecificParameters( - parameters, publish_namespace.parameters, - MoqtMessageType::kPublishNamespace)) { + if (!FillAndValidateMessageParameters(reader, publish_namespace.parameters)) { return 0; } visitor_.OnPublishNamespaceMessage(publish_namespace); @@ -834,22 +828,20 @@ size_t MoqtControlParser::ProcessPublishNamespaceDone( quic::QuicDataReader& reader) { - MoqtPublishNamespaceDone unpublish_namespace; - if (!ReadTrackNamespace(reader, unpublish_namespace.track_namespace)) { + MoqtPublishNamespaceDone pn_done; + if (!reader.ReadVarInt62(&pn_done.request_id)) { return 0; } - visitor_.OnPublishNamespaceDoneMessage(unpublish_namespace); + visitor_.OnPublishNamespaceDoneMessage(pn_done); return reader.PreviouslyReadPayload().length(); } size_t MoqtControlParser::ProcessPublishNamespaceCancel( quic::QuicDataReader& reader) { MoqtPublishNamespaceCancel publish_namespace_cancel; - if (!ReadTrackNamespace(reader, publish_namespace_cancel.track_namespace)) { - return 0; - } uint64_t error_code; - if (!reader.ReadVarInt62(&error_code) || + if (!reader.ReadVarInt62(&publish_namespace_cancel.request_id) || + !reader.ReadVarInt62(&error_code) || !reader.ReadStringVarInt62(publish_namespace_cancel.error_reason)) { return 0; }
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index a2049c8..0fed64d 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -831,25 +831,6 @@ EXPECT_EQ(visitor_.messages_received_, 1); } -TEST_F(MoqtMessageSpecificTest, PublishNamespaceHasDeliveryTimeout) { - webtransport::test::InMemoryStream stream(/*stream_id=*/0); - MoqtControlParser parser(kWebTrans, &stream, visitor_); - char publish_namespace[] = { - 0x06, 0x00, 0x11, 0x02, 0x01, 0x03, 0x66, - 0x6f, 0x6f, // track_namespace = "foo" - 0x02, // 2 params - 0x02, 0x67, 0x10, // delivery_timeout = 10000 - 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_info = "bar" - }; - stream.Receive( - absl::string_view(publish_namespace, sizeof(publish_namespace)), false); - parser.ReadAndDispatchMessages(); - EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, - "Version Specific Parameter not allowed for this message type"); - EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); -} - TEST_F(MoqtMessageSpecificTest, FinMidPayload) { webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtDataParser parser(&stream, &visitor_);
diff --git a/quiche/quic/moqt/moqt_relay_publisher.cc b/quiche/quic/moqt/moqt_relay_publisher.cc index 478341c..5f10c25 100644 --- a/quiche/quic/moqt/moqt_relay_publisher.cc +++ b/quiche/quic/moqt/moqt_relay_publisher.cc
@@ -11,6 +11,8 @@ #include "absl/base/nullability.h" #include "absl/container/flat_hash_map.h" #include "absl/strings/string_view.h" +#include "quiche/quic/moqt/moqt_fetch_task.h" +#include "quiche/quic/moqt/moqt_key_value_pair.h" #include "quiche/quic/moqt/moqt_names.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_relay_track_publisher.h" @@ -65,8 +67,7 @@ void MoqtRelayPublisher::OnPublishNamespace( const TrackNamespace& track_namespace, - const VersionSpecificParameters& /*parameters*/, - MoqtSessionInterface* session, + const MessageParameters& /*parameters*/, MoqtSessionInterface* session, MoqtResponseCallback absl_nullable callback) { if (session == nullptr) { return;
diff --git a/quiche/quic/moqt/moqt_relay_publisher.h b/quiche/quic/moqt/moqt_relay_publisher.h index 37b1fcf..ee1fb71 100644 --- a/quiche/quic/moqt/moqt_relay_publisher.h +++ b/quiche/quic/moqt/moqt_relay_publisher.h
@@ -53,7 +53,7 @@ } void OnPublishNamespace(const TrackNamespace& track_namespace, - const VersionSpecificParameters& parameters, + const MessageParameters& parameters, MoqtSessionInterface* session, MoqtResponseCallback absl_nullable callback);
diff --git a/quiche/quic/moqt/moqt_relay_publisher_test.cc b/quiche/quic/moqt/moqt_relay_publisher_test.cc index 6cb6b94..adecafc 100644 --- a/quiche/quic/moqt/moqt_relay_publisher_test.cc +++ b/quiche/quic/moqt/moqt_relay_publisher_test.cc
@@ -79,7 +79,7 @@ EXPECT_EQ(publisher_.GetTrack(FullTrackName("foo", "bar")), nullptr); std::optional<MoqtRequestErrorInfo> response; publisher_.OnPublishNamespace( - TrackNamespace({"foo"}), VersionSpecificParameters(), &session_, + TrackNamespace({"foo"}), MessageParameters(), &session_, [&](std::optional<MoqtRequestErrorInfo> error_response) { response = error_response; });
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 05dafcf..06800ed 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -336,21 +336,16 @@ return state_ptr->CreateTask(prefix); } -void MoqtSession::PublishNamespace( - TrackNamespace track_namespace, - MoqtOutgoingPublishNamespaceCallback callback, - VersionSpecificParameters parameters) { +bool MoqtSession::PublishNamespace( + const TrackNamespace& track_namespace, const MessageParameters& parameters, + MoqtResponseCallback response_callback, + quiche::SingleUseCallback<void(MoqtRequestErrorInfo)> cancel_callback) { if (is_closing_) { - return; + return false; } QUICHE_DCHECK(track_namespace.IsValid()); - if (outgoing_publish_namespaces_.contains(track_namespace)) { - std::move(callback)( - track_namespace, - MoqtRequestErrorInfo{ - RequestErrorCode::kInternalError, std::nullopt, - "PUBLISH_NAMESPACE already outstanding for namespace"}); - return; + if (publish_namespace_by_namespace_.contains(track_namespace)) { + return false; } if (next_request_id_ >= peer_max_request_id_) { if (!last_requests_blocked_sent_.has_value() || @@ -364,13 +359,17 @@ << next_request_id_ << " which is greater than the maximum ID " << peer_max_request_id_; - return; + return false; } if (received_goaway_ || sent_goaway_) { QUIC_DLOG(INFO) << ENDPOINT << "Tried to send PUBLISH_NAMESPACE after GOAWAY"; - return; + return false; } + publish_namespace_by_namespace_[track_namespace] = next_request_id_; + publish_namespace_by_id_[next_request_id_] = + PublishNamespaceState{track_namespace, std::move(response_callback), + std::move(cancel_callback)}; MoqtPublishNamespace message; message.request_id = next_request_id_; next_request_id_ += 2; @@ -379,39 +378,78 @@ SendControlMessage(framer_.SerializePublishNamespace(message)); QUIC_DLOG(INFO) << ENDPOINT << "Sent PUBLISH_NAMESPACE message for " << message.track_namespace; - pending_outgoing_publish_namespaces_[message.request_id] = track_namespace; - outgoing_publish_namespaces_[track_namespace] = std::move(callback); + return true; } -bool MoqtSession::PublishNamespaceDone(TrackNamespace track_namespace) { +bool MoqtSession::PublishNamespaceUpdate( + const TrackNamespace& track_namespace, MessageParameters& parameters, + MoqtResponseCallback response_callback) { if (is_closing_) { return false; } QUICHE_DCHECK(track_namespace.IsValid()); - auto it = outgoing_publish_namespaces_.find(track_namespace); - if (it == outgoing_publish_namespaces_.end()) { + auto it = publish_namespace_by_namespace_.find(track_namespace); + if (it == publish_namespace_by_namespace_.end()) { return false; // Could have been destroyed by PUBLISH_NAMESPACE_CANCEL. } - MoqtPublishNamespaceDone message; - message.track_namespace = track_namespace; - SendControlMessage(framer_.SerializePublishNamespaceDone(message)); - QUIC_DLOG(INFO) << ENDPOINT << "Sent PUBLISH_NAMESPACE_DONE message for " - << message.track_namespace; - outgoing_publish_namespaces_.erase(it); + if (next_request_id_ >= peer_max_request_id_) { + if (!last_requests_blocked_sent_.has_value() || + peer_max_request_id_ > *last_requests_blocked_sent_) { + MoqtRequestsBlocked requests_blocked; + requests_blocked.max_request_id = peer_max_request_id_; + SendControlMessage(framer_.SerializeRequestsBlocked(requests_blocked)); + last_requests_blocked_sent_ = peer_max_request_id_; + } + QUIC_DLOG(INFO) << ENDPOINT << "Tried to send PUBLISH_NAMESPACE with ID " + << next_request_id_ + << " which is greater than the maximum ID " + << peer_max_request_id_; + return false; + } + MoqtRequestUpdate message; + message.request_id = next_request_id_; + message.existing_request_id = it->second; + message.parameters = parameters; + publish_namespace_updates_[next_request_id_] = std::move(response_callback); + next_request_id_ += 2; + SendControlMessage(framer_.SerializeRequestUpdate(message)); return true; } -void MoqtSession::CancelPublishNamespace(TrackNamespace track_namespace, +bool MoqtSession::PublishNamespaceDone(const TrackNamespace& track_namespace) { + if (is_closing_) { + return false; + } + QUICHE_DCHECK(track_namespace.IsValid()); + auto it = publish_namespace_by_namespace_.find(track_namespace); + if (it == publish_namespace_by_namespace_.end()) { + return false; // Could have been destroyed by PUBLISH_NAMESPACE_CANCEL. + } + MoqtPublishNamespaceDone message; + message.request_id = it->second; + SendControlMessage(framer_.SerializePublishNamespaceDone(message)); + QUIC_DLOG(INFO) << ENDPOINT << "Sent PUBLISH_NAMESPACE_DONE message for " + << track_namespace; + publish_namespace_by_id_.erase(it->second); + publish_namespace_by_namespace_.erase(it); + return true; +} + +bool MoqtSession::PublishNamespaceCancel(const TrackNamespace& track_namespace, RequestErrorCode code, absl::string_view reason) { QUICHE_DCHECK(track_namespace.IsValid()); - MoqtPublishNamespaceCancel message{track_namespace, code, - std::string(reason)}; - - incoming_publish_namespaces_.erase(track_namespace); + auto it = incoming_publish_namespaces_by_namespace_.find(track_namespace); + if (it == publish_namespace_by_namespace_.end()) { + return false; // Could have been destroyed by PUBLISH_NAMESPACE_DONE. + } + MoqtPublishNamespaceCancel message{it->second, code, std::string(reason)}; + incoming_publish_namespaces_by_id_.erase(it->second); + incoming_publish_namespaces_by_namespace_.erase(track_namespace); SendControlMessage(framer_.SerializePublishNamespaceCancel(message)); QUIC_DLOG(INFO) << ENDPOINT << "Sent PUBLISH_NAMESPACE_CANCEL message for " - << message.track_namespace << " with reason " << reason; + << track_namespace << " with reason " << reason; + return true; } bool MoqtSession::Subscribe(const FullTrackName& name, @@ -1131,18 +1169,14 @@ return; } // Response to PUBLISH_NAMESPACE. - auto pn_it = - session_->pending_outgoing_publish_namespaces_.find(message.request_id); - if (pn_it != session_->pending_outgoing_publish_namespaces_.end()) { - TrackNamespace track_namespace = pn_it->second; - session_->pending_outgoing_publish_namespaces_.erase(pn_it); - auto callback_it = - session_->outgoing_publish_namespaces_.find(track_namespace); - if (callback_it == session_->outgoing_publish_namespaces_.end()) { - // It might have already been destroyed due to PUBLISH_NAMESPACE_DONE. + auto pn_it = session_->publish_namespace_by_id_.find(message.request_id); + if (pn_it != session_->publish_namespace_by_id_.end()) { + if (pn_it->second.response_callback == nullptr) { + session_->Error(MoqtError::kProtocolViolation, + "Multiple responses for PUBLISH_NAMESPACE"); return; } - std::move(callback_it->second)(track_namespace, std::nullopt); + std::move(pn_it->second.response_callback)(std::nullopt); return; } // Response to SUBSCRIBE_NAMESPACE is handled in the NamespaceStream. @@ -1202,17 +1236,17 @@ return; } // Response to PUBLISH_NAMESPACE. - auto pn_it = - session_->pending_outgoing_publish_namespaces_.find(message.request_id); - if (pn_it != session_->pending_outgoing_publish_namespaces_.end()) { - TrackNamespace& track_namespace = pn_it->second; - auto it2 = session_->outgoing_publish_namespaces_.find(track_namespace); - if (it2 == session_->outgoing_publish_namespaces_.end()) { - return; // State might have been destroyed due to PUBLISH_NAMESPACE_DONE. + auto pn_it = session_->publish_namespace_by_id_.find(message.request_id); + if (pn_it != session_->publish_namespace_by_id_.end()) { + if (pn_it->second.response_callback == nullptr) { + session_->Error(MoqtError::kProtocolViolation, + "Multiple responses for PUBLISH_NAMESPACE"); + return; } - std::move(it2->second)(track_namespace, error_info); - session_->pending_outgoing_publish_namespaces_.erase(pn_it); - session_->outgoing_publish_namespaces_.erase(it2); + std::move(pn_it->second.response_callback)(error_info); + session_->publish_namespace_by_namespace_.erase( + pn_it->second.track_namespace); + session_->publish_namespace_by_id_.erase(pn_it); return; } // Response to SUBSCRIBE_NAMESPACE is handled in the NamespaceStream. @@ -1259,6 +1293,33 @@ SendRequestOk(message.request_id, MessageParameters()); return; } + auto pn_it = + session_->publish_namespace_by_id_.find(message.existing_request_id); + if (pn_it != session_->publish_namespace_by_id_.end()) { + // It's updating PUBLISH_NAMESPACE. + quiche::QuicheWeakPtr<MoqtSessionInterface> session_weakptr = + session_->GetWeakPtr(); + TrackNamespace track_namespace = pn_it->second.track_namespace; + session_->callbacks().incoming_publish_namespace_callback( + pn_it->second.track_namespace, message.parameters, + [&](std::optional<MoqtRequestErrorInfo> error) { + MoqtSession* session = + absl::down_cast<MoqtSession*>(session_weakptr.GetIfAvailable()); + if (session == nullptr) { + return; + } + if (error.has_value()) { + SendRequestError(message.request_id, *error); + session->incoming_publish_namespaces_by_id_.erase( + message.request_id); + session->incoming_publish_namespaces_by_namespace_.erase( + track_namespace); + } else { + SendRequestOk(message.request_id, MessageParameters()); + } + }); + return; + } // TODO(martinduke): Check all the request types. // Does not match any known request. SendRequestError(message.request_id, RequestErrorCode::kNotSupported, @@ -1278,8 +1339,19 @@ } QUIC_DLOG(INFO) << ENDPOINT << "Received a PUBLISH_NAMESPACE for " << message.track_namespace; + auto [it, inserted] = + session_->incoming_publish_namespaces_by_namespace_.emplace( + message.track_namespace, message.request_id); + if (!inserted) { + SendRequestError(message.request_id, + RequestErrorCode::kDuplicateSubscription, std::nullopt, + "Duplicate PUBLISH_NAMESPACE"); + return; + } quiche::QuicheWeakPtr<MoqtSessionInterface> session_weakptr = session_->GetWeakPtr(); + session_->incoming_publish_namespaces_by_id_[message.request_id] = + message.track_namespace; session_->callbacks_.incoming_publish_namespace_callback( message.track_namespace, message.parameters, [&](std::optional<MoqtRequestErrorInfo> error) { @@ -1290,36 +1362,38 @@ } if (error.has_value()) { SendRequestError(message.request_id, *error); + session->incoming_publish_namespaces_by_id_.erase(message.request_id); + session->incoming_publish_namespaces_by_namespace_.erase( + message.track_namespace); } else { SendRequestOk(message.request_id, MessageParameters()); - session->incoming_publish_namespaces_.insert(message.track_namespace); } }); } void MoqtSession::ControlStream::OnPublishNamespaceDoneMessage( const MoqtPublishNamespaceDone& message) { - session_->incoming_publish_namespaces_.erase(message.track_namespace); + auto it = + session_->incoming_publish_namespaces_by_id_.find(message.request_id); + if (it == session_->incoming_publish_namespaces_by_id_.end()) { + return; + } session_->callbacks_.incoming_publish_namespace_callback( - message.track_namespace, std::nullopt, nullptr); + it->second, std::nullopt, nullptr); + session_->incoming_publish_namespaces_by_namespace_.erase(it->second); + session_->incoming_publish_namespaces_by_id_.erase(it); } void MoqtSession::ControlStream::OnPublishNamespaceCancelMessage( const MoqtPublishNamespaceCancel& message) { - // The spec currently says that if a later SUBSCRIBE arrives for this - // namespace, that SHOULD be a session error. I'm hoping that via Issue #557, - // this will go away. Regardless, a SHOULD will not compel the session to keep - // state forever, so there is no support for this requirement. - auto it = - session_->outgoing_publish_namespaces_.find(message.track_namespace); - if (it == session_->outgoing_publish_namespaces_.end()) { + auto it = session_->publish_namespace_by_id_.find(message.request_id); + if (it == session_->publish_namespace_by_id_.end()) { return; // State might have been destroyed due to PUBLISH_NAMESPACE_DONE. } - std::move(it->second)( - message.track_namespace, - MoqtRequestErrorInfo{message.error_code, std::nullopt, - std::string(message.error_reason)}); - session_->outgoing_publish_namespaces_.erase(it); + std::move(it->second.cancel_callback)(MoqtRequestErrorInfo{ + message.error_code, std::nullopt, std::string(message.error_reason)}); + session_->publish_namespace_by_namespace_.erase(it->second.track_namespace); + session_->publish_namespace_by_id_.erase(it); } void MoqtSession::ControlStream::OnTrackStatusMessage( @@ -2383,14 +2457,13 @@ // session owns the webtransport stream, which owns the StreamVisitor, which // owns the task. Destroying the task notifies the application. published_subscriptions_.clear(); - for (const TrackNamespace& track_namespace : incoming_publish_namespaces_) { - callbacks_.incoming_publish_namespace_callback(track_namespace, - std::nullopt, nullptr); + for (auto& it : incoming_publish_namespaces_by_namespace_) { + callbacks_.incoming_publish_namespace_callback(it.first, std::nullopt, + nullptr); } - for (auto& [track_namespace, callback] : outgoing_publish_namespaces_) { - callback(track_namespace, - MoqtRequestErrorInfo{RequestErrorCode::kUninterested, std::nullopt, - "Session closed"}); + for (auto& it : publish_namespace_by_id_) { + std::move(it.second.cancel_callback)(MoqtRequestErrorInfo{ + RequestErrorCode::kUninterested, std::nullopt, "Session closed"}); } while (!upstream_by_id_.empty()) { auto upstream = upstream_by_id_.begin();
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index a0562de..1abe949 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -42,6 +42,7 @@ #include "quiche/common/platform/api/quiche_export.h" #include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/quiche_buffer_allocator.h" +#include "quiche/common/quiche_callbacks.h" #include "quiche/common/quiche_circular_deque.h" #include "quiche/common/quiche_mem_slice.h" #include "quiche/common/quiche_weak_ptr.h" @@ -99,12 +100,6 @@ quic::Perspective perspective() const { return parameters_.perspective; } - // Allows the subscriber to declare it will not subscribe to |track_namespace| - // anymore. - void CancelPublishNamespace(TrackNamespace track_namespace, - RequestErrorCode code, - absl::string_view reason_phrase); - // MoqtSessionInterface implementation. MoqtSessionCallbacks& callbacks() override { return callbacks_; } void Error(MoqtError code, absl::string_view error) override; @@ -130,10 +125,18 @@ uint64_t num_previous_groups, MoqtPriority priority, std::optional<MoqtDeliveryOrder> delivery_order, VersionSpecificParameters parameters) override; - void PublishNamespace(TrackNamespace track_namespace, - MoqtOutgoingPublishNamespaceCallback callback, - VersionSpecificParameters parameters) override; - bool PublishNamespaceDone(TrackNamespace track_namespace) override; + bool PublishNamespace(const TrackNamespace& track_namespace, + const MessageParameters& parameters, + MoqtResponseCallback response_callback, + quiche::SingleUseCallback<void(MoqtRequestErrorInfo)> + cancel_callback) override; + bool PublishNamespaceUpdate(const TrackNamespace& track_namespace, + MessageParameters& parameters, + MoqtResponseCallback response_callback) override; + bool PublishNamespaceDone(const TrackNamespace& track_namespace) override; + bool PublishNamespaceCancel(const TrackNamespace& track_namespace, + RequestErrorCode error_code, + absl::string_view error_reason) override; // TODO(martinduke): Support PUBLISH. For now, PUBLISH-only requests will be // rejected with nullptr, and kBoth requests will change to kNamespace. // After receiving MoqtNamespaceTask, call @@ -851,13 +854,20 @@ absl::flat_hash_map<FullTrackName, MoqtPublishingMonitorInterface*> monitoring_interfaces_for_published_tracks_; - // Outgoing PUBLISH_NAMESPACE for which no OK or ERROR has been received. + // PUBLISH_NAMESPACE state. + struct PublishNamespaceState { + TrackNamespace track_namespace; + MoqtResponseCallback response_callback; + quiche::SingleUseCallback<void(MoqtRequestErrorInfo)> cancel_callback; + }; + absl::flat_hash_map<uint64_t, PublishNamespaceState> publish_namespace_by_id_; + absl::flat_hash_map<TrackNamespace, uint64_t> publish_namespace_by_namespace_; + absl::flat_hash_map<uint64_t, MoqtResponseCallback> + publish_namespace_updates_; + absl::flat_hash_map<TrackNamespace, uint64_t> + incoming_publish_namespaces_by_namespace_; absl::flat_hash_map<uint64_t, TrackNamespace> - pending_outgoing_publish_namespaces_; - // All outgoing PUBLISH_NAMESPACE. - absl::flat_hash_map<TrackNamespace, MoqtOutgoingPublishNamespaceCallback> - outgoing_publish_namespaces_; - absl::flat_hash_set<TrackNamespace> incoming_publish_namespaces_; + incoming_publish_namespaces_by_id_; // It's an error if the namespaces overlap, so keep track of them. SessionNamespaceTree incoming_subscribe_namespace_;
diff --git a/quiche/quic/moqt/moqt_session_callbacks.h b/quiche/quic/moqt/moqt_session_callbacks.h index 2d070b7..098c2a4 100644 --- a/quiche/quic/moqt/moqt_session_callbacks.h +++ b/quiche/quic/moqt/moqt_session_callbacks.h
@@ -39,9 +39,11 @@ // received from the peer. PUBLISH_NAMESPACE sets a value for |parameters|, // PUBLISH_NAMESPACE_DONE does not. This callback is not invoked by NAMESPACE or // NAMESPACE_DONE messages that arrive on a SUBSCRIBE_NAMESPACE stream. +// If the PUBLISH_NAMESPACE is updated, it will be called again, so be prepared +// for duplicates. using MoqtIncomingPublishNamespaceCallback = quiche::MultiUseCallback<void( const TrackNamespace& track_namespace, - const std::optional<VersionSpecificParameters>& parameters, + const std::optional<MessageParameters>& parameters, MoqtResponseCallback callback)>; // Called whenever SUBSCRIBE_NAMESPACE is received from the peer. Unsubscribe @@ -55,14 +57,14 @@ MoqtResponseCallback response_callback)>; inline void DefaultIncomingPublishNamespaceCallback( - const TrackNamespace&, const std::optional<VersionSpecificParameters>&, + const TrackNamespace&, const std::optional<MessageParameters>&, MoqtResponseCallback callback) { if (callback == nullptr) { return; } return std::move(callback)(MoqtRequestErrorInfo{ RequestErrorCode::kNotSupported, std::nullopt, - "This endpoint does not support incoming SUBSCRIBE_NAMESPACE messages"}); + "This endpoint does not support incoming PUBLISH_NAMESPACE messages"}); }; inline std::unique_ptr<MoqtNamespaceTask>
diff --git a/quiche/quic/moqt/moqt_session_interface.h b/quiche/quic/moqt/moqt_session_interface.h index c69d5c5..cb67da6 100644 --- a/quiche/quic/moqt/moqt_session_interface.h +++ b/quiche/quic/moqt/moqt_session_interface.h
@@ -71,18 +71,6 @@ using FetchResponseCallback = quiche::SingleUseCallback<void(std::unique_ptr<MoqtFetchTask> fetch_task)>; -// TODO(martinduke): MoqtOutgoingPublishNamespaceCallback is deprecated. Remove. - -// If |error| is nullopt, this is triggered by a PUBLISH_NAMESPACE_OK. -// Otherwise, it is triggered by REQUEST_ERROR or PUBLISH_NAMESPACE_CANCEL. For -// ERROR or CANCEL, MoqtSession is deleting all PUBLISH_NAMESPACE state -// immediately after calling this callback. -// Alternatively, the application can call PublishNamespaceDone() to delete the -// state. -using MoqtOutgoingPublishNamespaceCallback = - quiche::MultiUseCallback<void(const TrackNamespace& track_namespace, - std::optional<MoqtRequestErrorInfo> error)>; - class MoqtSessionInterface { public: virtual ~MoqtSessionInterface() = default; @@ -138,15 +126,25 @@ MoqtPriority priority, std::optional<MoqtDeliveryOrder> delivery_order, VersionSpecificParameters parameters) = 0; // Send a PUBLISH_NAMESPACE message for |track_namespace|, and call - // |publish_namespace_callback| when the response arrives. Will fail + // |response_callback| when the response arrives. Will fail // immediately if there is already an unresolved PUBLISH_NAMESPACE for that - // namespace. - virtual void PublishNamespace(TrackNamespace track_namespace, - MoqtOutgoingPublishNamespaceCallback callback, - VersionSpecificParameters parameters) = 0; - // Returns true if message was sent, false if there is no PUBLISH_NAMESPACE to - // cancel. - virtual bool PublishNamespaceDone(TrackNamespace track_namespace) = 0; + // namespace. Calls |cancel_callback| if the peer sends a + // PUBLISH_NAMESPACE_CANCEL. Returns true if the message was sent. + virtual bool PublishNamespace( + const TrackNamespace& track_namespace, + const MessageParameters& parameters, + MoqtResponseCallback response_callback, + quiche::SingleUseCallback<void(MoqtRequestErrorInfo)> + cancel_callback) = 0; + virtual bool PublishNamespaceUpdate( + const TrackNamespace& track_namespace, MessageParameters& parameters, + MoqtResponseCallback response_callback) = 0; + // Returns true if message was sent, false if there is no PUBLISH_NAMESPACE + // that relates. + virtual bool PublishNamespaceDone(const TrackNamespace& track_namespace) = 0; + virtual bool PublishNamespaceCancel(const TrackNamespace& track_namespace, + RequestErrorCode error_code, + absl::string_view error_reason) = 0; // Sends a SUBSCRIBE_NAMESPACE message for |prefix| and returns a // MoqtNamespaceTask that can be used to process the response.
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 5ebddab..e0423ac 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -433,63 +433,53 @@ } TEST_F(MoqtSessionTest, PublishNamespaceWithOkAndCancel) { - testing::MockFunction<void(TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> error_message)> - publish_namespace_resolved_callback; + testing::MockFunction<void(std::optional<MoqtRequestErrorInfo> error_message)> + publish_namespace_response_callback; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); EXPECT_CALL( mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kPublishNamespace), _)); - session_.PublishNamespace(TrackNamespace("foo"), - publish_namespace_resolved_callback.AsStdFunction(), - VersionSpecificParameters()); + MoqtRequestErrorInfo cancel_error_info; + session_.PublishNamespace( + TrackNamespace("foo"), MessageParameters(), + publish_namespace_response_callback.AsStdFunction(), + [&](MoqtRequestErrorInfo info) { cancel_error_info = info; }); MoqtRequestOk ok = {/*request_id=*/0, MessageParameters()}; - EXPECT_CALL(publish_namespace_resolved_callback, Call(_, _)) - .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> error) { - EXPECT_EQ(track_namespace, TrackNamespace("foo")); + EXPECT_CALL(publish_namespace_response_callback, Call) + .WillOnce([&](std::optional<MoqtRequestErrorInfo> error) { EXPECT_FALSE(error.has_value()); }); stream_input->OnRequestOkMessage(ok); MoqtPublishNamespaceCancel cancel = { - TrackNamespace("foo"), + /*request_id=*/0, RequestErrorCode::kInternalError, /*error_reason=*/"Test error", }; - EXPECT_CALL(publish_namespace_resolved_callback, Call(_, _)) - .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> error) { - EXPECT_EQ(track_namespace, TrackNamespace("foo")); - ASSERT_TRUE(error.has_value()); - EXPECT_EQ(error->error_code, RequestErrorCode::kInternalError); - EXPECT_EQ(error->reason_phrase, "Test error"); - }); stream_input->OnPublishNamespaceCancelMessage(cancel); + EXPECT_EQ(cancel_error_info.error_code, RequestErrorCode::kInternalError); + EXPECT_EQ(cancel_error_info.reason_phrase, "Test error"); // State is gone. EXPECT_FALSE(session_.PublishNamespaceDone(TrackNamespace("foo"))); } TEST_F(MoqtSessionTest, PublishNamespaceWithOkAndPublishNamespaceDone) { - testing::MockFunction<void(TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> error_message)> + testing::MockFunction<void(std::optional<MoqtRequestErrorInfo> error_message)> publish_namespace_resolved_callback; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); EXPECT_CALL( mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kPublishNamespace), _)); - session_.PublishNamespace(TrackNamespace{"foo"}, + session_.PublishNamespace(TrackNamespace{"foo"}, MessageParameters(), publish_namespace_resolved_callback.AsStdFunction(), - VersionSpecificParameters()); + [](MoqtRequestErrorInfo) {}); MoqtRequestOk ok = {/*request_id=*/0, MessageParameters()}; - EXPECT_CALL(publish_namespace_resolved_callback, Call(_, _)) - .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> error) { - EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); + EXPECT_CALL(publish_namespace_resolved_callback, Call) + .WillOnce([&](std::optional<MoqtRequestErrorInfo> error) { EXPECT_FALSE(error.has_value()); }); stream_input->OnRequestOkMessage(ok); @@ -503,24 +493,21 @@ } TEST_F(MoqtSessionTest, PublishNamespaceWithError) { - testing::MockFunction<void(TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> error_message)> + testing::MockFunction<void(std::optional<MoqtRequestErrorInfo> error_message)> publish_namespace_resolved_callback; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); EXPECT_CALL( mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kPublishNamespace), _)); - session_.PublishNamespace(TrackNamespace{"foo"}, + session_.PublishNamespace(TrackNamespace{"foo"}, MessageParameters(), publish_namespace_resolved_callback.AsStdFunction(), - VersionSpecificParameters()); + [](MoqtRequestErrorInfo) {}); MoqtRequestError error{/*request_id=*/0, RequestErrorCode::kInternalError, std::nullopt, "Test error"}; - EXPECT_CALL(publish_namespace_resolved_callback, Call(_, _)) - .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> error) { - EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); + EXPECT_CALL(publish_namespace_resolved_callback, Call) + .WillOnce([&](std::optional<MoqtRequestErrorInfo> error) { ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->error_code, RequestErrorCode::kInternalError); EXPECT_EQ(error->reason_phrase, "Test error"); @@ -954,17 +941,18 @@ TrackNamespace track_namespace{"foo"}; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); - auto parameters = std::make_optional<VersionSpecificParameters>( - AuthTokenType::kOutOfBand, "foo"); + MessageParameters parameters; + parameters.authorization_tokens.emplace_back(AuthTokenType::kOutOfBand, + "foo"); MoqtPublishNamespace publish_namespace = { kDefaultPeerRequestId, track_namespace, - *parameters, + parameters, }; EXPECT_CALL(session_callbacks_.incoming_publish_namespace_callback, - Call(track_namespace, parameters, _)) + Call(track_namespace, std::make_optional(parameters), _)) .WillOnce([](const TrackNamespace&, - const std::optional<VersionSpecificParameters>&, + const std::optional<MessageParameters>&, MoqtResponseCallback callback) { std::move(callback)(std::nullopt); }); @@ -973,17 +961,15 @@ kDefaultPeerRequestId, MessageParameters()}), _)); stream_input->OnPublishNamespaceMessage(publish_namespace); - MoqtPublishNamespaceDone unpublish_namespace = { - track_namespace, + MoqtPublishNamespaceDone publish_namespace_done = { + /*request_id=*/0, }; - EXPECT_CALL( - session_callbacks_.incoming_publish_namespace_callback, - Call(track_namespace, std::optional<VersionSpecificParameters>(), _)) + EXPECT_CALL(session_callbacks_.incoming_publish_namespace_callback, + Call(track_namespace, std::optional<MessageParameters>(), _)) .WillOnce( - [](const TrackNamespace&, - const std::optional<VersionSpecificParameters>&, + [](const TrackNamespace&, const std::optional<MessageParameters>&, MoqtResponseCallback callback) { EXPECT_EQ(callback, nullptr); }); - stream_input->OnPublishNamespaceDoneMessage(unpublish_namespace); + stream_input->OnPublishNamespaceDoneMessage(publish_namespace_done); } TEST_F(MoqtSessionTest, @@ -992,17 +978,18 @@ std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); - auto parameters = std::make_optional<VersionSpecificParameters>( - AuthTokenType::kOutOfBand, "foo"); + MessageParameters parameters; + parameters.authorization_tokens.emplace_back(AuthTokenType::kOutOfBand, + "foo"); MoqtPublishNamespace publish_namespace = { kDefaultPeerRequestId, track_namespace, - *parameters, + parameters, }; EXPECT_CALL(session_callbacks_.incoming_publish_namespace_callback, - Call(track_namespace, parameters, _)) + Call(track_namespace, std::make_optional(parameters), _)) .WillOnce([](const TrackNamespace&, - const std::optional<VersionSpecificParameters>&, + const std::optional<MessageParameters>&, MoqtResponseCallback callback) { std::move(callback)(std::nullopt); }); @@ -1013,10 +1000,10 @@ stream_input->OnPublishNamespaceMessage(publish_namespace); EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(MoqtPublishNamespaceCancel{ - track_namespace, RequestErrorCode::kInternalError, - "deadbeef"}), + kDefaultPeerRequestId, + RequestErrorCode::kInternalError, "deadbeef"}), _)); - session_.CancelPublishNamespace(track_namespace, + session_.PublishNamespaceCancel(track_namespace, RequestErrorCode::kInternalError, "deadbeef"); } @@ -1025,12 +1012,13 @@ std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); - auto parameters = std::make_optional<VersionSpecificParameters>( - AuthTokenType::kOutOfBand, "foo"); + MessageParameters parameters; + parameters.authorization_tokens.emplace_back(AuthTokenType::kOutOfBand, + "foo"); MoqtPublishNamespace publish_namespace = { kDefaultPeerRequestId, track_namespace, - *parameters, + parameters, }; MoqtRequestErrorInfo error = { RequestErrorCode::kNotSupported, @@ -1038,10 +1026,9 @@ "deadbeef", }; EXPECT_CALL(session_callbacks_.incoming_publish_namespace_callback, - Call(track_namespace, parameters, _)) + Call(track_namespace, std::make_optional(parameters), _)) .WillOnce( - [&](const TrackNamespace&, - const std::optional<VersionSpecificParameters>&, + [&](const TrackNamespace&, const std::optional<MessageParameters>&, MoqtResponseCallback callback) { std::move(callback)(error); }); EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(MoqtRequestError{ @@ -3529,9 +3516,9 @@ +[](std::optional<MoqtRequestErrorInfo>) {}), nullptr); session_.PublishNamespace( - TrackNamespace{"foo"}, - +[](TrackNamespace, std::optional<MoqtRequestErrorInfo>) {}, - VersionSpecificParameters()); + TrackNamespace{"foo"}, MessageParameters(), + +[](std::optional<MoqtRequestErrorInfo>) {}, + +[](MoqtRequestErrorInfo) {}); EXPECT_FALSE(session_.Fetch( FullTrackName{TrackNamespace("foo"), "bar"}, +[](std::unique_ptr<MoqtFetchTask>) {}, Location(0, 0), 5, std::nullopt, @@ -3562,8 +3549,8 @@ stream_input->OnSubscribeMessage(DefaultSubscribe()); EXPECT_CALL(mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _)); - stream_input->OnPublishNamespaceMessage(MoqtPublishNamespace( - 3, TrackNamespace("foo"), VersionSpecificParameters())); + stream_input->OnPublishNamespaceMessage( + MoqtPublishNamespace(3, TrackNamespace("foo"), MessageParameters())); EXPECT_CALL(mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _)); MoqtFetch fetch = DefaultFetch(); @@ -3597,9 +3584,9 @@ +[](std::optional<MoqtRequestErrorInfo>) {}), nullptr); session_.PublishNamespace( - TrackNamespace{"foo"}, - +[](TrackNamespace, std::optional<MoqtRequestErrorInfo>) {}, - VersionSpecificParameters()); + TrackNamespace{"foo"}, MessageParameters(), + +[](std::optional<MoqtRequestErrorInfo>) {}, + +[](MoqtRequestErrorInfo) {}); EXPECT_FALSE(session_.Fetch( FullTrackName(TrackNamespace("foo"), "bar"), +[](std::unique_ptr<MoqtFetchTask>) {}, Location(0, 0), 5, std::nullopt, @@ -4109,11 +4096,11 @@ MoqtSessionPeer::CreateControlStream(&session_, &control_stream); // Register two incoming PUBLISH_NAMESPACE. MoqtPublishNamespace publish_namespace{ - /*request_id=*/1, TrackNamespace{"foo"}, VersionSpecificParameters()}; + /*request_id=*/1, TrackNamespace{"foo"}, MessageParameters()}; EXPECT_CALL(session_callbacks_.incoming_publish_namespace_callback, Call(TrackNamespace{"foo"}, _, _)) .WillOnce([&](const TrackNamespace&, - const std::optional<VersionSpecificParameters>&, + const std::optional<MessageParameters>&, MoqtResponseCallback callback) { std::move(callback)(std::nullopt); }); @@ -4122,11 +4109,11 @@ stream_input->OnPublishNamespaceMessage(publish_namespace); publish_namespace = MoqtPublishNamespace( - /*request_id=*/3, TrackNamespace{"bar"}, VersionSpecificParameters()); + /*request_id=*/3, TrackNamespace{"bar"}, MessageParameters()); EXPECT_CALL(session_callbacks_.incoming_publish_namespace_callback, Call(TrackNamespace{"bar"}, _, _)) .WillOnce([&](const TrackNamespace&, - const std::optional<VersionSpecificParameters>&, + const std::optional<MessageParameters>&, MoqtResponseCallback callback) { std::move(callback)(std::nullopt); }); @@ -4135,23 +4122,21 @@ stream_input->OnPublishNamespaceMessage(publish_namespace); // Revoke "bar" - MoqtPublishNamespaceDone done{TrackNamespace{"bar"}}; - EXPECT_CALL(session_callbacks_.incoming_publish_namespace_callback, - Call(TrackNamespace{"bar"}, - std::optional<VersionSpecificParameters>(), _)) + MoqtPublishNamespaceDone done{/*request_id=*/3}; + EXPECT_CALL( + session_callbacks_.incoming_publish_namespace_callback, + Call(TrackNamespace{"bar"}, std::optional<MessageParameters>(), _)) .WillOnce( - [](const TrackNamespace&, - const std::optional<VersionSpecificParameters>&, + [](const TrackNamespace&, const std::optional<MessageParameters>&, MoqtResponseCallback callback) { EXPECT_EQ(callback, nullptr); }); stream_input->OnPublishNamespaceDoneMessage(done); // Destroying the session should revoke "foo". - EXPECT_CALL(session_callbacks_.incoming_publish_namespace_callback, - Call(TrackNamespace{"foo"}, - std::optional<VersionSpecificParameters>(), _)) + EXPECT_CALL( + session_callbacks_.incoming_publish_namespace_callback, + Call(TrackNamespace{"foo"}, std::optional<MessageParameters>(), _)) .WillOnce( - [](const TrackNamespace&, - const std::optional<VersionSpecificParameters>&, + [](const TrackNamespace&, const std::optional<MessageParameters>&, MoqtResponseCallback callback) { EXPECT_EQ(callback, nullptr); }); // Test teardown will destroy session_, triggering removal of "foo". }
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.h b/quiche/quic/moqt/test_tools/mock_moqt_session.h index a106fa7..48c0553 100644 --- a/quiche/quic/moqt/test_tools/mock_moqt_session.h +++ b/quiche/quic/moqt/test_tools/mock_moqt_session.h
@@ -57,12 +57,23 @@ std::optional<MoqtDeliveryOrder> delivery_order, VersionSpecificParameters parameters), (override)); - MOCK_METHOD(void, PublishNamespace, - (TrackNamespace track_namespace, - MoqtOutgoingPublishNamespaceCallback callback, - VersionSpecificParameters parameters), + MOCK_METHOD( + bool, PublishNamespace, + (const TrackNamespace& track_namespace, + const MessageParameters& parameters, + MoqtResponseCallback response_callback, + quiche::SingleUseCallback<void(MoqtRequestErrorInfo)> cancel_callback), + (override)); + MOCK_METHOD(bool, PublishNamespaceUpdate, + (const TrackNamespace& track_namespace, + MessageParameters& parameters, + MoqtResponseCallback response_callback), (override)); - MOCK_METHOD(bool, PublishNamespaceDone, (TrackNamespace track_namespace), + MOCK_METHOD(bool, PublishNamespaceDone, + (const TrackNamespace& track_namespace), (override)); + MOCK_METHOD(bool, PublishNamespaceCancel, + (const TrackNamespace& track_namespace, + RequestErrorCode error_code, absl::string_view error_reason), (override)); MOCK_METHOD(std::unique_ptr<MoqtNamespaceTask>, SubscribeNamespace, (TrackNamespace&, SubscribeNamespaceOption,
diff --git a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h index 74dcf57..b9dc508 100644 --- a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h +++ b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
@@ -41,7 +41,7 @@ testing::MockFunction<void(absl::string_view)> session_terminated_callback; testing::MockFunction<void()> session_deleted_callback; testing::MockFunction<void(const TrackNamespace&, - std::optional<VersionSpecificParameters>, + const std::optional<MessageParameters>&, MoqtResponseCallback)> incoming_publish_namespace_callback; testing::MockFunction<std::unique_ptr<MoqtNamespaceTask>(
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index 947be1b..0ebf5fa 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -19,6 +19,7 @@ #include "quiche/quic/core/quic_data_reader.h" #include "quiche/quic/core/quic_data_writer.h" #include "quiche/quic/core/quic_time.h" +#include "quiche/quic/moqt/moqt_error.h" #include "quiche/quic/moqt/moqt_key_value_pair.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_names.h" @@ -925,6 +926,8 @@ public: PublishNamespaceMessage() : TestMessageBase() { SetWireImage(raw_packet_, sizeof(raw_packet_)); + publish_namespace_.parameters.authorization_tokens.push_back( + AuthToken(AuthTokenType::kOutOfBand, "bar")); } bool EqualFieldValues(MessageStructuredData& values) const override { @@ -961,7 +964,7 @@ MoqtPublishNamespace publish_namespace_ = { /*request_id=*/2, TrackNamespace{"foo"}, - VersionSpecificParameters(AuthTokenType::kOutOfBand, "bar"), + MessageParameters(), }; }; @@ -1076,26 +1079,29 @@ bool EqualFieldValues(MessageStructuredData& values) const override { auto cast = std::get<MoqtPublishNamespaceDone>(values); - if (cast.track_namespace != publish_namespace_done_.track_namespace) { - QUIC_LOG(INFO) << "PUBLISH_NAMESPACE_DONE track namespace mismatch"; + if (cast.request_id != publish_namespace_done_.request_id) { + QUIC_LOG(INFO) << "PUBLISH_NAMESPACE_DONE request ID mismatch"; return false; } return true; } - void ExpandVarints() override { ExpandVarintsImpl("vv---"); } + void ExpandVarints() override { ExpandVarintsImpl("v"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(publish_namespace_done_); } private: - uint8_t raw_packet_[8] = { - 0x09, 0x00, 0x05, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace + uint8_t raw_packet_[4] = { + 0x09, + 0x00, + 0x01, + 0x01, // request_id = 1 }; MoqtPublishNamespaceDone publish_namespace_done_ = { - TrackNamespace("foo"), + /*request_id=*/1, }; }; @@ -1107,8 +1113,8 @@ bool EqualFieldValues(MessageStructuredData& values) const override { auto cast = std::get<MoqtPublishNamespaceCancel>(values); - if (cast.track_namespace != publish_namespace_cancel_.track_namespace) { - QUIC_LOG(INFO) << "PUBLISH_NAMESPACE CANCEL track namespace mismatch"; + if (cast.request_id != publish_namespace_cancel_.request_id) { + QUIC_LOG(INFO) << "PUBLISH_NAMESPACE CANCEL request ID mismatch"; return false; } if (cast.error_code != publish_namespace_cancel_.error_code) { @@ -1122,22 +1128,21 @@ return true; } - void ExpandVarints() override { ExpandVarintsImpl("vv---vv---"); } + void ExpandVarints() override { ExpandVarintsImpl("vvv---"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(publish_namespace_cancel_); } private: - uint8_t raw_packet_[13] = { - 0x0c, 0x00, 0x0a, 0x01, - 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + uint8_t raw_packet_[9] = { + 0x0c, 0x00, 0x06, 0x02, // request_id = 2 0x03, // error_code = 3 0x03, 0x62, 0x61, 0x72, // error_reason = "bar" }; MoqtPublishNamespaceCancel publish_namespace_cancel_ = { - TrackNamespace("foo"), + /*request_id=*/2, RequestErrorCode::kNotSupported, /*error_reason=*/"bar", };
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc index 0baf539..a82406c 100644 --- a/quiche/quic/moqt/tools/chat_client.cc +++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -49,7 +49,7 @@ void ChatClient::OnIncomingPublishNamespace( const moqt::TrackNamespace& track_namespace, - std::optional<VersionSpecificParameters> parameters, + const std::optional<MessageParameters>& parameters, moqt::MoqtResponseCallback absl_nullable callback) { if (!session_is_open_) { return; @@ -261,9 +261,8 @@ queue_ = std::make_shared<MoqtOutgoingQueue>(my_track_name_); publisher_.Add(queue_); session_->set_publisher(&publisher_); - MoqtOutgoingPublishNamespaceCallback publish_namespace_callback = - [this](TrackNamespace track_namespace, - std::optional<MoqtRequestErrorInfo> reason) { + MoqtResponseCallback publish_namespace_callback = + [this](std::optional<MoqtRequestErrorInfo> reason) { if (reason.has_value()) { std::cout << "PUBLISH_NAMESPACE rejected, " << reason->reason_phrase << "\n"; @@ -271,15 +270,27 @@ "Local PUBLISH_NAMESPACE rejected"); return; } - std::cout << "PUBLISH_NAMESPACE for " << track_namespace.ToString() - << " accepted\n"; + std::cout << "PUBLISH_NAMESPACE accepted\n"; return; }; std::cout << "Announcing " << GetUserNamespace(my_track_name_).ToString() << "\n"; - session_->PublishNamespace(GetUserNamespace(my_track_name_), - std::move(publish_namespace_callback), - VersionSpecificParameters()); + session_->PublishNamespace( + GetUserNamespace(my_track_name_), MessageParameters(), + [this](std::optional<MoqtRequestErrorInfo> reason) { + if (reason.has_value()) { + std::cout << "PUBLISH_NAMESPACE rejected, " << reason->reason_phrase + << "\n"; + session_->Error(MoqtError::kInternalError, + "Local PUBLISH_NAMESPACE rejected"); + return; + } + std::cout << "PUBLISH_NAMESPACE for " + << GetUserNamespace(my_track_name_).ToString() + << " accepted\n"; + return; + }, + [](MoqtRequestErrorInfo) {}); // Send SUBSCRIBE_NAMESPACE. Pop 3 levels of namespace to get to // {moq-chat, chat-id} @@ -331,7 +342,7 @@ OnIncomingPublishNamespace( *track_namespace, (type == TransactionType::kAdd) - ? std::make_optional(VersionSpecificParameters()) + ? std::make_optional(MessageParameters()) : std::nullopt, /*callback=*/nullptr); break;
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h index a042e8d..516ea07 100644 --- a/quiche/quic/moqt/tools/chat_client.h +++ b/quiche/quic/moqt/tools/chat_client.h
@@ -141,7 +141,7 @@ // a PUBLISH_NAMESPACE. void OnIncomingPublishNamespace( const moqt::TrackNamespace& track_namespace, - std::optional<VersionSpecificParameters> parameters, + const std::optional<MessageParameters>& parameters, moqt::MoqtResponseCallback absl_nullable callback); // Basic session information
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc index c3de814..80330b7 100644 --- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc +++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -30,7 +30,10 @@ #include "absl/time/clock.h" #include "absl/time/time.h" #include "quiche/quic/moqt/moqt_error.h" +#include "quiche/quic/moqt/moqt_fetch_task.h" +#include "quiche/quic/moqt/moqt_key_value_pair.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_names.h" #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_session.h" #include "quiche/quic/moqt/moqt_session_callbacks.h" @@ -121,7 +124,7 @@ // TODO(martinduke): Handle when |publish_namespace| is false // (PUBLISH_NAMESPACE_DONE). void OnPublishNamespaceReceived(TrackNamespace track_namespace, - std::optional<VersionSpecificParameters>, + std::optional<MessageParameters>, MoqtResponseCallback callback) { if (!IsValidTrackNamespace(track_namespace) && !quiche::GetQuicheCommandLineFlag(
diff --git a/quiche/quic/moqt/tools/moqt_relay.cc b/quiche/quic/moqt/tools/moqt_relay.cc index cc7dd54..74e35fa 100644 --- a/quiche/quic/moqt/tools/moqt_relay.cc +++ b/quiche/quic/moqt/tools/moqt_relay.cc
@@ -110,10 +110,9 @@ void MoqtRelay::SetNamespaceCallbacks(MoqtSessionInterface* session) { session->callbacks().incoming_publish_namespace_callback = - [this, session]( - const TrackNamespace& track_namespace, - const std::optional<VersionSpecificParameters>& parameters, - MoqtResponseCallback callback) { + [this, session](const TrackNamespace& track_namespace, + const std::optional<MessageParameters>& parameters, + MoqtResponseCallback callback) { if (is_closing_) { return; }
diff --git a/quiche/quic/moqt/tools/moqt_relay_test.cc b/quiche/quic/moqt/tools/moqt_relay_test.cc index 8aa2581..1f677bc 100644 --- a/quiche/quic/moqt/tools/moqt_relay_test.cc +++ b/quiche/quic/moqt/tools/moqt_relay_test.cc
@@ -134,9 +134,8 @@ nullptr); // relay_ publishes a namespace, so upstream_ will route to relay_. relay_.client_session()->PublishNamespace( - TrackNamespace({"foo"}), - [](TrackNamespace, std::optional<MoqtRequestErrorInfo>) {}, - VersionSpecificParameters()); + TrackNamespace({"foo"}), MessageParameters(), + [](std::optional<MoqtRequestErrorInfo>) {}, [](MoqtRequestErrorInfo) {}); upstream_.RunOneEvent(); // There is now an upstream session for "Foo". std::shared_ptr<MoqtTrackPublisher> track = @@ -192,8 +191,8 @@ // Downstream publishes a namespace. It's stored in relay_ but upstream_ // hasn't been notified. downstream_.client_session()->PublishNamespace( - foobar, [](TrackNamespace, std::optional<MoqtRequestErrorInfo>) {}, - VersionSpecificParameters()); + foobar, MessageParameters(), [](std::optional<MoqtRequestErrorInfo>) {}, + [](MoqtRequestErrorInfo) {}); relay_.RunOneEvent(); upstream_.RunOneEvent(); EXPECT_THAT(relay_published_namespaces, ElementsAre(foobar)); @@ -209,7 +208,7 @@ while (task->GetNextSuffix(suffix, type) == kSuccess) { if (type == TransactionType::kAdd) { upstream_.publisher()->OnPublishNamespace( - *task->prefix().AddSuffix(suffix), VersionSpecificParameters(), + *task->prefix().AddSuffix(suffix), MessageParameters(), upstream_session, nullptr); } else { upstream_.publisher()->OnPublishNamespaceDone( @@ -223,8 +222,8 @@ // Downstream publishes another namespace. Everyone is notified. downstream_.client_session()->PublishNamespace( - foobaz, [](TrackNamespace, std::optional<MoqtRequestErrorInfo>) {}, - VersionSpecificParameters()); + foobaz, MessageParameters(), [](std::optional<MoqtRequestErrorInfo>) {}, + [](MoqtRequestErrorInfo) {}); relay_.RunOneEvent(); upstream_.RunOneEvent(); EXPECT_THAT(relay_published_namespaces, ElementsAre(foobar, foobaz));