Update messages that will go in SUBSCRIBE_NAMESPACE streams up to draft-16. REQUEST_OK, REQUEST_ERROR, SUBSCRIBE_NAMESPACE, NAMESPACE, NAMESPACE_DONE. Session-layer processing of SUBSCRIBE_NAMESPACE does not change in this CL. PiperOrigin-RevId: 864976974
diff --git a/quiche/quic/moqt/moqt_bidi_stream.h b/quiche/quic/moqt/moqt_bidi_stream.h index f58d65c..b1c3351 100644 --- a/quiche/quic/moqt/moqt_bidi_stream.h +++ b/quiche/quic/moqt/moqt_bidi_stream.h
@@ -17,6 +17,7 @@ #include "absl/status/status.h" #include "absl/strings/string_view.h" #include "absl/types/span.h" +#include "quiche/quic/core/quic_time.h" #include "quiche/quic/moqt/moqt_error.h" #include "quiche/quic/moqt/moqt_framer.h" #include "quiche/quic/moqt/moqt_key_value_pair.h" @@ -98,6 +99,13 @@ const MoqtPublishNamespaceDone& message) override { OnParsingError(wrong_message_error_, wrong_message_reason_); } + virtual void OnNamespaceMessage(const MoqtNamespace& message) override { + OnParsingError(wrong_message_error_, wrong_message_reason_); + } + virtual void OnNamespaceDoneMessage( + const MoqtNamespaceDone& message) override { + OnParsingError(wrong_message_error_, wrong_message_reason_); + } virtual void OnPublishNamespaceCancelMessage( const MoqtPublishNamespaceCancel& message) override { OnParsingError(wrong_message_error_, wrong_message_reason_); @@ -182,21 +190,27 @@ } SendMessage(std::move(message), fin); } - void SendRequestOk(uint64_t request_id, - const VersionSpecificParameters& parameters, + void SendRequestOk(uint64_t request_id, const MessageParameters& parameters, bool fin = false) { SendOrBufferMessage( framer_->SerializeRequestOk(MoqtRequestOk{request_id, parameters}), fin); } void SendRequestError(uint64_t request_id, RequestErrorCode error_code, + std::optional<quic::QuicTimeDelta> retry_interval, absl::string_view reason_phrase, bool fin = false) { MoqtRequestError request_error; request_error.request_id = request_id; request_error.error_code = error_code; + request_error.retry_interval = retry_interval; request_error.reason_phrase = reason_phrase; SendOrBufferMessage(framer_->SerializeRequestError(request_error), fin); } + void SendRequestError(uint64_t request_id, MoqtRequestErrorInfo info, + bool fin = false) { + SendRequestError(request_id, info.error_code, info.retry_interval, + info.reason_phrase, fin); + } void Fin() { fin_queued_ = true; if (pending_messages_.empty()) {
diff --git a/quiche/quic/moqt/moqt_bidi_stream_test.cc b/quiche/quic/moqt/moqt_bidi_stream_test.cc index cbc9cdb..1cf8beb 100644 --- a/quiche/quic/moqt/moqt_bidi_stream_test.cc +++ b/quiche/quic/moqt/moqt_bidi_stream_test.cc
@@ -214,8 +214,9 @@ stream_->set_stream(&mock_stream_); EXPECT_CALL(mock_stream_, CanWrite).WillRepeatedly(Return(false)); EXPECT_CALL(mock_stream_, Writev).Times(0); - stream_->SendRequestOk(0, VersionSpecificParameters()); - stream_->SendRequestError(2, RequestErrorCode::kUnauthorized, "bad request"); + stream_->SendRequestOk(0, MessageParameters()); + stream_->SendRequestError(2, RequestErrorCode::kUnauthorized, std::nullopt, + "bad request"); stream_->Fin(); { testing::InSequence seq;
diff --git a/quiche/quic/moqt/moqt_error.h b/quiche/quic/moqt/moqt_error.h index 36f8aac..cb5eba7 100644 --- a/quiche/quic/moqt/moqt_error.h +++ b/quiche/quic/moqt/moqt_error.h
@@ -6,10 +6,12 @@ #define QUICHE_QUIC_MOQT_MOQT_ERROR_H_ #include <cstdint> +#include <optional> #include <string> #include "absl/status/status.h" #include "absl/strings/string_view.h" +#include "quiche/quic/core/quic_time.h" #include "quiche/common/platform/api/quiche_export.h" #include "quiche/web_transport/web_transport.h" @@ -70,8 +72,9 @@ kExpiredAuthToken = 0x12, }; -struct MoqtErrorPair { +struct MoqtRequestErrorInfo { RequestErrorCode error_code; + std::optional<quic::QuicTimeDelta> retry_interval; std::string reason_phrase; };
diff --git a/quiche/quic/moqt/moqt_fetch_task.h b/quiche/quic/moqt/moqt_fetch_task.h index 45c72d0..bfcf395 100644 --- a/quiche/quic/moqt/moqt_fetch_task.h +++ b/quiche/quic/moqt/moqt_fetch_task.h
@@ -5,11 +5,13 @@ #ifndef QUICHE_QUIC_MOQT_MOQT_FETCH_TASK_H_ #define QUICHE_QUIC_MOQT_MOQT_FETCH_TASK_H_ +#include <optional> #include <string> #include <utility> #include <variant> #include "absl/status/status.h" +#include "quiche/quic/moqt/moqt_error.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_object.h" #include "quiche/common/quiche_callbacks.h" @@ -76,7 +78,7 @@ ObjectsAvailableCallback /*callback*/) override {} void SetFetchResponseCallback(FetchResponseCallback callback) override { MoqtRequestError error{/*request_id=*/0, StatusToRequestErrorCode(status_), - std::string(status_.message())}; + std::nullopt, std::string(status_.message())}; std::move(callback)(error); }
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index 81fb0c9..95197a8 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -511,14 +511,9 @@ quiche::QuicheBuffer MoqtFramer::SerializeRequestOk( const MoqtRequestOk& message) { - KeyValuePairList parameters; - if (!FillAndValidateVersionSpecificParameters( - MoqtMessageType::kRequestOk, message.parameters, parameters)) { - return quiche::QuicheBuffer(); - }; - return SerializeControlMessage(MoqtMessageType::kRequestOk, - WireVarInt62(message.request_id), - WireKeyValuePairList(parameters)); + return SerializeControlMessage( + MoqtMessageType::kRequestOk, WireVarInt62(message.request_id), + WireKeyValuePairList(message.parameters.ToKeyValuePairList())); } quiche::QuicheBuffer MoqtFramer::SerializeSubscribe( @@ -548,6 +543,9 @@ return SerializeControlMessage( MoqtMessageType::kRequestError, WireVarInt62(message.request_id), WireVarInt62(message.error_code), + WireVarInt62(message.retry_interval.has_value() + ? message.retry_interval->ToMilliseconds() + 1 + : 0), WireStringWithVarInt62Length(message.reason_phrase)); } @@ -600,6 +598,20 @@ WireTrackNamespace(message.track_namespace)); } +quiche::QuicheBuffer MoqtFramer::SerializeNamespace( + const MoqtNamespace& message) { + return SerializeControlMessage( + MoqtMessageType::kNamespace, + WireTrackNamespace(message.track_namespace_suffix)); +} + +quiche::QuicheBuffer MoqtFramer::SerializeNamespaceDone( + const MoqtNamespaceDone& message) { + return SerializeControlMessage( + MoqtMessageType::kNamespaceDone, + WireTrackNamespace(message.track_namespace_suffix)); +} + quiche::QuicheBuffer MoqtFramer::SerializePublishNamespaceCancel( const MoqtPublishNamespaceCancel& message) { return SerializeControlMessage( @@ -623,16 +635,11 @@ quiche::QuicheBuffer MoqtFramer::SerializeSubscribeNamespace( const MoqtSubscribeNamespace& message) { - KeyValuePairList parameters; - if (!FillAndValidateVersionSpecificParameters( - MoqtMessageType::kSubscribeNamespace, message.parameters, - parameters)) { - return quiche::QuicheBuffer(); - }; - return SerializeControlMessage(MoqtMessageType::kSubscribeNamespace, - WireVarInt62(message.request_id), - WireTrackNamespace(message.track_namespace), - WireKeyValuePairList(parameters)); + return SerializeControlMessage( + MoqtMessageType::kSubscribeNamespace, WireVarInt62(message.request_id), + WireTrackNamespace(message.track_namespace_prefix), + WireVarInt62(message.subscribe_options), + WireKeyValuePairList(message.parameters.ToKeyValuePairList())); } quiche::QuicheBuffer MoqtFramer::SerializeUnsubscribeNamespace(
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h index c39bb6a..b5ff0ad 100644 --- a/quiche/quic/moqt/moqt_framer.h +++ b/quiche/quic/moqt/moqt_framer.h
@@ -61,6 +61,8 @@ const MoqtPublishNamespace& message); quiche::QuicheBuffer SerializePublishNamespaceDone( const MoqtPublishNamespaceDone& message); + quiche::QuicheBuffer SerializeNamespace(const MoqtNamespace& message); + quiche::QuicheBuffer SerializeNamespaceDone(const MoqtNamespaceDone& message); quiche::QuicheBuffer SerializePublishNamespaceCancel( const MoqtPublishNamespaceCancel& message); quiche::QuicheBuffer SerializeTrackStatus(const MoqtTrackStatus& message);
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc index c1e0ef3..acdc70b 100644 --- a/quiche/quic/moqt/moqt_framer_test.cc +++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -44,6 +44,8 @@ MoqtMessageType::kPublishDone, MoqtMessageType::kPublishNamespace, MoqtMessageType::kPublishNamespaceDone, + MoqtMessageType::kNamespace, + MoqtMessageType::kNamespaceDone, MoqtMessageType::kPublishNamespaceCancel, MoqtMessageType::kTrackStatus, MoqtMessageType::kGoAway, @@ -134,7 +136,6 @@ auto data = std::get<MoqtSubscribeOk>(structured_data); return framer_.SerializeSubscribeOk(data); } - case MoqtMessageType::kUnsubscribe: { auto data = std::get<MoqtUnsubscribe>(structured_data); return framer_.SerializeUnsubscribe(data); @@ -151,6 +152,14 @@ auto data = std::get<MoqtPublishNamespaceDone>(structured_data); return framer_.SerializePublishNamespaceDone(data); } + case MoqtMessageType::kNamespace: { + auto data = std::get<MoqtNamespace>(structured_data); + return framer_.SerializeNamespace(data); + } + case MoqtMessageType::kNamespaceDone: { + auto data = std::get<MoqtNamespaceDone>(structured_data); + return framer_.SerializeNamespaceDone(data); + } case moqt::MoqtMessageType::kPublishNamespaceCancel: { auto data = std::get<MoqtPublishNamespaceCancel>(structured_data); return framer_.SerializePublishNamespaceCancel(data);
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc index 25c61b2..5fc8956 100644 --- a/quiche/quic/moqt/moqt_integration_test.cc +++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -97,10 +97,11 @@ MockSubscribeRemoteTrackVisitor* visitor) { bool received_ok = false; EXPECT_CALL(*visitor, OnReply) - .WillOnce([&](const FullTrackName&, - std::variant<SubscribeOkData, MoqtErrorPair> response) { - received_ok = std::holds_alternative<SubscribeOkData>(response); - }); + .WillOnce( + [&](const FullTrackName&, + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { + received_ok = std::holds_alternative<SubscribeOkData>(response); + }); MessageParameters parameters(MoqtFilterType::kLargestObject); client_->session()->Subscribe(track_name, visitor, parameters); @@ -116,7 +117,7 @@ MockSessionCallbacks server_callbacks_; MockSubscribeRemoteTrackVisitor subscribe_visitor_; testing::MockFunction<void(TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error_message)> + std::optional<MoqtRequestErrorInfo> error_message)> outgoing_publish_namespace_callback_; std::unique_ptr<MoqtClientEndpoint> client_; std::unique_ptr<MoqtServerEndpoint> server_; @@ -181,7 +182,7 @@ std::move(callback)(std::nullopt); }); testing::MockFunction<void(TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error_message)> + std::optional<MoqtRequestErrorInfo> error_message)> publish_namespace_callback; client_->session()->PublishNamespace( TrackNamespace{"foo"}, publish_namespace_callback.AsStdFunction(), @@ -189,7 +190,7 @@ bool matches = false; EXPECT_CALL(publish_namespace_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error) { + std::optional<MoqtRequestErrorInfo> error) { matches = true; EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); EXPECT_FALSE(error.has_value()); @@ -224,7 +225,7 @@ std::move(callback)(std::nullopt); }); testing::MockFunction<void(TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error_message)> + std::optional<MoqtRequestErrorInfo> error_message)> publish_namespace_callback; client_->session()->PublishNamespace( TrackNamespace{"foo"}, publish_namespace_callback.AsStdFunction(), @@ -232,7 +233,7 @@ bool matches = false; EXPECT_CALL(publish_namespace_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error) { + std::optional<MoqtRequestErrorInfo> error) { matches = true; EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); EXPECT_FALSE(error.has_value()); @@ -243,7 +244,7 @@ matches = false; EXPECT_CALL(publish_namespace_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error) { + std::optional<MoqtRequestErrorInfo> error) { matches = true; EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); ASSERT_TRUE(error.has_value()); @@ -274,7 +275,7 @@ bool matches = false; EXPECT_CALL(outgoing_publish_namespace_callback_, Call) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error) { + std::optional<MoqtRequestErrorInfo> error) { EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); FullTrackName track_name(track_namespace, "/catalog"); EXPECT_FALSE(error.has_value()); @@ -283,7 +284,7 @@ parameters); }) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error) { + std::optional<MoqtRequestErrorInfo> error) { EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); EXPECT_TRUE(error.has_value()); }); @@ -331,7 +332,7 @@ }); client_->session()->PublishNamespace( TrackNamespace{"test"}, - [](TrackNamespace, std::optional<MoqtErrorPair>) {}, *parameters); + [](TrackNamespace, std::optional<MoqtRequestErrorInfo>) {}, *parameters); bool success = test_harness_.RunUntilWithDefaultTimeout( [&]() { return received_subscribe_ok; }); EXPECT_TRUE(success); @@ -388,12 +389,13 @@ &subscribe_visitor_, parameters); std::optional<Location> largest_id; EXPECT_CALL(subscribe_visitor_, OnReply) - .WillOnce([&](const FullTrackName&, - std::variant<SubscribeOkData, MoqtErrorPair> response) { - EXPECT_TRUE(std::holds_alternative<SubscribeOkData>(response)); - largest_id = - std::get<SubscribeOkData>(response).parameters.largest_object; - }); + .WillOnce( + [&](const FullTrackName&, + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { + EXPECT_TRUE(std::holds_alternative<SubscribeOkData>(response)); + largest_id = + std::get<SubscribeOkData>(response).parameters.largest_object; + }); bool success = test_harness_.RunUntilWithDefaultTimeout([&]() { return largest_id.has_value() && *largest_id == Location(0, 2); }); @@ -530,7 +532,7 @@ TEST_F(MoqtIntegrationTest, PublishNamespaceFailure) { EstablishSession(); testing::MockFunction<void(TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error_message)> + std::optional<MoqtRequestErrorInfo> error_message)> publish_namespace_callback; client_->session()->PublishNamespace( TrackNamespace{"foo"}, publish_namespace_callback.AsStdFunction(), @@ -538,7 +540,7 @@ bool matches = false; EXPECT_CALL(publish_namespace_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error) { + std::optional<MoqtRequestErrorInfo> error) { matches = true; EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); ASSERT_TRUE(error.has_value()); @@ -566,10 +568,11 @@ listener->OnSubscribeAccepted(); }); EXPECT_CALL(subscribe_visitor_, OnReply) - .WillOnce([&](const FullTrackName&, - std::variant<SubscribeOkData, MoqtErrorPair> response) { - received_ok = std::holds_alternative<SubscribeOkData>(response); - }); + .WillOnce( + [&](const FullTrackName&, + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { + received_ok = std::holds_alternative<SubscribeOkData>(response); + }); MessageParameters parameters; client_->session()->Subscribe(full_track_name, &subscribe_visitor_, parameters); @@ -596,10 +599,11 @@ listener->OnSubscribeAccepted(); }); EXPECT_CALL(subscribe_visitor_, OnReply) - .WillOnce([&](const FullTrackName&, - std::variant<SubscribeOkData, MoqtErrorPair> response) { - received_ok = std::holds_alternative<SubscribeOkData>(response); - }); + .WillOnce( + [&](const FullTrackName&, + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { + received_ok = std::holds_alternative<SubscribeOkData>(response); + }); MessageParameters parameters(MoqtFilterType::kLargestObject); client_->session()->Subscribe(full_track_name, &subscribe_visitor_, parameters); @@ -626,10 +630,11 @@ listener->OnSubscribeAccepted(); }); EXPECT_CALL(subscribe_visitor_, OnReply) - .WillOnce([&](const FullTrackName&, - std::variant<SubscribeOkData, MoqtErrorPair> response) { - received_ok = std::holds_alternative<SubscribeOkData>(response); - }); + .WillOnce( + [&](const FullTrackName&, + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { + received_ok = std::holds_alternative<SubscribeOkData>(response); + }); MessageParameters parameters(MoqtFilterType::kNextGroupStart); client_->session()->Subscribe(full_track_name, &subscribe_visitor_, parameters); @@ -643,10 +648,12 @@ FullTrackName full_track_name("foo", "bar"); bool received_ok = false; EXPECT_CALL(subscribe_visitor_, OnReply) - .WillOnce([&](const FullTrackName&, - std::variant<SubscribeOkData, MoqtErrorPair> response) { - received_ok = std::holds_alternative<MoqtErrorPair>(response); - }); + .WillOnce( + [&](const FullTrackName&, + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { + received_ok = + std::holds_alternative<MoqtRequestErrorInfo>(response); + }); MessageParameters parameters(MoqtFilterType::kLargestObject); client_->session()->Subscribe(full_track_name, &subscribe_visitor_, parameters); @@ -695,10 +702,11 @@ EXPECT_TRUE(client_->session()->Subscribe(full_track_name, &subscribe_visitor_, parameters)); EXPECT_CALL(subscribe_visitor_, OnReply) - .WillOnce([](const FullTrackName&, - std::variant<SubscribeOkData, MoqtErrorPair> response) { - EXPECT_TRUE(std::holds_alternative<MoqtErrorPair>(response)); - }); // Teardown + .WillOnce( + [](const FullTrackName&, + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { + EXPECT_TRUE(std::holds_alternative<MoqtRequestErrorInfo>(response)); + }); // Teardown } TEST_F(MoqtIntegrationTest, ObjectAcks) { @@ -727,7 +735,7 @@ }); EXPECT_CALL(subscribe_visitor_, OnReply) .WillOnce([&](const FullTrackName&, - std::variant<SubscribeOkData, MoqtErrorPair>) { + std::variant<SubscribeOkData, MoqtRequestErrorInfo>) { ack_function(10, 20, quic::QuicTimeDelta::FromMicroseconds(-123)); ack_function(100, 200, quic::QuicTimeDelta::FromMicroseconds(456)); }); @@ -775,10 +783,11 @@ bool received_ok = false; EXPECT_CALL(subscribe_visitor_, OnReply) - .WillOnce([&](const FullTrackName&, - std::variant<SubscribeOkData, MoqtErrorPair> response) { - received_ok = std::holds_alternative<SubscribeOkData>(response); - }); + .WillOnce( + [&](const FullTrackName&, + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { + received_ok = std::holds_alternative<SubscribeOkData>(response); + }); MessageParameters parameters(MoqtFilterType::kLargestObject); // Set delivery timeout to ~ 1 RTT: any loss is fatal. parameters.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(100); @@ -823,10 +832,11 @@ bool received_ok = false; EXPECT_CALL(subscribe_visitor_, OnReply) - .WillOnce([&](const FullTrackName&, - std::variant<SubscribeOkData, MoqtErrorPair> response) { - received_ok = std::holds_alternative<SubscribeOkData>(response); - }); + .WillOnce( + [&](const FullTrackName&, + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { + received_ok = std::holds_alternative<SubscribeOkData>(response); + }); MessageParameters parameters(MoqtFilterType::kLargestObject); // Set delivery timeout to ~ 1 RTT: any loss is fatal. parameters.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(100); @@ -906,7 +916,7 @@ bool subscribed = false; EXPECT_CALL(subscribe_visitor_, OnReply) .WillOnce([&](const FullTrackName&, - std::variant<SubscribeOkData, MoqtErrorPair>) { + std::variant<SubscribeOkData, MoqtRequestErrorInfo>) { subscribed = true; }); bool success =
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc index 9f6ac2d..a64ff21 100644 --- a/quiche/quic/moqt/moqt_messages.cc +++ b/quiche/quic/moqt/moqt_messages.cc
@@ -121,6 +121,10 @@ return "TRACK_STATUS"; case MoqtMessageType::kPublishNamespace: return "PUBLISH_NAMESPACE"; + case MoqtMessageType::kNamespace: + return "NAMESPACE"; + case MoqtMessageType::kNamespaceDone: + return "NAMESPACE_DONE"; case MoqtMessageType::kRequestOk: return "REQUEST_OK"; case MoqtMessageType::kPublishNamespaceDone:
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index 3695543..3f006d5 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -249,11 +249,13 @@ kRequestError = 0x05, kPublishNamespace = 0x06, kRequestOk = 0x07, + kNamespace = 0x08, kPublishNamespaceDone = 0x09, kUnsubscribe = 0x0a, kPublishDone = 0x0b, kPublishNamespaceCancel = 0x0c, kTrackStatus = 0x0d, + kNamespaceDone = 0x0e, kGoAway = 0x10, kSubscribeNamespace = 0x11, kUnsubscribeNamespace = 0x14, @@ -346,6 +348,7 @@ struct QUICHE_EXPORT MoqtRequestError { uint64_t request_id; RequestErrorCode error_code; + std::optional<quic::QuicTimeDelta> retry_interval; std::string reason_phrase; }; @@ -407,7 +410,7 @@ struct QUICHE_EXPORT MoqtRequestOk { uint64_t request_id; - VersionSpecificParameters parameters; + MessageParameters parameters; }; struct QUICHE_EXPORT MoqtPublishNamespaceDone { @@ -429,16 +432,33 @@ std::string new_session_uri; }; +enum class QUICHE_EXPORT SubscribeNamespaceOption : uint64_t { + kPublish = 0x00, + kNamespace = 0x01, + kBoth = 0x02, +}; +static constexpr uint64_t kMaxSubscribeOption = 0x02; + struct QUICHE_EXPORT MoqtSubscribeNamespace { uint64_t request_id; - TrackNamespace track_namespace; - VersionSpecificParameters parameters; + TrackNamespace track_namespace_prefix; + SubscribeNamespaceOption subscribe_options; + MessageParameters parameters; }; +// TODO(martinduke): Delete this struct QUICHE_EXPORT MoqtUnsubscribeNamespace { TrackNamespace track_namespace; }; +struct QUICHE_EXPORT MoqtNamespace { + TrackNamespace track_namespace_suffix; +}; + +struct QUICHE_EXPORT MoqtNamespaceDone { + TrackNamespace track_namespace_suffix; +}; + struct QUICHE_EXPORT MoqtMaxRequestId { uint64_t max_request_id; };
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h index a56e1db..590779f 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.h +++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -124,13 +124,13 @@ void SetFetchResponseCallback(FetchResponseCallback callback) override { if (!status_.ok()) { MoqtRequestError error(0, StatusToRequestErrorCode(status_), - std::string(status_.message())); + std::nullopt, std::string(status_.message())); std::move(callback)(error); return; } if (objects_.empty()) { MoqtRequestError error(0, StatusToRequestErrorCode(status_), - "No objects in range"); + std::nullopt, "No objects in range"); std::move(callback)(error); return; }
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc index 4b6cc73..333961c 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc +++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -97,7 +97,8 @@ ()); MOCK_METHOD(void, OnTrackPublisherGone, (), (override)); MOCK_METHOD(void, OnSubscribeAccepted, (), (override)); - MOCK_METHOD(void, OnSubscribeRejected, (MoqtErrorPair reason), (override)); + MOCK_METHOD(void, OnSubscribeRejected, (MoqtRequestErrorInfo reason), + (override)); }; absl::StatusOr<std::vector<std::string>> FetchToVector(
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index a67b201..9891815 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -599,6 +599,12 @@ case MoqtMessageType::kPublishNamespaceDone: bytes_read = ProcessPublishNamespaceDone(reader); break; + case MoqtMessageType::kNamespace: + bytes_read = ProcessNamespace(reader); + break; + case MoqtMessageType::kNamespaceDone: + bytes_read = ProcessNamespaceDone(reader); + break; case MoqtMessageType::kPublishNamespaceCancel: bytes_read = ProcessPublishNamespaceCancel(reader); break; @@ -726,12 +732,19 @@ size_t MoqtControlParser::ProcessRequestError(quic::QuicDataReader& reader) { MoqtRequestError request_error; uint64_t error_code; + uint64_t raw_interval; if (!reader.ReadVarInt62(&request_error.request_id) || !reader.ReadVarInt62(&error_code) || + !reader.ReadVarInt62(&raw_interval) || !reader.ReadStringVarInt62(request_error.reason_phrase)) { return 0; } request_error.error_code = static_cast<RequestErrorCode>(error_code); + request_error.retry_interval = + (raw_interval == 0) + ? std::nullopt + : std::make_optional( + quic::QuicTimeDelta::FromMilliseconds(raw_interval - 1)); visitor_.OnRequestErrorMessage(request_error); return reader.PreviouslyReadPayload().length(); } @@ -816,17 +829,30 @@ return reader.PreviouslyReadPayload().length(); } +size_t MoqtControlParser::ProcessNamespace(quic::QuicDataReader& reader) { + MoqtNamespace _namespace; + if (!ReadTrackNamespace(reader, _namespace.track_namespace_suffix)) { + return 0; + } + visitor_.OnNamespaceMessage(_namespace); + return reader.PreviouslyReadPayload().length(); +} + +size_t MoqtControlParser::ProcessNamespaceDone(quic::QuicDataReader& reader) { + MoqtNamespaceDone namespace_done; + if (!ReadTrackNamespace(reader, namespace_done.track_namespace_suffix)) { + return 0; + } + visitor_.OnNamespaceDoneMessage(namespace_done); + return reader.PreviouslyReadPayload().length(); +} + size_t MoqtControlParser::ProcessRequestOk(quic::QuicDataReader& reader) { MoqtRequestOk request_ok; if (!reader.ReadVarInt62(&request_ok.request_id)) { return 0; } - KeyValuePairList parameters; - if (!ParseKeyValuePairList(reader, parameters)) { - return 0; - } - if (!FillAndValidateVersionSpecificParameters( - parameters, request_ok.parameters, MoqtMessageType::kRequestOk)) { + if (!FillAndValidateMessageParameters(reader, request_ok.parameters)) { return 0; } visitor_.OnRequestOkMessage(request_ok); @@ -876,17 +902,20 @@ size_t MoqtControlParser::ProcessSubscribeNamespace( quic::QuicDataReader& reader) { MoqtSubscribeNamespace subscribe_namespace; + uint64_t raw_option; if (!reader.ReadVarInt62(&subscribe_namespace.request_id) || - !ReadTrackNamespace(reader, subscribe_namespace.track_namespace)) { + !ReadTrackNamespace(reader, subscribe_namespace.track_namespace_prefix) || + !reader.ReadVarInt62(&raw_option)) { return 0; } - KeyValuePairList parameters; - if (!ParseKeyValuePairList(reader, parameters)) { + if (raw_option > kMaxSubscribeOption) { + ParseError("Invalid SUBSCRIBE_NAMESPACE option"); return 0; } - if (!FillAndValidateVersionSpecificParameters( - parameters, subscribe_namespace.parameters, - MoqtMessageType::kSubscribeNamespace)) { + subscribe_namespace.subscribe_options = + static_cast<SubscribeNamespaceOption>(raw_option); + if (!FillAndValidateMessageParameters(reader, + subscribe_namespace.parameters)) { return 0; } visitor_.OnSubscribeNamespaceMessage(subscribe_namespace); @@ -1405,6 +1434,7 @@ next_input_ = kGroupId; break; case kGroupId: + QUICHE_CHECK(type_.has_value()); if (type_->IsFetch() || type_->IsSubgroupPresent()) { next_input_ = kSubgroupId; break; @@ -1416,14 +1446,17 @@ type_->HasDefaultPriority() ? kObjectId : kPublisherPriority; break; case kSubgroupId: + QUICHE_CHECK(type_.has_value()); next_input_ = (type_->IsFetch() || type_->HasDefaultPriority()) ? kObjectId : kPublisherPriority; break; case kPublisherPriority: + QUICHE_CHECK(type_.has_value()); next_input_ = type_->IsFetch() ? kExtensionSize : kObjectId; break; case kObjectId: + QUICHE_CHECK(type_.has_value()); if (type_->HasDefaultPriority()) { metadata_.publisher_priority = default_publisher_priority_; }
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h index e84eccc..34f6d41 100644 --- a/quiche/quic/moqt/moqt_parser.h +++ b/quiche/quic/moqt/moqt_parser.h
@@ -49,6 +49,8 @@ const MoqtPublishNamespace& message) = 0; virtual void OnPublishNamespaceDoneMessage( const MoqtPublishNamespaceDone& message) = 0; + virtual void OnNamespaceMessage(const MoqtNamespace& message) = 0; + virtual void OnNamespaceDoneMessage(const MoqtNamespaceDone& message) = 0; virtual void OnPublishNamespaceCancelMessage( const MoqtPublishNamespaceCancel& message) = 0; virtual void OnTrackStatusMessage(const MoqtTrackStatus& message) = 0; @@ -139,6 +141,8 @@ size_t ProcessSubscribeUpdate(quic::QuicDataReader& reader); size_t ProcessPublishNamespace(quic::QuicDataReader& reader); size_t ProcessPublishNamespaceDone(quic::QuicDataReader& reader); + size_t ProcessNamespace(quic::QuicDataReader& reader); + size_t ProcessNamespaceDone(quic::QuicDataReader& reader); size_t ProcessPublishNamespaceCancel(quic::QuicDataReader& reader); size_t ProcessTrackStatus(quic::QuicDataReader& reader); size_t ProcessGoAway(quic::QuicDataReader& reader);
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index f864cd9..3a257ac 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -44,6 +44,8 @@ MoqtMessageType::kTrackStatus, MoqtMessageType::kPublishNamespace, MoqtMessageType::kPublishNamespaceDone, + MoqtMessageType::kNamespace, + MoqtMessageType::kNamespaceDone, MoqtMessageType::kPublishNamespaceCancel, MoqtMessageType::kClientSetup, MoqtMessageType::kServerSetup, @@ -1624,6 +1626,25 @@ EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); } +TEST_F(MoqtMessageSpecificTest, InvalidSubscribeNamespaceOption) { + webtransport::test::InMemoryStream stream(/*stream_id=*/0); + MoqtControlParser parser(kRawQuic, &stream, visitor_); + char subscribe_namespace[] = { + 0x11, 0x00, 0x11, 0x01, // request_id = 1 + 0x01, 0x03, 0x66, 0x6f, 0x6f, // namespace = "foo" + 0x03, // subscribe_options invalid + 0x02, // 2 parameters + 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar" + 0x0d, 0x01, // forward = true + }; + stream.Receive( + absl::string_view(subscribe_namespace, sizeof(subscribe_namespace)), + false); + parser.ReadAndDispatchMessages(); + EXPECT_EQ(visitor_.messages_received_, 0); + EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); +} + class MoqtDataParserStateMachineTest : public quic::test::QuicTest { protected: MoqtDataParserStateMachineTest()
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h index 68978e4..fe65ad1 100644 --- a/quiche/quic/moqt/moqt_publisher.h +++ b/quiche/quic/moqt/moqt_publisher.h
@@ -11,6 +11,7 @@ #include "absl/base/nullability.h" #include "quiche/quic/core/quic_time.h" +#include "quiche/quic/moqt/moqt_error.h" #include "quiche/quic/moqt/moqt_fetch_task.h" #include "quiche/quic/moqt/moqt_key_value_pair.h" #include "quiche/quic/moqt/moqt_messages.h" @@ -33,7 +34,7 @@ virtual void OnSubscribeAccepted() = 0; // Called when the publisher is sure that it cannot serve the subscription. // This could happen synchronously or asynchronously. - virtual void OnSubscribeRejected(MoqtErrorPair reason) = 0; + virtual void OnSubscribeRejected(MoqtRequestErrorInfo info) = 0; // Notifies that a new object is available on the track. The object payload // itself may be retrieved via GetCachedObject method of the associated track
diff --git a/quiche/quic/moqt/moqt_relay_publisher_test.cc b/quiche/quic/moqt/moqt_relay_publisher_test.cc index b137730..6cb6b94 100644 --- a/quiche/quic/moqt/moqt_relay_publisher_test.cc +++ b/quiche/quic/moqt/moqt_relay_publisher_test.cc
@@ -77,10 +77,10 @@ TEST_F(MoqtRelayPublisherTest, PublishNamespaceLifecycle) { EXPECT_EQ(publisher_.GetTrack(FullTrackName("foo", "bar")), nullptr); - std::optional<MoqtErrorPair> response; + std::optional<MoqtRequestErrorInfo> response; publisher_.OnPublishNamespace( TrackNamespace({"foo"}), VersionSpecificParameters(), &session_, - [&](std::optional<MoqtErrorPair> error_response) { + [&](std::optional<MoqtRequestErrorInfo> error_response) { response = error_response; }); EXPECT_EQ(response, std::nullopt);
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.cc b/quiche/quic/moqt/moqt_relay_track_publisher.cc index 7c67c42..05df4a7 100644 --- a/quiche/quic/moqt/moqt_relay_track_publisher.cc +++ b/quiche/quic/moqt/moqt_relay_track_publisher.cc
@@ -30,12 +30,12 @@ void MoqtRelayTrackPublisher::OnReply( const FullTrackName&, - std::variant<SubscribeOkData, MoqtErrorPair> response) { + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { if (is_closing_) { return; } - if (std::holds_alternative<MoqtErrorPair>(response)) { - auto request_error = std::get<MoqtErrorPair>(response); + if (std::holds_alternative<MoqtRequestErrorInfo>(response)) { + auto request_error = std::get<MoqtRequestErrorInfo>(response); // Delete upstream_ to avoid sending UNSUBSCRIBE. upstream_ = quiche::QuicheWeakPtr<MoqtSessionInterface>(); // Sessions will delete listeners, causing the track to delete itself. @@ -303,8 +303,8 @@ MoqtSessionInterface* session = upstream_.GetIfAvailable(); if (session == nullptr) { // upstream went away, reject the subscribe. - listener->OnSubscribeRejected(MoqtErrorPair{ - RequestErrorCode::kInternalError, + listener->OnSubscribeRejected(MoqtRequestErrorInfo{ + RequestErrorCode::kInternalError, std::nullopt, "The upstream session was closed before a subscription could be " "established."}); DeleteTrack();
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.h b/quiche/quic/moqt/moqt_relay_track_publisher.h index 21159ea..c6158fd 100644 --- a/quiche/quic/moqt/moqt_relay_track_publisher.h +++ b/quiche/quic/moqt/moqt_relay_track_publisher.h
@@ -66,8 +66,9 @@ MoqtRelayTrackPublisher& operator=(MoqtRelayTrackPublisher&&) = default; // SubscribeVisitor implementation. - void OnReply(const FullTrackName& full_track_name, - std::variant<SubscribeOkData, MoqtErrorPair> response) override; + void OnReply( + const FullTrackName& full_track_name, + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) override; // TODO(vasilvv): Implement this if we want to support Object Acks across // relays. void OnCanAckObjects(MoqtObjectAckFunction /*ack_function*/) override {}
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc index 6e7107a..37075b0 100644 --- a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc +++ b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
@@ -31,7 +31,8 @@ class MockMoqtObjectListener : public MoqtObjectListener { public: MOCK_METHOD(void, OnSubscribeAccepted, (), (override)); - MOCK_METHOD(void, OnSubscribeRejected, (MoqtErrorPair reason), (override)); + MOCK_METHOD(void, OnSubscribeRejected, (MoqtRequestErrorInfo reason), + (override)); MOCK_METHOD(void, OnNewObjectAvailable, (Location sequence, uint64_t subgroup, MoqtPriority publisher_priority, @@ -306,8 +307,9 @@ EXPECT_CALL(listener_, OnSubscribeRejected).WillOnce([this] { publisher_.RemoveObjectListener(&listener_); }); - publisher_.OnReply(kTrackName, MoqtErrorPair{RequestErrorCode::kUnauthorized, - "Unauthorized"}); + publisher_.OnReply(kTrackName, + MoqtRequestErrorInfo{RequestErrorCode::kUnauthorized, + std::nullopt, "Unauthorized"}); EXPECT_TRUE(track_deleted_); }
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 10798cb..2e90a54 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -260,7 +260,7 @@ bool MoqtSession::SubscribeNamespace( TrackNamespace track_namespace, MoqtOutgoingSubscribeNamespaceCallback callback, - VersionSpecificParameters parameters) { + MessageParameters parameters) { QUICHE_DCHECK(track_namespace.IsValid()); if (received_goaway_ || sent_goaway_) { QUIC_DLOG(INFO) << ENDPOINT @@ -283,18 +283,22 @@ } if (outgoing_subscribe_namespaces_.contains(track_namespace)) { std::move(callback)( - track_namespace, RequestErrorCode::kInternalError, - "SUBSCRIBE_NAMESPACE already outstanding for namespace"); + track_namespace, + MoqtRequestErrorInfo{ + RequestErrorCode::kInternalError, std::nullopt, + "SUBSCRIBE_NAMESPACE already outstanding for namespace"}); return false; } MoqtSubscribeNamespace message; message.request_id = next_request_id_; next_request_id_ += 2; - message.track_namespace = track_namespace; + message.track_namespace_prefix = track_namespace; + // We don't support PUBLISH, so don't ask for it. + message.subscribe_options = SubscribeNamespaceOption::kNamespace; message.parameters = parameters; SendControlMessage(framer_.SerializeSubscribeNamespace(message)); QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE_NAMESPACE message for " - << message.track_namespace; + << message.track_namespace_prefix; pending_outgoing_subscribe_namespaces_[message.request_id] = PendingSubscribeNamespaceData{track_namespace, std::move(callback)}; outgoing_subscribe_namespaces_.emplace(track_namespace); @@ -326,8 +330,9 @@ if (outgoing_publish_namespaces_.contains(track_namespace)) { std::move(callback)( track_namespace, - MoqtErrorPair{RequestErrorCode::kInternalError, - "PUBLISH_NAMESPACE already outstanding for namespace"}); + MoqtRequestErrorInfo{ + RequestErrorCode::kInternalError, std::nullopt, + "PUBLISH_NAMESPACE already outstanding for namespace"}); return; } if (next_request_id_ >= peer_max_request_id_) { @@ -725,7 +730,8 @@ if (subscribe->ErrorIsAllowed()) { subscribe->visitor()->OnReply( subscribe->full_track_name(), - MoqtErrorPair{RequestErrorCode::kNotSupported, "Subscription closed"}); + MoqtRequestErrorInfo{RequestErrorCode::kNotSupported, std::nullopt, + "Subscription closed"}); } else { subscribe->visitor()->OnPublishDone(subscribe->full_track_name()); } @@ -990,7 +996,7 @@ if (session_->sent_goaway_) { QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE after GOAWAY"; SendRequestError(message.request_id, RequestErrorCode::kUnauthorized, - "SUBSCRIBE after GOAWAY"); + std::nullopt, "SUBSCRIBE after GOAWAY"); return; } if (session_->subscribed_track_names_.contains(message.full_track_name)) { @@ -1005,7 +1011,7 @@ QUIC_DLOG(INFO) << ENDPOINT << "SUBSCRIBE for " << track_name << " rejected by the application: does not exist"; SendRequestError(message.request_id, RequestErrorCode::kTrackDoesNotExist, - "not found"); + std::nullopt, "not found"); return; } @@ -1115,7 +1121,7 @@ session_->pending_outgoing_subscribe_namespaces_.find(message.request_id); if (sn_it != session_->pending_outgoing_subscribe_namespaces_.end()) { std::move(sn_it->second.callback)(sn_it->second.track_namespace, - std::nullopt, ""); + std::nullopt); session_->pending_outgoing_subscribe_namespaces_.erase(sn_it); return; } @@ -1123,10 +1129,14 @@ // TRACK_STATUS. // If it doesn't match any state, it might be because the local application // cancelled the request. Do nothing. + // TODO(martinduke): Do something with parameters. } void MoqtSession::ControlStream::OnRequestErrorMessage( const MoqtRequestError& message) { + MoqtRequestErrorInfo error_info{message.error_code, message.retry_interval, + message.reason_phrase}; + // TODO(martinduke): Do something with retry_interval. RemoteTrack* track = session_->RemoteTrackById(message.request_id); if (track != nullptr) { // It's in response to SUBSCRIBE or FETCH. @@ -1153,9 +1163,7 @@ // this subscribe will be deleted after calling Subscribe(). session_->subscribe_by_name_.erase(subscribe->full_track_name()); if (subscribe->visitor() != nullptr) { - subscribe->visitor()->OnReply( - subscribe->full_track_name(), - MoqtErrorPair{message.error_code, message.reason_phrase}); + subscribe->visitor()->OnReply(subscribe->full_track_name(), error_info); } } if (!session_->is_closing_) { @@ -1173,9 +1181,7 @@ if (it2 == session_->outgoing_publish_namespaces_.end()) { return; // State might have been destroyed due to PUBLISH_NAMESPACE_DONE. } - std::move(it2->second)( - track_namespace, - MoqtErrorPair{message.error_code, std::string(message.reason_phrase)}); + std::move(it2->second)(track_namespace, error_info); session_->pending_outgoing_publish_namespaces_.erase(pn_it); session_->outgoing_publish_namespaces_.erase(it2); return; @@ -1185,8 +1191,7 @@ session_->pending_outgoing_subscribe_namespaces_.find(message.request_id); if (sn_it != session_->pending_outgoing_subscribe_namespaces_.end()) { std::move(sn_it->second.callback)(sn_it->second.track_namespace, - message.error_code, - absl::string_view(message.reason_phrase)); + error_info); session_->outgoing_subscribe_namespaces_.erase( sn_it->second.track_namespace); session_->pending_outgoing_subscribe_namespaces_.erase(sn_it); @@ -1245,7 +1250,7 @@ if (session_->sent_goaway_) { QUIC_DLOG(INFO) << ENDPOINT << "Received a PUBLISH_NAMESPACE after GOAWAY"; SendRequestError(message.request_id, RequestErrorCode::kUnauthorized, - "PUBLISH_NAMESPACE after GOAWAY"); + std::nullopt, "PUBLISH_NAMESPACE after GOAWAY"); return; } QUIC_DLOG(INFO) << ENDPOINT << "Received a PUBLISH_NAMESPACE for " @@ -1254,17 +1259,16 @@ session_->GetWeakPtr(); session_->callbacks_.incoming_publish_namespace_callback( message.track_namespace, message.parameters, - [&](std::optional<MoqtErrorPair> error) { + [&](std::optional<MoqtRequestErrorInfo> error) { MoqtSession* session = static_cast<MoqtSession*>(session_weakptr.GetIfAvailable()); if (session == nullptr) { return; } if (error.has_value()) { - SendRequestError(message.request_id, error->error_code, - error->reason_phrase); + SendRequestError(message.request_id, *error); } else { - SendRequestOk(message.request_id, VersionSpecificParameters()); + SendRequestOk(message.request_id, MessageParameters()); session->incoming_publish_namespaces_.insert(message.track_namespace); } }); @@ -1290,7 +1294,8 @@ } std::move(it->second)( message.track_namespace, - MoqtErrorPair{message.error_code, std::string(message.error_reason)}); + MoqtRequestErrorInfo{message.error_code, std::nullopt, + std::string(message.error_reason)}); session_->outgoing_publish_namespaces_.erase(it); } @@ -1303,7 +1308,7 @@ QUIC_DLOG(INFO) << ENDPOINT << "Received a TRACK_STATUS_REQUEST after GOAWAY"; SendRequestError(message.request_id, RequestErrorCode::kUnauthorized, - "TRACK_STATUS_REQUEST after GOAWAY"); + std::nullopt, "TRACK_STATUS_REQUEST after GOAWAY"); return; } // TODO(martinduke): Handle authentication. @@ -1311,7 +1316,7 @@ session_->publisher_->GetTrack(message.full_track_name); if (track == nullptr) { SendRequestError(message.request_id, RequestErrorCode::kTrackDoesNotExist, - "Track does not exist"); + std::nullopt, "Track does not exist"); return; } auto [it, inserted] = session_->incoming_track_status_.emplace( @@ -1349,29 +1354,28 @@ QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE_NAMESPACE after GOAWAY"; SendRequestError(message.request_id, RequestErrorCode::kUnauthorized, - "SUBSCRIBE_NAMESPACE after GOAWAY"); + std::nullopt, "SUBSCRIBE_NAMESPACE after GOAWAY"); return; } if (!session_->incoming_subscribe_namespace_.SubscribeNamespace( - message.track_namespace)) { + message.track_namespace_prefix)) { QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE_NAMESPACE for " - << message.track_namespace + << message.track_namespace_prefix << " that is already subscribed to"; SendRequestError(message.request_id, - RequestErrorCode::kNamespacePrefixOverlap, + RequestErrorCode::kNamespacePrefixOverlap, std::nullopt, "SUBSCRIBE_NAMESPACE for similar subscribed namespace"); return; } (session_->callbacks_.incoming_subscribe_namespace_callback)( - message.track_namespace, message.parameters, - [&](std::optional<MoqtErrorPair> error) { + message.track_namespace_prefix, message.parameters, + [&](std::optional<MoqtRequestErrorInfo> error) { if (error.has_value()) { - SendRequestError(message.request_id, error->error_code, - error->reason_phrase); + SendRequestError(message.request_id, *error); session_->incoming_subscribe_namespace_.UnsubscribeNamespace( - message.track_namespace); + message.track_namespace_prefix); } else { - SendRequestOk(message.request_id, VersionSpecificParameters()); + SendRequestOk(message.request_id, MessageParameters()); } }); } @@ -1405,7 +1409,7 @@ if (session_->sent_goaway_) { QUIC_DLOG(INFO) << ENDPOINT << "Received a FETCH after GOAWAY"; SendRequestError(message.request_id, RequestErrorCode::kUnauthorized, - "FETCH after GOAWAY"); + std::nullopt, "FETCH after GOAWAY"); return; } std::unique_ptr<MoqtFetchTask> fetch; @@ -1420,7 +1424,7 @@ QUIC_DLOG(INFO) << ENDPOINT << "FETCH for " << track_name << " rejected by the application: not found"; SendRequestError(message.request_id, RequestErrorCode::kTrackDoesNotExist, - "not found"); + std::nullopt, "not found"); } QUIC_DLOG(INFO) << ENDPOINT << "Received a StandaloneFETCH for " << track_name; @@ -1442,7 +1446,7 @@ << "request_id " << joining_request_id << " that does not exist"; SendRequestError(message.request_id, - RequestErrorCode::kInvalidJoiningRequestId, + RequestErrorCode::kInvalidJoiningRequestId, std::nullopt, "Joining Fetch for non-existent request"); return; } @@ -1475,7 +1479,7 @@ QUIC_DLOG(INFO) << ENDPOINT << "FETCH for " << track_name << " could not initialize the task"; SendRequestError(message.request_id, RequestErrorCode::kInvalidRange, - fetch->GetStatus().message()); + std::nullopt, fetch->GetStatus().message()); return; } auto published_fetch = std::make_unique<PublishedFetch>( @@ -1486,7 +1490,7 @@ QUIC_DLOG(INFO) << ENDPOINT << "FETCH for " << track_name << " could not be added to the session"; SendRequestError(message.request_id, RequestErrorCode::kInternalError, - "Could not initialize FETCH state"); + std::nullopt, "Could not initialize FETCH state"); } MoqtFetchTask* fetch_task = result.first->second->fetch_task(); fetch_task->SetFetchResponseCallback( @@ -1503,6 +1507,7 @@ } SendRequestError(request_id, std::get<MoqtRequestError>(message).error_code, + std::get<MoqtRequestError>(message).retry_interval, std::get<MoqtRequestError>(message).reason_phrase); }); // Set a temporary new-object callback that creates a data stream. When @@ -1564,7 +1569,7 @@ absl::string_view error_reason = session_->sent_goaway_ ? "Received a PUBLISH after GOAWAY" : "PUBLISH is not supported"; - SendRequestError(message.request_id, error_code, error_reason); + SendRequestError(message.request_id, error_code, std::nullopt, error_reason); } void MoqtSession::IncomingDataStream::OnObjectMessage(const MoqtObject& message, @@ -1757,6 +1762,8 @@ return; } } + QUICHE_CHECK(parser_.stream_type().has_value()); + QUICHE_CHECK(parser_.track_alias().has_value()); if (parser_.stream_type()->IsSubgroup()) { if (!knew_track_alias) { // This is a new stream for a subscribe. Notify the subscription. @@ -1915,9 +1922,8 @@ } void MoqtSession::PublishedSubscription::OnSubscribeRejected( - MoqtErrorPair reason) { - session_->GetControlStream()->SendRequestError(request_id_, reason.error_code, - reason.reason_phrase); + MoqtRequestErrorInfo info) { + session_->GetControlStream()->SendRequestError(request_id_, info); session_->published_subscriptions_.erase(request_id_); // No class access below this line! } @@ -2419,7 +2425,8 @@ } for (auto& [track_namespace, callback] : outgoing_publish_namespaces_) { callback(track_namespace, - MoqtErrorPair{RequestErrorCode::kUninterested, "Session closed"}); + MoqtRequestErrorInfo{RequestErrorCode::kUninterested, std::nullopt, + "Session closed"}); } while (!upstream_by_id_.empty()) { auto upstream = upstream_by_id_.begin();
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 867fae5..22ef266 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -101,7 +101,7 @@ // Returns true if message was sent. bool SubscribeNamespace(TrackNamespace track_namespace, MoqtOutgoingSubscribeNamespaceCallback callback, - VersionSpecificParameters parameters); + MessageParameters parameters); bool UnsubscribeNamespace(TrackNamespace track_namespace); // Allows the subscriber to declare it will not subscribe to |track_namespace| @@ -383,7 +383,7 @@ // MoqtObjectListener implementation. void OnSubscribeAccepted() override; - void OnSubscribeRejected(MoqtErrorPair reason) override; + void OnSubscribeRejected(MoqtRequestErrorInfo info) override; // This is only called for objects that have just arrived. void OnNewObjectAvailable( Location location, uint64_t subgroup, MoqtPriority publisher_priority, @@ -653,31 +653,18 @@ QUICHE_NOTREACHED(); return; } - VersionSpecificParameters parameters; - // TODO(martinduke): Turn these into parameters. -#if 0 - QUICHE_BUG_IF(quic_bug_track_status_ok_no_expiration, - !publisher_->expiration().has_value()) - << "Request accepted without expiration"; - track_status_ok.expires = - publisher_->expiration().value_or(quic::QuicTimeDelta::Zero()); - QUICHE_BUG_IF(quic_bug_track_status_ok_no_delivery_order, - !publisher_->delivery_order().has_value()) - << "Request accepted without delivery order"; - track_status_ok.group_order = - publisher_->delivery_order().value_or(MoqtDeliveryOrder::kAscending); - track_status_ok.largest_location = publisher_->largest_location(); - session_->SendControlMessage( - session_->framer_.SerializeTrackStatusOk(track_status_ok)); -#endif + MessageParameters parameters; + parameters.expires = publisher_->expiration(); + parameters.largest_object = publisher_->largest_location(); session_->GetControlStream()->SendRequestOk(request_id_, parameters); session_->incoming_track_status_.erase(request_id_); // No class access below this line! } - void OnSubscribeRejected(MoqtErrorPair error_reason) override { + void OnSubscribeRejected(MoqtRequestErrorInfo info) override { session_->GetControlStream()->SendRequestError( - request_id_, error_reason.error_code, error_reason.reason_phrase); + request_id_, info.error_code, info.retry_interval, + info.reason_phrase); session_->incoming_track_status_.erase(request_id_); // No class access below this line! } @@ -692,8 +679,9 @@ void OnGroupAbandoned(uint64_t /*group_id*/) override {} void OnTrackPublisherGone() override { publisher_ = nullptr; - OnSubscribeRejected(MoqtErrorPair(RequestErrorCode::kTrackDoesNotExist, - "Track publisher gone")); + OnSubscribeRejected( + MoqtRequestErrorInfo(RequestErrorCode::kTrackDoesNotExist, + std::nullopt, "Track publisher gone")); } private:
diff --git a/quiche/quic/moqt/moqt_session_callbacks.h b/quiche/quic/moqt/moqt_session_callbacks.h index 541308a..7e1c610 100644 --- a/quiche/quic/moqt/moqt_session_callbacks.h +++ b/quiche/quic/moqt/moqt_session_callbacks.h
@@ -11,7 +11,9 @@ #include "absl/strings/string_view.h" #include "quiche/quic/core/quic_clock.h" #include "quiche/quic/core/quic_default_clock.h" -#include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_error.h" +#include "quiche/quic/moqt/moqt_key_value_pair.h" +#include "quiche/quic/moqt/moqt_names.h" #include "quiche/common/quiche_callbacks.h" namespace moqt { @@ -20,7 +22,7 @@ // once; if the argument is nullopt, an OK response was received. Otherwise, an // ERROR response was received. using MoqtResponseCallback = - quiche::SingleUseCallback<void(std::optional<MoqtErrorPair>)>; + quiche::SingleUseCallback<void(std::optional<MoqtRequestErrorInfo>)>; // Called when the SETUP message from the peer is received. using MoqtSessionEstablishedCallback = quiche::SingleUseCallback<void()>; @@ -48,10 +50,10 @@ // the peer. SUBSCRIBE_NAMESPACE sets a value for |parameters|, // UNSUBSCRIBE_NAMESPACE does not. For UNSUBSCRIBE_NAMESPACE, |callback| is // null. -using MoqtIncomingSubscribeNamespaceCallback = quiche::MultiUseCallback<void( - const TrackNamespace& track_namespace, - std::optional<VersionSpecificParameters> parameters, - MoqtResponseCallback callback)>; +using MoqtIncomingSubscribeNamespaceCallback = + quiche::MultiUseCallback<void(const TrackNamespace& track_namespace, + std::optional<MessageParameters> parameters, + MoqtResponseCallback callback)>; inline void DefaultIncomingPublishNamespaceCallback( const TrackNamespace&, const std::optional<VersionSpecificParameters>&, @@ -59,14 +61,14 @@ if (callback == nullptr) { return; } - return std::move(callback)(MoqtErrorPair{ - RequestErrorCode::kNotSupported, + return std::move(callback)(MoqtRequestErrorInfo{ + RequestErrorCode::kNotSupported, std::nullopt, "This endpoint does not support incoming SUBSCRIBE_NAMESPACE messages"}); }; inline void DefaultIncomingSubscribeNamespaceCallback( - const TrackNamespace& track_namespace, - std::optional<VersionSpecificParameters>, MoqtResponseCallback callback) { + const TrackNamespace& track_namespace, std::optional<MessageParameters>, + MoqtResponseCallback callback) { std::move(callback)(std::nullopt); }
diff --git a/quiche/quic/moqt/moqt_session_interface.h b/quiche/quic/moqt/moqt_session_interface.h index 0f8d184..34006de 100644 --- a/quiche/quic/moqt/moqt_session_interface.h +++ b/quiche/quic/moqt/moqt_session_interface.h
@@ -42,7 +42,7 @@ // automatically retry. virtual void OnReply( const FullTrackName& full_track_name, - std::variant<SubscribeOkData, MoqtErrorPair> response) = 0; + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) = 0; // Called when the subscription process is far enough that it is possible to // send OBJECT_ACK messages; provides a callback to do so. The callback is // valid for as long as the session is valid. @@ -80,12 +80,12 @@ // immediately after calling this callback. // Alternatively, the application can call PublishNamespaceDone() to delete the // state. -using MoqtOutgoingPublishNamespaceCallback = quiche::MultiUseCallback<void( - const TrackNamespace& track_namespace, std::optional<MoqtErrorPair> error)>; +using MoqtOutgoingPublishNamespaceCallback = + quiche::MultiUseCallback<void(const TrackNamespace& track_namespace, + std::optional<MoqtRequestErrorInfo> error)>; using MoqtOutgoingSubscribeNamespaceCallback = quiche::SingleUseCallback<void( - TrackNamespace track_namespace, std::optional<RequestErrorCode> error, - absl::string_view reason)>; + TrackNamespace track_namespace, std::optional<MoqtRequestErrorInfo> info)>; class MoqtSessionInterface { public:
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 2dae407..fd8f33c 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -429,7 +429,7 @@ TEST_F(MoqtSessionTest, PublishNamespaceWithOkAndCancel) { testing::MockFunction<void(TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error_message)> + std::optional<MoqtRequestErrorInfo> error_message)> publish_namespace_resolved_callback; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); @@ -440,10 +440,10 @@ publish_namespace_resolved_callback.AsStdFunction(), VersionSpecificParameters()); - MoqtRequestOk ok = {/*request_id=*/0, VersionSpecificParameters()}; + MoqtRequestOk ok = {/*request_id=*/0, MessageParameters()}; EXPECT_CALL(publish_namespace_resolved_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error) { + std::optional<MoqtRequestErrorInfo> error) { EXPECT_EQ(track_namespace, TrackNamespace("foo")); EXPECT_FALSE(error.has_value()); }); @@ -456,7 +456,7 @@ }; EXPECT_CALL(publish_namespace_resolved_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error) { + std::optional<MoqtRequestErrorInfo> error) { EXPECT_EQ(track_namespace, TrackNamespace("foo")); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->error_code, RequestErrorCode::kInternalError); @@ -469,7 +469,7 @@ TEST_F(MoqtSessionTest, PublishNamespaceWithOkAndPublishNamespaceDone) { testing::MockFunction<void(TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error_message)> + std::optional<MoqtRequestErrorInfo> error_message)> publish_namespace_resolved_callback; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); @@ -480,10 +480,10 @@ publish_namespace_resolved_callback.AsStdFunction(), VersionSpecificParameters()); - MoqtRequestOk ok = {/*request_id=*/0, VersionSpecificParameters()}; + MoqtRequestOk ok = {/*request_id=*/0, MessageParameters()}; EXPECT_CALL(publish_namespace_resolved_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error) { + std::optional<MoqtRequestErrorInfo> error) { EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); EXPECT_FALSE(error.has_value()); }); @@ -499,7 +499,7 @@ TEST_F(MoqtSessionTest, PublishNamespaceWithError) { testing::MockFunction<void(TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error_message)> + std::optional<MoqtRequestErrorInfo> error_message)> publish_namespace_resolved_callback; std::unique_ptr<MoqtControlParserVisitor> stream_input = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); @@ -511,10 +511,10 @@ VersionSpecificParameters()); MoqtRequestError error{/*request_id=*/0, RequestErrorCode::kInternalError, - "Test error"}; + std::nullopt, "Test error"}; EXPECT_CALL(publish_namespace_resolved_callback, Call(_, _)) .WillOnce([&](TrackNamespace track_namespace, - std::optional<MoqtErrorPair> error) { + std::optional<MoqtRequestErrorInfo> error) { EXPECT_EQ(track_namespace, TrackNamespace{"foo"}); ASSERT_TRUE(error.has_value()); EXPECT_EQ(error->error_code, RequestErrorCode::kInternalError); @@ -555,8 +555,8 @@ stream_input->OnSubscribeMessage(request); EXPECT_CALL(mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _)); - listener->OnSubscribeRejected( - MoqtErrorPair(RequestErrorCode::kInternalError, "Test error")); + listener->OnSubscribeRejected(MoqtRequestErrorInfo( + RequestErrorCode::kInternalError, std::nullopt, "Test error")); EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, kDefaultPeerRequestId), nullptr); } @@ -572,8 +572,8 @@ mock_stream_, Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _)); EXPECT_CALL(*track, RemoveObjectListener); - listener->OnSubscribeRejected( - MoqtErrorPair(RequestErrorCode::kInternalError, "Test error")); + listener->OnSubscribeRejected(MoqtRequestErrorInfo( + RequestErrorCode::kInternalError, std::nullopt, "Test error")); }); stream_input->OnSubscribeMessage(request); EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, kDefaultPeerRequestId), @@ -768,11 +768,12 @@ TrackExtensions(), }; EXPECT_CALL(remote_track_visitor_, OnReply) - .WillOnce([&](const FullTrackName& ftn, - std::variant<SubscribeOkData, MoqtErrorPair> response) { - EXPECT_EQ(ftn, FullTrackName("foo", "bar")); - EXPECT_TRUE(std::holds_alternative<SubscribeOkData>(response)); - }); + .WillOnce( + [&](const FullTrackName& ftn, + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { + EXPECT_EQ(ftn, FullTrackName("foo", "bar")); + EXPECT_TRUE(std::holds_alternative<SubscribeOkData>(response)); + }); stream_input->OnSubscribeOkMessage(ok); } @@ -793,11 +794,12 @@ TrackExtensions(), }; EXPECT_CALL(remote_track_visitor_, OnReply) - .WillOnce([&](const FullTrackName& ftn, - std::variant<SubscribeOkData, MoqtErrorPair> response) { - EXPECT_EQ(ftn, FullTrackName("foo", "bar")); - EXPECT_TRUE(std::holds_alternative<SubscribeOkData>(response)); - }); + .WillOnce( + [&](const FullTrackName& ftn, + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { + EXPECT_EQ(ftn, FullTrackName("foo", "bar")); + EXPECT_TRUE(std::holds_alternative<SubscribeOkData>(response)); + }); stream_input->OnSubscribeOkMessage(ok); } @@ -932,16 +934,19 @@ MoqtRequestError error = { /*request_id=*/0, /*error_code=*/RequestErrorCode::kInvalidRange, + /*retry_interval=*/std::nullopt, /*reason_phrase=*/"deadbeef", }; EXPECT_CALL(remote_track_visitor_, OnReply) - .WillOnce([&](const FullTrackName& ftn, - std::variant<SubscribeOkData, MoqtErrorPair> response) { - EXPECT_EQ(ftn, FullTrackName("foo", "bar")); - EXPECT_TRUE(std::holds_alternative<MoqtErrorPair>(response) && - std::get<MoqtErrorPair>(response).reason_phrase == - "deadbeef"); - }); + .WillOnce( + [&](const FullTrackName& ftn, + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { + EXPECT_EQ(ftn, FullTrackName("foo", "bar")); + EXPECT_TRUE( + std::holds_alternative<MoqtRequestErrorInfo>(response) && + std::get<MoqtRequestErrorInfo>(response).reason_phrase == + "deadbeef"); + }); stream_input->OnRequestErrorMessage(error); } @@ -978,7 +983,7 @@ }); EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(MoqtRequestOk{ - kDefaultPeerRequestId, VersionSpecificParameters()}), + kDefaultPeerRequestId, MessageParameters()}), _)); stream_input->OnPublishNamespaceMessage(publish_namespace); MoqtPublishNamespaceDone unpublish_namespace = { @@ -1016,7 +1021,7 @@ }); EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(MoqtRequestOk{ - kDefaultPeerRequestId, VersionSpecificParameters()}), + kDefaultPeerRequestId, MessageParameters()}), _)); stream_input->OnPublishNamespaceMessage(publish_namespace); EXPECT_CALL(mock_stream_, @@ -1040,8 +1045,9 @@ track_namespace, *parameters, }; - MoqtErrorPair error = { + MoqtRequestErrorInfo error = { RequestErrorCode::kNotSupported, + /*retry_interval=*/std::nullopt, "deadbeef", }; EXPECT_CALL(session_callbacks_.incoming_publish_namespace_callback, @@ -1050,11 +1056,11 @@ [&](const TrackNamespace&, const std::optional<VersionSpecificParameters>&, MoqtResponseCallback callback) { std::move(callback)(error); }); - EXPECT_CALL( - mock_stream_, - Writev(SerializedControlMessage(MoqtRequestError{ - kDefaultPeerRequestId, error.error_code, error.reason_phrase}), - _)); + EXPECT_CALL(mock_stream_, + Writev(SerializedControlMessage(MoqtRequestError{ + kDefaultPeerRequestId, error.error_code, + error.retry_interval, error.reason_phrase}), + _)); stream_input->OnPublishNamespaceMessage(publish_namespace); } @@ -1068,15 +1074,13 @@ Writev(ControlMessageOfType(MoqtMessageType::kSubscribeNamespace), _)); session_.SubscribeNamespace( track_namespace, - [&](const TrackNamespace& ns, std::optional<RequestErrorCode> error, - absl::string_view reason) { + [&](const TrackNamespace& ns, std::optional<MoqtRequestErrorInfo> error) { got_callback = true; EXPECT_EQ(track_namespace, ns); EXPECT_FALSE(error.has_value()); - EXPECT_EQ(reason, ""); }, - VersionSpecificParameters()); - MoqtRequestOk ok = {kDefaultLocalRequestId, VersionSpecificParameters()}; + MessageParameters()); + MoqtRequestOk ok = {kDefaultLocalRequestId, MessageParameters()}; stream_input->OnRequestOkMessage(ok); EXPECT_TRUE(got_callback); EXPECT_CALL( @@ -1096,17 +1100,17 @@ Writev(ControlMessageOfType(MoqtMessageType::kSubscribeNamespace), _)); session_.SubscribeNamespace( track_namespace, - [&](const TrackNamespace& ns, std::optional<RequestErrorCode> error, - absl::string_view reason) { + [&](const TrackNamespace& ns, std::optional<MoqtRequestErrorInfo> error) { got_callback = true; EXPECT_EQ(track_namespace, ns); ASSERT_TRUE(error.has_value()); - EXPECT_EQ(*error, RequestErrorCode::kInvalidRange); - EXPECT_EQ(reason, "deadbeef"); + EXPECT_EQ(error->error_code, RequestErrorCode::kInvalidRange); + EXPECT_EQ(error->reason_phrase, "deadbeef"); }, - VersionSpecificParameters()); + MessageParameters()); MoqtRequestError error = {kDefaultLocalRequestId, - RequestErrorCode::kInvalidRange, "deadbeef"}; + RequestErrorCode::kInvalidRange, std::nullopt, + "deadbeef"}; stream_input->OnRequestErrorMessage(error); EXPECT_TRUE(got_callback); // Entry is immediately gone. @@ -2588,7 +2592,8 @@ stream_input->OnFetchMessage(fetch); MoqtRequestError expected_error{fetch.request_id, - RequestErrorCode::kTrackDoesNotExist, "foo"}; + RequestErrorCode::kTrackDoesNotExist, + std::nullopt, "foo"}; EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_error), _)); fetch_task->CallFetchResponseCallback(expected_error); @@ -2718,6 +2723,7 @@ MoqtRequestError expected_error = { /*request_id=*/1, RequestErrorCode::kInvalidJoiningRequestId, + /*retry_interval=*/std::nullopt, "Joining Fetch for non-existent request", }; EXPECT_CALL(mock_stream_, @@ -2814,11 +2820,13 @@ TEST_F(MoqtSessionTest, IncomingSubscribeNamespace) { TrackNamespace track_namespace{"foo"}; - auto parameters = std::make_optional<VersionSpecificParameters>( - AuthTokenType::kOutOfBand, "foo"); + auto parameters = std::make_optional<MessageParameters>(); + parameters->authorization_tokens.emplace_back(AuthTokenType::kOutOfBand, + "foo"); MoqtSubscribeNamespace subscribe_namespace = { /*request_id=*/1, track_namespace, + SubscribeNamespaceOption::kBoth, *parameters, }; webtransport::test::MockStream control_stream; @@ -2826,8 +2834,7 @@ MoqtSessionPeer::CreateControlStream(&session_, &control_stream); EXPECT_CALL(session_callbacks_.incoming_subscribe_namespace_callback, Call(track_namespace, parameters, _)) - .WillOnce([](const TrackNamespace&, - std::optional<VersionSpecificParameters>, + .WillOnce([](const TrackNamespace&, std::optional<MessageParameters>, MoqtResponseCallback callback) { std::move(callback)(std::nullopt); }); @@ -2835,22 +2842,23 @@ Writev(ControlMessageOfType(MoqtMessageType::kRequestOk), _)); stream_input->OnSubscribeNamespaceMessage(subscribe_namespace); MoqtUnsubscribeNamespace unsubscribe_namespace{track_namespace}; - EXPECT_CALL( - session_callbacks_.incoming_subscribe_namespace_callback, - Call(track_namespace, std::optional<VersionSpecificParameters>(), _)) + EXPECT_CALL(session_callbacks_.incoming_subscribe_namespace_callback, + Call(track_namespace, std::optional<MessageParameters>(), _)) .WillOnce( - [](const TrackNamespace&, std::optional<VersionSpecificParameters>, + [](const TrackNamespace&, std::optional<MessageParameters>, MoqtResponseCallback callback) { EXPECT_EQ(callback, nullptr); }); stream_input->OnUnsubscribeNamespaceMessage(unsubscribe_namespace); } TEST_F(MoqtSessionTest, IncomingSubscribeNamespaceWithError) { TrackNamespace track_namespace{"foo"}; - auto parameters = std::make_optional<VersionSpecificParameters>( - AuthTokenType::kOutOfBand, "foo"); + auto parameters = std::make_optional<MessageParameters>(); + parameters->authorization_tokens.emplace_back(AuthTokenType::kOutOfBand, + "foo"); MoqtSubscribeNamespace subscribe_namespace = { /*request_id=*/1, track_namespace, + SubscribeNamespaceOption::kBoth, *parameters, }; webtransport::test::MockStream control_stream; @@ -2858,11 +2866,10 @@ MoqtSessionPeer::CreateControlStream(&session_, &control_stream); EXPECT_CALL(session_callbacks_.incoming_subscribe_namespace_callback, Call(track_namespace, parameters, _)) - .WillOnce([](const TrackNamespace&, - std::optional<VersionSpecificParameters>, + .WillOnce([](const TrackNamespace&, std::optional<MessageParameters>, MoqtResponseCallback callback) { - std::move(callback)( - MoqtErrorPair{RequestErrorCode::kUnauthorized, "foo"}); + std::move(callback)(MoqtRequestErrorInfo{ + RequestErrorCode::kUnauthorized, std::nullopt, "foo"}); }); EXPECT_CALL(control_stream, Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _)); @@ -2872,8 +2879,7 @@ subscribe_namespace.request_id += 2; EXPECT_CALL(session_callbacks_.incoming_subscribe_namespace_callback, Call(track_namespace, parameters, _)) - .WillOnce([](const TrackNamespace&, - std::optional<VersionSpecificParameters>, + .WillOnce([](const TrackNamespace&, std::optional<MessageParameters>, MoqtResponseCallback callback) { std::move(callback)(std::nullopt); }); @@ -2883,11 +2889,10 @@ // Cleanup. MoqtUnsubscribeNamespace unsubscribe_namespace{track_namespace}; - EXPECT_CALL( - session_callbacks_.incoming_subscribe_namespace_callback, - Call(track_namespace, std::optional<VersionSpecificParameters>(), _)) + EXPECT_CALL(session_callbacks_.incoming_subscribe_namespace_callback, + Call(track_namespace, std::optional<MessageParameters>(), _)) .WillOnce( - [](const TrackNamespace&, std::optional<VersionSpecificParameters>, + [](const TrackNamespace&, std::optional<MessageParameters>, MoqtResponseCallback callback) { EXPECT_EQ(callback, nullptr); }); stream_input->OnUnsubscribeNamespaceMessage(unsubscribe_namespace); } @@ -2895,11 +2900,13 @@ TEST_F(MoqtSessionTest, IncomingSubscribeNamespaceWithPrefixOverlap) { TrackNamespace foo{"foo"}, foobar{"foo", "bar"}; - auto parameters = std::make_optional<VersionSpecificParameters>( - AuthTokenType::kOutOfBand, "foo"); + auto parameters = std::make_optional<MessageParameters>(); + parameters->authorization_tokens.emplace_back(AuthTokenType::kOutOfBand, + "foo"); MoqtSubscribeNamespace subscribe_namespace = { /*request_id=*/1, foo, + SubscribeNamespaceOption::kBoth, *parameters, }; webtransport::test::MockStream control_stream; @@ -2907,8 +2914,7 @@ MoqtSessionPeer::CreateControlStream(&session_, &control_stream); EXPECT_CALL(session_callbacks_.incoming_subscribe_namespace_callback, Call(foo, parameters, _)) - .WillOnce([](const TrackNamespace&, - std::optional<VersionSpecificParameters>, + .WillOnce([](const TrackNamespace&, std::optional<MessageParameters>, MoqtResponseCallback callback) { std::move(callback)(std::nullopt); }); @@ -2918,7 +2924,7 @@ // Overlapping request is rejected. subscribe_namespace.request_id += 2; - subscribe_namespace.track_namespace = foobar; + subscribe_namespace.track_namespace_prefix = foobar; EXPECT_CALL(control_stream, Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _)); stream_input->OnSubscribeNamespaceMessage(subscribe_namespace); @@ -2926,9 +2932,9 @@ // Remove the subscription. Now a later one will work. MoqtUnsubscribeNamespace unsubscribe_namespace{foo}; EXPECT_CALL(session_callbacks_.incoming_subscribe_namespace_callback, - Call(foo, std::optional<VersionSpecificParameters>(), _)) + Call(foo, std::optional<MessageParameters>(), _)) .WillOnce( - [](const TrackNamespace&, std::optional<VersionSpecificParameters>, + [](const TrackNamespace&, std::optional<MessageParameters>, MoqtResponseCallback callback) { EXPECT_EQ(callback, nullptr); }); stream_input->OnUnsubscribeNamespaceMessage(unsubscribe_namespace); @@ -2936,8 +2942,7 @@ subscribe_namespace.request_id += 2; EXPECT_CALL(session_callbacks_.incoming_subscribe_namespace_callback, Call(foobar, parameters, _)) - .WillOnce([](const TrackNamespace&, - std::optional<VersionSpecificParameters>, + .WillOnce([](const TrackNamespace&, std::optional<MessageParameters>, MoqtResponseCallback callback) { std::move(callback)(std::nullopt); }); @@ -2948,9 +2953,9 @@ // Cleanup. unsubscribe_namespace.track_namespace = foobar; EXPECT_CALL(session_callbacks_.incoming_subscribe_namespace_callback, - Call(foobar, std::optional<VersionSpecificParameters>(), _)) + Call(foobar, std::optional<MessageParameters>(), _)) .WillOnce( - [](const TrackNamespace&, std::optional<VersionSpecificParameters>, + [](const TrackNamespace&, std::optional<MessageParameters>, MoqtResponseCallback callback) { EXPECT_EQ(callback, nullptr); }); stream_input->OnUnsubscribeNamespaceMessage(unsubscribe_namespace); } @@ -2998,8 +3003,9 @@ VersionSpecificParameters()); MoqtRequestError error = { /*request_id=*/0, - /*error_code=*/RequestErrorCode::kUnauthorized, - /*reason_phrase=*/"No username provided", + RequestErrorCode::kUnauthorized, + /*retry_interval=*/std::nullopt, + "No username provided", }; stream_input->OnRequestErrorMessage(error); ASSERT_NE(fetch_task, nullptr); @@ -3509,19 +3515,16 @@ &remote_track_visitor_, parameters)); EXPECT_FALSE(session_.SubscribeNamespace( TrackNamespace{"foo"}, - +[](TrackNamespace /*track_namespace*/, - std::optional<RequestErrorCode> /*error*/, - absl::string_view /*reason*/) {}, - VersionSpecificParameters())); + +[](TrackNamespace, std::optional<MoqtRequestErrorInfo>) {}, + MessageParameters())); session_.PublishNamespace( TrackNamespace{"foo"}, - +[](TrackNamespace /*track_namespace*/, - std::optional<MoqtErrorPair> /*error*/) {}, + +[](TrackNamespace, std::optional<MoqtRequestErrorInfo>) {}, VersionSpecificParameters()); EXPECT_FALSE(session_.Fetch( FullTrackName{TrackNamespace("foo"), "bar"}, - +[](std::unique_ptr<MoqtFetchTask> /*fetch_task*/) {}, Location(0, 0), 5, - std::nullopt, 127, std::nullopt, VersionSpecificParameters())); + +[](std::unique_ptr<MoqtFetchTask>) {}, Location(0, 0), 5, std::nullopt, + 127, std::nullopt, VersionSpecificParameters())); // Error on additional GOAWAY. EXPECT_CALL(mock_session_, CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation), @@ -3571,19 +3574,16 @@ &remote_track_visitor_, parameters)); EXPECT_FALSE(session_.SubscribeNamespace( TrackNamespace{"foo"}, - +[](TrackNamespace /*track_namespace*/, - std::optional<RequestErrorCode> /*error*/, - absl::string_view /*reason*/) {}, - VersionSpecificParameters())); + +[](TrackNamespace, std::optional<MoqtRequestErrorInfo>) {}, + MessageParameters())); session_.PublishNamespace( TrackNamespace{"foo"}, - +[](TrackNamespace /*track_namespace*/, - std::optional<MoqtErrorPair> /*error*/) {}, + +[](TrackNamespace, std::optional<MoqtRequestErrorInfo>) {}, VersionSpecificParameters()); EXPECT_FALSE(session_.Fetch( FullTrackName(TrackNamespace("foo"), "bar"), - +[](std::unique_ptr<MoqtFetchTask> /*fetch_task*/) {}, Location(0, 0), 5, - std::nullopt, 127, std::nullopt, VersionSpecificParameters())); + +[](std::unique_ptr<MoqtFetchTask>) {}, Location(0, 0), 5, std::nullopt, + 127, std::nullopt, VersionSpecificParameters())); session_.GoAway(""); // GoAway timer fires. auto* goaway_alarm = static_cast<quic::test::MockAlarmFactory::TestAlarm*>( @@ -3914,16 +3914,12 @@ .WillRepeatedly( Return(quic::QuicTimeDelta::FromMilliseconds(10000))); EXPECT_CALL(*track, largest_location) - .WillRepeatedly(Return(std::nullopt)); + .WillRepeatedly(Return(Location(5, 30))); MoqtRequestOk expected_ok; expected_ok.request_id = track_status.request_id; - // TODO(martinduke): Add parameters. -#if 0 - expected_ok.track_alias = 0; - expected_ok.expires = quic::QuicTimeDelta::FromMilliseconds(10000); - expected_ok.group_order = MoqtDeliveryOrder::kAscending; - expected_ok.largest_location = std::nullopt; -#endif + expected_ok.parameters.expires = + quic::QuicTimeDelta::FromMilliseconds(10000); + expected_ok.parameters.largest_object = Location(5, 30); EXPECT_CALL(control_stream, Writev(SerializedControlMessage(expected_ok), _)); EXPECT_CALL(*track, RemoveObjectListener); @@ -3946,16 +3942,11 @@ ASSERT_NE(listener, nullptr); EXPECT_CALL(*track, expiration) .WillRepeatedly(Return(quic::QuicTimeDelta::FromMilliseconds(10000))); - EXPECT_CALL(*track, largest_location).WillRepeatedly(Return(std::nullopt)); + EXPECT_CALL(*track, largest_location).WillRepeatedly(Return(Location(5, 30))); MoqtRequestOk expected_ok; expected_ok.request_id = track_status.request_id; - // TODO(martinduke): Add parameters. -#if 0 - expected_ok.track_alias = 0; - expected_ok.expires = quic::QuicTimeDelta::FromMilliseconds(10000); - expected_ok.group_order = MoqtDeliveryOrder::kAscending; - expected_ok.largest_location = std::nullopt; -#endif + expected_ok.parameters.expires = quic::QuicTimeDelta::FromMilliseconds(10000); + expected_ok.parameters.largest_object = Location(5, 30); EXPECT_CALL(control_stream, Writev(SerializedControlMessage(expected_ok), _)); EXPECT_CALL(*track, RemoveObjectListener(listener)); listener->OnSubscribeAccepted(); @@ -3975,8 +3966,8 @@ control_stream, Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _)); EXPECT_CALL(*track, RemoveObjectListener); - listener->OnSubscribeRejected( - MoqtErrorPair(RequestErrorCode::kInternalError, "Test error")); + listener->OnSubscribeRejected(MoqtRequestErrorInfo( + RequestErrorCode::kInternalError, std::nullopt, "Test error")); executed_AddObjectListener = true; }); stream_input->OnTrackStatusMessage(track_status); @@ -3998,8 +3989,8 @@ EXPECT_CALL(control_stream, Writev(ControlMessageOfType(MoqtMessageType::kRequestError), _)); EXPECT_CALL(*track, RemoveObjectListener(listener)); - listener->OnSubscribeRejected( - MoqtErrorPair(RequestErrorCode::kInternalError, "Test error")); + listener->OnSubscribeRejected(MoqtRequestErrorInfo( + RequestErrorCode::kInternalError, std::nullopt, "Test error")); } TEST_F(MoqtSessionTest, FinReportedToVisitor) { @@ -4016,11 +4007,12 @@ MoqtSubscribeOk ok = {/*request_id=*/0, /*track_alias=*/2, MessageParameters(), TrackExtensions()}; EXPECT_CALL(remote_track_visitor_, OnReply) - .WillOnce([&](const FullTrackName& ftn, - std::variant<SubscribeOkData, MoqtErrorPair> response) { - EXPECT_EQ(ftn, FullTrackName("foo", "bar")); - EXPECT_TRUE(std::holds_alternative<SubscribeOkData>(response)); - }); + .WillOnce( + [&](const FullTrackName& ftn, + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { + EXPECT_EQ(ftn, FullTrackName("foo", "bar")); + EXPECT_TRUE(std::holds_alternative<SubscribeOkData>(response)); + }); control_stream->OnSubscribeOkMessage(ok); MoqtObject object = { /*track_alias=*/2, @@ -4059,11 +4051,12 @@ MoqtSubscribeOk ok = {/*request_id=*/0, /*track_alias=*/2, MessageParameters(), TrackExtensions()}; EXPECT_CALL(remote_track_visitor_, OnReply) - .WillOnce([&](const FullTrackName& ftn, - std::variant<SubscribeOkData, MoqtErrorPair> response) { - EXPECT_EQ(ftn, FullTrackName("foo", "bar")); - EXPECT_TRUE(std::holds_alternative<SubscribeOkData>(response)); - }); + .WillOnce( + [&](const FullTrackName& ftn, + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { + EXPECT_EQ(ftn, FullTrackName("foo", "bar")); + EXPECT_TRUE(std::holds_alternative<SubscribeOkData>(response)); + }); control_stream->OnSubscribeOkMessage(ok); MoqtObject object = { /*track_alias=*/2, @@ -4169,8 +4162,33 @@ parameters); EXPECT_CALL(mock_session_, CloseSession); EXPECT_CALL(session_callbacks_.session_terminated_callback, Call); - stream_input->OnRequestOkMessage( - MoqtRequestOk{0, VersionSpecificParameters()}); + stream_input->OnRequestOkMessage(MoqtRequestOk{0, MessageParameters()}); +} + +TEST_F(MoqtSessionTest, ClientSetupNotAllowedOnControlStream) { + // While technically on the Control stream, when it arrives, it's an + // UnknownBidiStream + std::unique_ptr<MoqtControlParserVisitor> control_stream = + MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); + EXPECT_CALL(mock_session_, CloseSession); + EXPECT_CALL(session_callbacks_.session_terminated_callback, Call); + control_stream->OnClientSetupMessage(MoqtClientSetup()); +} + +TEST_F(MoqtSessionTest, NamespaceNotAllowedOnControlStream) { + std::unique_ptr<MoqtControlParserVisitor> control_stream = + MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); + EXPECT_CALL(mock_session_, CloseSession); + EXPECT_CALL(session_callbacks_.session_terminated_callback, Call); + control_stream->OnNamespaceMessage(MoqtNamespace()); +} + +TEST_F(MoqtSessionTest, NamespaceDoneNotAllowedOnControlStream) { + std::unique_ptr<MoqtControlParserVisitor> control_stream = + MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); + EXPECT_CALL(mock_session_, CloseSession); + EXPECT_CALL(session_callbacks_.session_terminated_callback, Call); + control_stream->OnNamespaceDoneMessage(MoqtNamespaceDone()); } // TODO: re-enable this test once this behavior is re-implemented.
diff --git a/quiche/quic/moqt/relay_namespace_tree.h b/quiche/quic/moqt/relay_namespace_tree.h index e755bd0..7c0c027 100644 --- a/quiche/quic/moqt/relay_namespace_tree.h +++ b/quiche/quic/moqt/relay_namespace_tree.h
@@ -157,7 +157,7 @@ if (!node->publishers.empty()) { subscriber->PublishNamespace( track_namespace, - [](const TrackNamespace&, std::optional<MoqtErrorPair>) {}, + [](const TrackNamespace&, std::optional<MoqtRequestErrorInfo>) {}, // TODO(martinduke): Add parameters. VersionSpecificParameters()); } @@ -189,7 +189,7 @@ if (adding) { subscriber->PublishNamespace( track_namespace, - [](const TrackNamespace&, std::optional<MoqtErrorPair>) {}, + [](const TrackNamespace&, std::optional<MoqtRequestErrorInfo>) {}, // TODO(martinduke): Add parameters. VersionSpecificParameters()); } else {
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc index e35f5b0..50f8ae2 100644 --- a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc +++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
@@ -58,6 +58,12 @@ quiche::QuicheBuffer operator()(const MoqtPublishNamespaceDone& message) { return framer.SerializePublishNamespaceDone(message); } + quiche::QuicheBuffer operator()(const MoqtNamespace& message) { + return framer.SerializeNamespace(message); + } + quiche::QuicheBuffer operator()(const MoqtNamespaceDone& message) { + return framer.SerializeNamespaceDone(message); + } quiche::QuicheBuffer operator()(const MoqtPublishNamespaceCancel& message) { return framer.SerializePublishNamespaceCancel(message); } @@ -140,6 +146,12 @@ void OnPublishNamespaceDoneMessage(const MoqtPublishNamespaceDone& message) { frames_.push_back(message); } + void OnNamespaceMessage(const MoqtNamespace& message) { + frames_.push_back(message); + } + void OnNamespaceDoneMessage(const MoqtNamespaceDone& message) { + frames_.push_back(message); + } void OnPublishNamespaceCancelMessage( const MoqtPublishNamespaceCancel& message) { frames_.push_back(message);
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.h b/quiche/quic/moqt/test_tools/moqt_framer_utils.h index 9bb0459..77bb4c7 100644 --- a/quiche/quic/moqt/test_tools/moqt_framer_utils.h +++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.h
@@ -25,14 +25,16 @@ // TODO: remove MoqtObject from TestMessageBase::MessageStructuredData and merge // those two types. -using MoqtGenericFrame = std::variant< - MoqtClientSetup, MoqtServerSetup, MoqtRequestOk, MoqtRequestError, - MoqtSubscribe, MoqtSubscribeOk, MoqtUnsubscribe, MoqtPublishDone, - MoqtSubscribeUpdate, MoqtPublishNamespace, MoqtPublishNamespaceDone, - MoqtPublishNamespaceCancel, MoqtTrackStatus, MoqtGoAway, - MoqtSubscribeNamespace, MoqtUnsubscribeNamespace, MoqtMaxRequestId, - MoqtFetch, MoqtFetchCancel, MoqtFetchOk, MoqtRequestsBlocked, MoqtPublish, - MoqtPublishOk, MoqtObjectAck>; +using MoqtGenericFrame = + std::variant<MoqtClientSetup, MoqtServerSetup, MoqtRequestOk, + MoqtRequestError, MoqtSubscribe, MoqtSubscribeOk, + MoqtUnsubscribe, MoqtPublishDone, MoqtSubscribeUpdate, + MoqtPublishNamespace, MoqtPublishNamespaceDone, MoqtNamespace, + MoqtNamespaceDone, MoqtPublishNamespaceCancel, MoqtTrackStatus, + MoqtGoAway, MoqtSubscribeNamespace, MoqtUnsubscribeNamespace, + MoqtMaxRequestId, MoqtFetch, MoqtFetchCancel, MoqtFetchOk, + MoqtRequestsBlocked, MoqtPublish, MoqtPublishOk, + MoqtObjectAck>; std::string SerializeGenericMessage(const MoqtGenericFrame& frame, bool use_webtrans = false);
diff --git a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h index 138db33..90807d3 100644 --- a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h +++ b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
@@ -42,7 +42,7 @@ MoqtResponseCallback)> incoming_publish_namespace_callback; testing::MockFunction<void(const TrackNamespace&, - std::optional<VersionSpecificParameters>, + std::optional<MessageParameters>, MoqtResponseCallback)> incoming_subscribe_namespace_callback; @@ -182,7 +182,7 @@ public: MOCK_METHOD(void, OnReply, (const FullTrackName& full_track_name, - (std::variant<SubscribeOkData, MoqtErrorPair> response)), + (std::variant<SubscribeOkData, MoqtRequestErrorInfo> response)), (override)); MOCK_METHOD(void, OnCanAckObjects, (MoqtObjectAckFunction ack_function), (override)); @@ -269,7 +269,7 @@ class MockMoqtObjectListener : public MoqtObjectListener { public: MOCK_METHOD(void, OnSubscribeAccepted, (), (override)); - MOCK_METHOD(void, OnSubscribeRejected, (MoqtErrorPair), (override)); + MOCK_METHOD(void, OnSubscribeRejected, (MoqtRequestErrorInfo), (override)); MOCK_METHOD(void, OnNewObjectAvailable, (Location, uint64_t, MoqtPriority, MoqtForwardingPreference), (override));
diff --git a/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h b/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h index c55ff17..4c0072b 100644 --- a/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h +++ b/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h
@@ -77,6 +77,12 @@ const MoqtPublishNamespaceDone& message) override { OnControlMessage(message); } + void OnNamespaceMessage(const MoqtNamespace& message) override { + OnControlMessage(message); + } + void OnNamespaceDoneMessage(const MoqtNamespaceDone& message) override { + OnControlMessage(message); + } void OnPublishNamespaceCancelMessage( const MoqtPublishNamespaceCancel& message) override { OnControlMessage(message);
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator.cc b/quiche/quic/moqt/test_tools/moqt_simulator.cc index 665a6a9..4df183f 100644 --- a/quiche/quic/moqt/test_tools/moqt_simulator.cc +++ b/quiche/quic/moqt/test_tools/moqt_simulator.cc
@@ -195,10 +195,10 @@ void ObjectReceiver::OnReply( const FullTrackName& full_track_name, - std::variant<SubscribeOkData, MoqtErrorPair> response) { + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { QUICHE_CHECK(full_track_name == TrackName()); - if (std::holds_alternative<MoqtErrorPair>(response)) { - MoqtErrorPair error = std::get<MoqtErrorPair>(response); + if (std::holds_alternative<MoqtRequestErrorInfo>(response)) { + MoqtRequestErrorInfo error = std::get<MoqtRequestErrorInfo>(response); QUICHE_CHECK(!error.reason_phrase.empty()) << error.reason_phrase; } }
diff --git a/quiche/quic/moqt/test_tools/moqt_simulator.h b/quiche/quic/moqt/test_tools/moqt_simulator.h index d8e8b91..8a2e314 100644 --- a/quiche/quic/moqt/test_tools/moqt_simulator.h +++ b/quiche/quic/moqt/test_tools/moqt_simulator.h
@@ -155,8 +155,9 @@ quic::QuicTimeDelta deadline) : clock_(clock), deadline_(deadline) {} - void OnReply(const FullTrackName& full_track_name, - std::variant<SubscribeOkData, MoqtErrorPair> response) override; + void OnReply( + const FullTrackName& full_track_name, + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) override; void OnCanAckObjects(MoqtObjectAckFunction ack_function) override { object_ack_function_ = std::move(ack_function);
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index f6f8c8a..6d4eb73 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -90,14 +90,16 @@ public: virtual ~TestMessageBase() = default; - using MessageStructuredData = std::variant< - MoqtClientSetup, MoqtServerSetup, MoqtObject, MoqtRequestOk, - MoqtRequestError, MoqtSubscribe, MoqtSubscribeOk, MoqtUnsubscribe, - MoqtPublishDone, MoqtSubscribeUpdate, MoqtPublishNamespace, - MoqtPublishNamespaceDone, MoqtPublishNamespaceCancel, MoqtTrackStatus, - MoqtGoAway, MoqtSubscribeNamespace, MoqtUnsubscribeNamespace, - MoqtMaxRequestId, MoqtFetch, MoqtFetchCancel, MoqtFetchOk, - MoqtRequestsBlocked, MoqtPublish, MoqtPublishOk, MoqtObjectAck>; + using MessageStructuredData = + std::variant<MoqtClientSetup, MoqtServerSetup, MoqtObject, MoqtRequestOk, + MoqtRequestError, MoqtSubscribe, MoqtSubscribeOk, + MoqtUnsubscribe, MoqtPublishDone, MoqtSubscribeUpdate, + MoqtPublishNamespace, MoqtPublishNamespaceDone, + MoqtPublishNamespaceCancel, MoqtTrackStatus, MoqtGoAway, + MoqtSubscribeNamespace, MoqtUnsubscribeNamespace, + MoqtMaxRequestId, MoqtFetch, MoqtFetchCancel, MoqtFetchOk, + MoqtRequestsBlocked, MoqtPublish, MoqtPublishOk, + MoqtNamespace, MoqtNamespaceDone, MoqtObjectAck>; // The total actual size of the message. size_t total_message_size() const { return wire_image_size_; } @@ -755,6 +757,10 @@ QUIC_LOG(INFO) << "REQUEST_ERROR error code mismatch"; return false; } + if (cast.retry_interval != request_error_.retry_interval) { + QUIC_LOG(INFO) << "REQUEST_ERROR retry interval mismatch"; + return false; + } if (cast.reason_phrase != request_error_.reason_phrase) { QUIC_LOG(INFO) << "REQUEST_ERROR reason phrase mismatch"; return false; @@ -772,14 +778,16 @@ MoqtRequestError request_error_ = { /*request_id=*/2, /*error_code=*/RequestErrorCode::kInvalidRange, + /*retry_interval=*/quic::QuicTimeDelta::FromSeconds(10), /*reason_phrase=*/"bar", }; private: - uint8_t raw_packet_[9] = { - 0x05, 0x00, 0x06, + uint8_t raw_packet_[11] = { + 0x05, 0x00, 0x08, 0x02, // request_id = 2 0x05, // error_code = 5 + 0x67, 0x11, // retry_interval = 10000 ms 0x03, 0x62, 0x61, 0x72, // reason_phrase = "bar" }; }; @@ -968,9 +976,74 @@ }; }; +class QUICHE_NO_EXPORT NamespaceMessage : public TestMessageBase { + public: + NamespaceMessage() : TestMessageBase() { + SetWireImage(raw_packet_, sizeof(raw_packet_)); + } + + bool EqualFieldValues(MessageStructuredData& values) const override { + auto cast = std::get<MoqtNamespace>(values); + if (cast.track_namespace_suffix != namespace_.track_namespace_suffix) { + QUIC_LOG(INFO) << "NAMESPACE suffix mismatch"; + return false; + } + return true; + } + + void ExpandVarints() override { ExpandVarintsImpl("vv---"); } + + MessageStructuredData structured_data() const override { + return TestMessageBase::MessageStructuredData(namespace_); + } + + private: + uint8_t raw_packet_[8] = { + 0x08, 0x00, 0x05, 0x01, + 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + }; + + MoqtNamespace namespace_ = { + TrackNamespace{"foo"}, + }; +}; + +class QUICHE_NO_EXPORT NamespaceDoneMessage : public TestMessageBase { + public: + NamespaceDoneMessage() : TestMessageBase() { + SetWireImage(raw_packet_, sizeof(raw_packet_)); + } + + bool EqualFieldValues(MessageStructuredData& values) const override { + auto cast = std::get<MoqtNamespaceDone>(values); + if (cast.track_namespace_suffix != namespace_done_.track_namespace_suffix) { + QUIC_LOG(INFO) << "NAMESPACE_DONE suffix mismatch"; + return false; + } + return true; + } + + void ExpandVarints() override { ExpandVarintsImpl("vv---"); } + + MessageStructuredData structured_data() const override { + return TestMessageBase::MessageStructuredData(namespace_done_); + } + + private: + uint8_t raw_packet_[8] = { + 0x0e, 0x00, 0x05, 0x01, + 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + }; + + MoqtNamespaceDone namespace_done_ = { + TrackNamespace{"foo"}, + }; +}; + class QUICHE_NO_EXPORT RequestOkMessage : public TestMessageBase { public: RequestOkMessage() : TestMessageBase() { + request_ok_.parameters.largest_object = Location(5, 1); SetWireImage(raw_packet_, sizeof(raw_packet_)); } @@ -987,24 +1060,22 @@ return true; } - void ExpandVarints() override { ExpandVarintsImpl("vvv--v--"); } + void ExpandVarints() override { ExpandVarintsImpl("vvvv--"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(request_ok_); } private: - uint8_t raw_packet_[11] = { - 0x07, 0x00, 0x08, 0x01, // request_id = 1 - 0x02, // 2 parameters - 0x02, 0x67, 0x10, // delivery_timeout = 10000 ms - 0x02, 0x67, 0x10, // max_cache_duration = 10000 ms + uint8_t raw_packet_[9] = { + 0x07, 0x00, 0x06, 0x01, // request_id = 1 + 0x01, // 1 parameter + 0x09, 0x02, 0x05, 0x01, // Largest Object = (5, 1) }; MoqtRequestOk request_ok_ = { /*request_id=*/1, - VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000), - quic::QuicTimeDelta::FromMilliseconds(10000)), + MessageParameters(), // Set in the constructor. }; }; @@ -1136,6 +1207,9 @@ class QUICHE_NO_EXPORT SubscribeNamespaceMessage : public TestMessageBase { public: SubscribeNamespaceMessage() : TestMessageBase() { + subscribe_namespace_.parameters.authorization_tokens.push_back( + AuthToken(AuthTokenType::kOutOfBand, "bar")); + subscribe_namespace_.parameters.set_forward(true); SetWireImage(raw_packet_, sizeof(raw_packet_)); } @@ -1145,10 +1219,15 @@ QUIC_LOG(INFO) << "SUBSCRIBE_NAMESPACE request_id mismatch"; return false; } - if (cast.track_namespace != subscribe_namespace_.track_namespace) { + if (cast.track_namespace_prefix != + subscribe_namespace_.track_namespace_prefix) { QUIC_LOG(INFO) << "SUBSCRIBE_NAMESPACE track namespace mismatch"; return false; } + if (cast.subscribe_options != subscribe_namespace_.subscribe_options) { + QUIC_LOG(INFO) << "SUBSCRIBE_NAMESPACE subscribe options mismatch"; + return false; + } if (cast.parameters != subscribe_namespace_.parameters) { QUIC_LOG(INFO) << "SUBSCRIBE_NAMESPACE parameters mismatch"; return false; @@ -1163,17 +1242,20 @@ } private: - uint8_t raw_packet_[17] = { - 0x11, 0x00, 0x0e, 0x01, // request_id = 1 + uint8_t raw_packet_[20] = { + 0x11, 0x00, 0x11, 0x01, // request_id = 1 0x01, 0x03, 0x66, 0x6f, 0x6f, // namespace = "foo" - 0x01, // 1 parameter + 0x02, // subscribe_options = kBoth + 0x02, // 2 parameters 0x03, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_tag = "bar" + 0x0d, 0x01, // forward = true }; MoqtSubscribeNamespace subscribe_namespace_ = { /*request_id=*/1, TrackNamespace("foo"), - VersionSpecificParameters(AuthTokenType::kOutOfBand, "bar"), + SubscribeNamespaceOption::kBoth, + MessageParameters(), // set in constructor. }; }; @@ -1786,6 +1868,10 @@ return std::make_unique<PublishNamespaceMessage>(); case MoqtMessageType::kPublishNamespaceDone: return std::make_unique<PublishNamespaceDoneMessage>(); + case MoqtMessageType::kNamespace: + return std::make_unique<NamespaceMessage>(); + case MoqtMessageType::kNamespaceDone: + return std::make_unique<NamespaceDoneMessage>(); case MoqtMessageType::kPublishNamespaceCancel: return std::make_unique<PublishNamespaceCancelMessage>(); case MoqtMessageType::kTrackStatus:
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc index a8148fd..d5900b7 100644 --- a/quiche/quic/moqt/tools/chat_client.cc +++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -22,6 +22,7 @@ #include "quiche/quic/core/io/quic_event_loop.h" #include "quiche/quic/core/quic_default_clock.h" #include "quiche/quic/core/quic_server_id.h" +#include "quiche/quic/moqt/moqt_error.h" #include "quiche/quic/moqt/moqt_key_value_pair.h" #include "quiche/quic/moqt/moqt_known_track_publisher.h" #include "quiche/quic/moqt/moqt_names.h" @@ -69,8 +70,9 @@ std::cout << "PUBLISH_NAMESPACE for " << track_namespace.ToString() << "\n"; if (!track_name.has_value()) { std::cout << "PUBLISH_NAMESPACE rejected, invalid namespace\n"; - std::move(callback)(std::make_optional<MoqtErrorPair>( - RequestErrorCode::kTrackDoesNotExist, "Not a subscribed namespace")); + std::move(callback)(std::make_optional<MoqtRequestErrorInfo>( + RequestErrorCode::kTrackDoesNotExist, std::nullopt, + "Not a subscribed namespace")); return; } if (other_users_.contains(*track_name)) { @@ -187,7 +189,7 @@ void ChatClient::RemoteTrackVisitor::OnReply( const FullTrackName& full_track_name, - std::variant<SubscribeOkData, MoqtErrorPair> response) { + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) { auto it = client_->other_users_.find(full_track_name); if (it == client_->other_users_.end()) { std::cout << "Error: received reply for unknown user " @@ -199,9 +201,9 @@ if (std::holds_alternative<SubscribeOkData>(response)) { std::cout << "ACCEPTED\n"; } else { - auto request_error = std::get<MoqtErrorPair>(response); + auto request_error = std::get<MoqtRequestErrorInfo>(response); std::cout << "REJECTED, reason = " - << std::get<MoqtErrorPair>(response).reason_phrase << "\n"; + << std::get<MoqtRequestErrorInfo>(response).reason_phrase << "\n"; client_->other_users_.erase(it); } } @@ -249,7 +251,7 @@ session_->set_publisher(&publisher_); MoqtOutgoingPublishNamespaceCallback publish_namespace_callback = [this](TrackNamespace track_namespace, - std::optional<MoqtErrorPair> reason) { + std::optional<MoqtRequestErrorInfo> reason) { if (reason.has_value()) { std::cout << "PUBLISH_NAMESPACE rejected, " << reason->reason_phrase << "\n"; @@ -272,11 +274,11 @@ bool subscribe_response_received = false; MoqtOutgoingSubscribeNamespaceCallback subscribe_namespace_callback = [&, this](TrackNamespace track_namespace, - std::optional<RequestErrorCode> error, - absl::string_view reason) { + std::optional<MoqtRequestErrorInfo> error) { subscribe_response_received = true; if (error.has_value()) { - std::cout << "SUBSCRIBE_NAMESPACE rejected, " << reason << "\n"; + std::cout << "SUBSCRIBE_NAMESPACE rejected, " << error->reason_phrase + << "\n"; session_->Error(MoqtError::kInternalError, "Local SUBSCRIBE_NAMESPACE rejected"); return; @@ -285,7 +287,8 @@ << " accepted\n"; return; }; - VersionSpecificParameters parameters( + MessageParameters parameters; + parameters.authorization_tokens.emplace_back( AuthTokenType::kOutOfBand, std::string(GetUsername(my_track_name_))); session_->SubscribeNamespace(GetChatNamespace(my_track_name_), std::move(subscribe_namespace_callback),
diff --git a/quiche/quic/moqt/tools/chat_client.h b/quiche/quic/moqt/tools/chat_client.h index a26a7a6..2c9ffd8 100644 --- a/quiche/quic/moqt/tools/chat_client.h +++ b/quiche/quic/moqt/tools/chat_client.h
@@ -100,7 +100,7 @@ void OnReply( const moqt::FullTrackName& full_track_name, - std::variant<SubscribeOkData, MoqtErrorPair> response) override; + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) override; void OnCanAckObjects(MoqtObjectAckFunction) override {}
diff --git a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc index 2a61655..c3de814 100644 --- a/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc +++ b/quiche/quic/moqt/tools/moqt_ingestion_server_bin.cc
@@ -29,6 +29,7 @@ #include "absl/strings/string_view.h" #include "absl/time/clock.h" #include "absl/time/time.h" +#include "quiche/quic/moqt/moqt_error.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_object.h" #include "quiche/quic/moqt/moqt_session.h" @@ -129,9 +130,9 @@ << "Rejected remote publish_namespace as it contained " "disallowed characters; namespace: " << track_namespace; - std::move(callback)( - MoqtErrorPair{RequestErrorCode::kInternalError, - "Track namespace contains disallowed characters"}); + std::move(callback)(MoqtRequestErrorInfo{ + RequestErrorCode::kInternalError, std::nullopt, + "Track namespace contains disallowed characters"}); return; } @@ -151,8 +152,9 @@ subscribed_namespaces_.erase(it); QUICHE_LOG(ERROR) << "Failed to create directory " << directory_path << "; " << status; - std::move(callback)(MoqtErrorPair{RequestErrorCode::kInternalError, - "Failed to create output directory"}); + std::move(callback)( + MoqtRequestErrorInfo{RequestErrorCode::kInternalError, std::nullopt, + "Failed to create output directory"}); return; } @@ -175,11 +177,11 @@ void OnReply( const FullTrackName& full_track_name, - std::variant<SubscribeOkData, MoqtErrorPair> response) override { - if (std::holds_alternative<MoqtErrorPair>(response)) { - QUICHE_LOG(ERROR) << "Failed to subscribe to the peer track " - << full_track_name << ": " - << std::get<MoqtErrorPair>(response).reason_phrase; + std::variant<SubscribeOkData, MoqtRequestErrorInfo> response) override { + if (std::holds_alternative<MoqtRequestErrorInfo>(response)) { + QUICHE_LOG(ERROR) + << "Failed to subscribe to the peer track " << full_track_name + << ": " << std::get<MoqtRequestErrorInfo>(response).reason_phrase; } }
diff --git a/quiche/quic/moqt/tools/moqt_relay.cc b/quiche/quic/moqt/tools/moqt_relay.cc index 3cda74c..45e84d0 100644 --- a/quiche/quic/moqt/tools/moqt_relay.cc +++ b/quiche/quic/moqt/tools/moqt_relay.cc
@@ -16,7 +16,8 @@ #include "quiche/quic/core/crypto/proof_verifier.h" #include "quiche/quic/core/io/quic_event_loop.h" #include "quiche/quic/core/quic_server_id.h" -#include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_key_value_pair.h" +#include "quiche/quic/moqt/moqt_names.h" #include "quiche/quic/moqt/moqt_session.h" #include "quiche/quic/moqt/moqt_session_callbacks.h" #include "quiche/quic/moqt/moqt_session_interface.h" @@ -122,10 +123,9 @@ } }; session->callbacks().incoming_subscribe_namespace_callback = - [this, session]( - const TrackNamespace& track_namespace, - const std::optional<VersionSpecificParameters>& parameters, - MoqtResponseCallback callback) { + [this, session](const TrackNamespace& track_namespace, + const std::optional<MessageParameters>& parameters, + MoqtResponseCallback callback) { if (is_closing_) { return; }
diff --git a/quiche/quic/moqt/tools/moqt_relay_test.cc b/quiche/quic/moqt/tools/moqt_relay_test.cc index e50b1f4..66df0d8 100644 --- a/quiche/quic/moqt/tools/moqt_relay_test.cc +++ b/quiche/quic/moqt/tools/moqt_relay_test.cc
@@ -15,7 +15,9 @@ #include "absl/strings/string_view.h" #include "quiche/quic/core/io/quic_event_loop.h" #include "quiche/quic/core/quic_time.h" -#include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_error.h" +#include "quiche/quic/moqt/moqt_key_value_pair.h" +#include "quiche/quic/moqt/moqt_names.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_relay_publisher.h" #include "quiche/quic/moqt/moqt_session.h" @@ -131,7 +133,7 @@ // relay_ publishes a namespace, so upstream_ will route to relay_. relay_.client_session()->PublishNamespace( TrackNamespace({"foo"}), - [](TrackNamespace, std::optional<MoqtErrorPair>) {}, + [](TrackNamespace, std::optional<MoqtRequestErrorInfo>) {}, VersionSpecificParameters()); upstream_.RunOneEvent(); // There is now an upstream session for "Foo". @@ -187,7 +189,7 @@ // Downstream publishes a namespace. It's stored in relay_ but upstream_ // hasn't been notified. downstream_.client_session()->PublishNamespace( - foobar, [](TrackNamespace, std::optional<MoqtErrorPair>) {}, + foobar, [](TrackNamespace, std::optional<MoqtRequestErrorInfo>) {}, VersionSpecificParameters()); relay_.RunOneEvent(); upstream_.RunOneEvent(); @@ -196,16 +198,15 @@ // Upstream subscribes. Now it's notified and forwards it to the probe. upstream_session->SubscribeNamespace( - foo, - [](TrackNamespace, std::optional<RequestErrorCode>, absl::string_view) {}, - VersionSpecificParameters()); + foo, [](TrackNamespace, std::optional<MoqtRequestErrorInfo>) {}, + MessageParameters()); upstream_.RunOneEvent(); upstream_.RunOneEvent(); EXPECT_THAT(upstream_published_namespaces, ElementsAre(foobar)); // Downstream publishes another namespace. Everyone is notified. downstream_.client_session()->PublishNamespace( - foobaz, [](TrackNamespace, std::optional<MoqtErrorPair>) {}, + foobaz, [](TrackNamespace, std::optional<MoqtRequestErrorInfo>) {}, VersionSpecificParameters()); relay_.RunOneEvent(); upstream_.RunOneEvent();