Add parser/framer support for SUBSCRIBE_NAMESPACE, SUBSCRIBE_NAMESPACE_OK, SUBSCRIBE_NAMESPACE_ERROR, UNSUBSCRIBE_NAMESPACE. There is no processing of the semantics; these messages are dropped on the floor. PiperOrigin-RevId: 681111465
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index f17caf8..074c2be 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -581,6 +581,33 @@ WireStringWithVarInt62Length(message.new_session_uri)); } +quiche::QuicheBuffer MoqtFramer::SerializeSubscribeNamespace( + const MoqtSubscribeNamespace& message) { + return Serialize(WireVarInt62(MoqtMessageType::kSubscribeNamespace), + WireFullTrackName(message.track_namespace, false), + WireSubscribeParameterList(message.parameters)); +} + +quiche::QuicheBuffer MoqtFramer::SerializeSubscribeNamespaceOk( + const MoqtSubscribeNamespaceOk& message) { + return Serialize(WireVarInt62(MoqtMessageType::kSubscribeNamespaceOk), + WireFullTrackName(message.track_namespace, false)); +} + +quiche::QuicheBuffer MoqtFramer::SerializeSubscribeNamespaceError( + const MoqtSubscribeNamespaceError& message) { + return Serialize(WireVarInt62(MoqtMessageType::kSubscribeNamespaceError), + WireFullTrackName(message.track_namespace, false), + WireVarInt62(message.error_code), + WireStringWithVarInt62Length(message.reason_phrase)); +} + +quiche::QuicheBuffer MoqtFramer::SerializeUnsubscribeNamespace( + const MoqtUnsubscribeNamespace& message) { + return Serialize(WireVarInt62(MoqtMessageType::kUnsubscribeNamespace), + WireFullTrackName(message.track_namespace, false)); +} + quiche::QuicheBuffer MoqtFramer::SerializeMaxSubscribeId( const MoqtMaxSubscribeId& message) { return Serialize(WireVarInt62(MoqtMessageType::kMaxSubscribeId),
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h index b18a77c..78b9cf3 100644 --- a/quiche/quic/moqt/moqt_framer.h +++ b/quiche/quic/moqt/moqt_framer.h
@@ -55,6 +55,14 @@ quiche::QuicheBuffer SerializeUnannounce(const MoqtUnannounce& message); quiche::QuicheBuffer SerializeTrackStatus(const MoqtTrackStatus& message); quiche::QuicheBuffer SerializeGoAway(const MoqtGoAway& message); + quiche::QuicheBuffer SerializeSubscribeNamespace( + const MoqtSubscribeNamespace& message); + quiche::QuicheBuffer SerializeSubscribeNamespaceOk( + const MoqtSubscribeNamespaceOk& message); + quiche::QuicheBuffer SerializeSubscribeNamespaceError( + const MoqtSubscribeNamespaceError& message); + quiche::QuicheBuffer SerializeUnsubscribeNamespace( + const MoqtUnsubscribeNamespace& message); quiche::QuicheBuffer SerializeMaxSubscribeId( const MoqtMaxSubscribeId& message); quiche::QuicheBuffer SerializeObjectAck(const MoqtObjectAck& message);
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc index be6ae32..5ed05c1 100644 --- a/quiche/quic/moqt/moqt_framer_test.cc +++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -46,6 +46,10 @@ MoqtMessageType::kAnnounceError, MoqtMessageType::kUnannounce, MoqtMessageType::kGoAway, + MoqtMessageType::kSubscribeNamespace, + MoqtMessageType::kSubscribeNamespaceOk, + MoqtMessageType::kSubscribeNamespaceError, + MoqtMessageType::kUnsubscribeNamespace, MoqtMessageType::kMaxSubscribeId, MoqtMessageType::kObjectAck, MoqtMessageType::kClientSetup, @@ -158,6 +162,22 @@ auto data = std::get<MoqtGoAway>(structured_data); return framer_.SerializeGoAway(data); } + case moqt::MoqtMessageType::kSubscribeNamespace: { + auto data = std::get<MoqtSubscribeNamespace>(structured_data); + return framer_.SerializeSubscribeNamespace(data); + } + case moqt::MoqtMessageType::kSubscribeNamespaceOk: { + auto data = std::get<MoqtSubscribeNamespaceOk>(structured_data); + return framer_.SerializeSubscribeNamespaceOk(data); + } + case moqt::MoqtMessageType::kSubscribeNamespaceError: { + auto data = std::get<MoqtSubscribeNamespaceError>(structured_data); + return framer_.SerializeSubscribeNamespaceError(data); + } + case moqt::MoqtMessageType::kUnsubscribeNamespace: { + auto data = std::get<MoqtUnsubscribeNamespace>(structured_data); + return framer_.SerializeUnsubscribeNamespace(data); + } case moqt::MoqtMessageType::kMaxSubscribeId: { auto data = std::get<MoqtMaxSubscribeId>(structured_data); return framer_.SerializeMaxSubscribeId(data);
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index 57c26ab..b5816d8 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -489,6 +489,25 @@ std::string new_session_uri; }; +struct QUICHE_EXPORT MoqtSubscribeNamespace { + FullTrackName track_namespace; + MoqtSubscribeParameters parameters; +}; + +struct QUICHE_EXPORT MoqtSubscribeNamespaceOk { + FullTrackName track_namespace; +}; + +struct QUICHE_EXPORT MoqtSubscribeNamespaceError { + FullTrackName track_namespace; + MoqtAnnounceErrorCode error_code; + std::string reason_phrase; +}; + +struct QUICHE_EXPORT MoqtUnsubscribeNamespace { + FullTrackName track_namespace; +}; + struct QUICHE_EXPORT MoqtMaxSubscribeId { uint64_t max_subscribe_id; };
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index 51658d4..2140748 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -8,7 +8,6 @@ #include <cstddef> #include <cstdint> #include <cstring> -#include <initializer_list> #include <optional> #include <string> @@ -229,6 +228,14 @@ return ProcessTrackStatus(reader); case MoqtMessageType::kGoAway: return ProcessGoAway(reader); + case MoqtMessageType::kSubscribeNamespace: + return ProcessSubscribeNamespace(reader); + case MoqtMessageType::kSubscribeNamespaceOk: + return ProcessSubscribeNamespaceOk(reader); + case MoqtMessageType::kSubscribeNamespaceError: + return ProcessSubscribeNamespaceError(reader); + case MoqtMessageType::kUnsubscribeNamespace: + return ProcessUnsubscribeNamespace(reader); case MoqtMessageType::kMaxSubscribeId: return ProcessMaxSubscribeId(reader); case moqt::MoqtMessageType::kObjectAck: @@ -706,6 +713,54 @@ return reader.PreviouslyReadPayload().length(); } +size_t MoqtControlParser::ProcessSubscribeNamespace( + quic::QuicDataReader& reader) { + MoqtSubscribeNamespace subscribe_namespace; + if (!ReadTrackNamespace(reader, subscribe_namespace.track_namespace)) { + return 0; + } + if (!ReadSubscribeParameters(reader, subscribe_namespace.parameters)) { + return 0; + } + visitor_.OnSubscribeNamespaceMessage(subscribe_namespace); + return reader.PreviouslyReadPayload().length(); +} + +size_t MoqtControlParser::ProcessSubscribeNamespaceOk( + quic::QuicDataReader& reader) { + MoqtSubscribeNamespaceOk subscribe_namespace_ok; + if (!ReadTrackNamespace(reader, subscribe_namespace_ok.track_namespace)) { + return 0; + } + visitor_.OnSubscribeNamespaceOkMessage(subscribe_namespace_ok); + return reader.PreviouslyReadPayload().length(); +} + +size_t MoqtControlParser::ProcessSubscribeNamespaceError( + quic::QuicDataReader& reader) { + MoqtSubscribeNamespaceError subscribe_namespace_error; + uint64_t error_code; + if (!ReadTrackNamespace(reader, subscribe_namespace_error.track_namespace) || + !reader.ReadVarInt62(&error_code) || + !reader.ReadStringVarInt62(subscribe_namespace_error.reason_phrase)) { + return 0; + } + subscribe_namespace_error.error_code = + static_cast<MoqtAnnounceErrorCode>(error_code); + visitor_.OnSubscribeNamespaceErrorMessage(subscribe_namespace_error); + return reader.PreviouslyReadPayload().length(); +} + +size_t MoqtControlParser::ProcessUnsubscribeNamespace( + quic::QuicDataReader& reader) { + MoqtUnsubscribeNamespace unsubscribe_namespace; + if (!ReadTrackNamespace(reader, unsubscribe_namespace.track_namespace)) { + return 0; + } + visitor_.OnUnsubscribeNamespaceMessage(unsubscribe_namespace); + return reader.PreviouslyReadPayload().length(); +} + size_t MoqtControlParser::ProcessMaxSubscribeId(quic::QuicDataReader& reader) { MoqtMaxSubscribeId max_subscribe_id; if (!reader.ReadVarInt62(&max_subscribe_id.max_subscribe_id)) {
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h index 876b27a..5f180b5 100644 --- a/quiche/quic/moqt/moqt_parser.h +++ b/quiche/quic/moqt/moqt_parser.h
@@ -43,6 +43,14 @@ virtual void OnUnannounceMessage(const MoqtUnannounce& message) = 0; virtual void OnTrackStatusMessage(const MoqtTrackStatus& message) = 0; virtual void OnGoAwayMessage(const MoqtGoAway& message) = 0; + virtual void OnSubscribeNamespaceMessage( + const MoqtSubscribeNamespace& message) = 0; + virtual void OnSubscribeNamespaceOkMessage( + const MoqtSubscribeNamespaceOk& message) = 0; + virtual void OnSubscribeNamespaceErrorMessage( + const MoqtSubscribeNamespaceError& message) = 0; + virtual void OnUnsubscribeNamespaceMessage( + const MoqtUnsubscribeNamespace& message) = 0; virtual void OnMaxSubscribeIdMessage(const MoqtMaxSubscribeId& message) = 0; virtual void OnObjectAckMessage(const MoqtObjectAck& message) = 0; @@ -108,6 +116,10 @@ size_t ProcessUnannounce(quic::QuicDataReader& reader); size_t ProcessTrackStatus(quic::QuicDataReader& reader); size_t ProcessGoAway(quic::QuicDataReader& reader); + size_t ProcessSubscribeNamespace(quic::QuicDataReader& reader); + size_t ProcessSubscribeNamespaceOk(quic::QuicDataReader& reader); + size_t ProcessSubscribeNamespaceError(quic::QuicDataReader& reader); + size_t ProcessUnsubscribeNamespace(quic::QuicDataReader& reader); size_t ProcessMaxSubscribeId(quic::QuicDataReader& reader); size_t ProcessObjectAck(quic::QuicDataReader& reader);
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index 6c34083..5c1d028 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -31,15 +31,28 @@ using ::testing::Optional; constexpr std::array kMessageTypes{ - MoqtMessageType::kSubscribe, MoqtMessageType::kSubscribeOk, - MoqtMessageType::kSubscribeError, MoqtMessageType::kSubscribeUpdate, - MoqtMessageType::kUnsubscribe, MoqtMessageType::kSubscribeDone, - MoqtMessageType::kAnnounceCancel, MoqtMessageType::kTrackStatusRequest, - MoqtMessageType::kTrackStatus, MoqtMessageType::kAnnounce, - MoqtMessageType::kAnnounceOk, MoqtMessageType::kAnnounceError, - MoqtMessageType::kUnannounce, MoqtMessageType::kClientSetup, - MoqtMessageType::kServerSetup, MoqtMessageType::kGoAway, - MoqtMessageType::kMaxSubscribeId, MoqtMessageType::kObjectAck, + MoqtMessageType::kSubscribe, + MoqtMessageType::kSubscribeOk, + MoqtMessageType::kSubscribeError, + MoqtMessageType::kSubscribeUpdate, + MoqtMessageType::kUnsubscribe, + MoqtMessageType::kSubscribeDone, + MoqtMessageType::kAnnounceCancel, + MoqtMessageType::kTrackStatusRequest, + MoqtMessageType::kTrackStatus, + MoqtMessageType::kAnnounce, + MoqtMessageType::kAnnounceOk, + MoqtMessageType::kAnnounceError, + MoqtMessageType::kUnannounce, + MoqtMessageType::kClientSetup, + MoqtMessageType::kServerSetup, + MoqtMessageType::kGoAway, + MoqtMessageType::kSubscribeNamespace, + MoqtMessageType::kSubscribeNamespaceOk, + MoqtMessageType::kSubscribeNamespaceError, + MoqtMessageType::kUnsubscribeNamespace, + MoqtMessageType::kMaxSubscribeId, + MoqtMessageType::kObjectAck, }; constexpr std::array kDataStreamTypes{ MoqtDataStreamType::kStreamHeaderTrack, @@ -162,6 +175,22 @@ void OnGoAwayMessage(const MoqtGoAway& message) override { OnControlMessage(message); } + void OnSubscribeNamespaceMessage( + const MoqtSubscribeNamespace& message) override { + OnControlMessage(message); + } + void OnSubscribeNamespaceOkMessage( + const MoqtSubscribeNamespaceOk& message) override { + OnControlMessage(message); + } + void OnSubscribeNamespaceErrorMessage( + const MoqtSubscribeNamespaceError& message) override { + OnControlMessage(message); + } + void OnUnsubscribeNamespaceMessage( + const MoqtUnsubscribeNamespace& message) override { + OnControlMessage(message); + } void OnMaxSubscribeIdMessage(const MoqtMaxSubscribeId& message) override { OnControlMessage(message); }
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index ecfe8d0..dbf0186 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -204,6 +204,14 @@ void OnUnannounceMessage(const MoqtUnannounce& /*message*/) override {} void OnTrackStatusMessage(const MoqtTrackStatus& message) override {} void OnGoAwayMessage(const MoqtGoAway& /*message*/) override {} + void OnSubscribeNamespaceMessage( + const MoqtSubscribeNamespace& message) override {} + void OnSubscribeNamespaceOkMessage( + const MoqtSubscribeNamespaceOk& message) override {} + void OnSubscribeNamespaceErrorMessage( + const MoqtSubscribeNamespaceError& message) override {} + void OnUnsubscribeNamespaceMessage( + const MoqtUnsubscribeNamespace& message) override {} void OnMaxSubscribeIdMessage(const MoqtMaxSubscribeId& message) override; void OnObjectAckMessage(const MoqtObjectAck& message) override { auto subscription_it =
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index b274194..c694e8d 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -32,13 +32,14 @@ public: virtual ~TestMessageBase() = default; - using MessageStructuredData = - absl::variant<MoqtClientSetup, MoqtServerSetup, MoqtObject, MoqtSubscribe, - MoqtSubscribeOk, MoqtSubscribeError, MoqtUnsubscribe, - MoqtSubscribeDone, MoqtSubscribeUpdate, MoqtAnnounce, - MoqtAnnounceOk, MoqtAnnounceError, MoqtAnnounceCancel, - MoqtTrackStatusRequest, MoqtUnannounce, MoqtTrackStatus, - MoqtGoAway, MoqtMaxSubscribeId, MoqtObjectAck>; + using MessageStructuredData = absl::variant< + MoqtClientSetup, MoqtServerSetup, MoqtObject, MoqtSubscribe, + MoqtSubscribeOk, MoqtSubscribeError, MoqtUnsubscribe, MoqtSubscribeDone, + MoqtSubscribeUpdate, MoqtAnnounce, MoqtAnnounceOk, MoqtAnnounceError, + MoqtAnnounceCancel, MoqtTrackStatusRequest, MoqtUnannounce, + MoqtTrackStatus, MoqtGoAway, MoqtSubscribeNamespace, + MoqtSubscribeNamespaceOk, MoqtSubscribeNamespaceError, + MoqtUnsubscribeNamespace, MoqtMaxSubscribeId, MoqtObjectAck>; // The total actual size of the message. size_t total_message_size() const { return wire_image_size_; } @@ -1061,6 +1062,150 @@ }; }; +class QUICHE_NO_EXPORT SubscribeNamespaceMessage : public TestMessageBase { + public: + SubscribeNamespaceMessage() : TestMessageBase() { + SetWireImage(raw_packet_, sizeof(raw_packet_)); + } + + bool EqualFieldValues(MessageStructuredData& values) const override { + auto cast = std::get<MoqtSubscribeNamespace>(values); + if (cast.track_namespace != subscribe_namespace_.track_namespace) { + QUIC_LOG(INFO) << "SUBSCRIBE_NAMESPACE track namespace mismatch"; + return false; + } + if (cast.parameters != subscribe_namespace_.parameters) { + QUIC_LOG(INFO) << "SUBSCRIBE_NAMESPACE parameters mismatch"; + return false; + } + return true; + } + + void ExpandVarints() override { ExpandVarintsImpl("vvv---vvv---"); } + + MessageStructuredData structured_data() const override { + return TestMessageBase::MessageStructuredData(subscribe_namespace_); + } + + private: + uint8_t raw_packet_[12] = { + 0x11, 0x01, 0x03, 0x66, 0x6f, 0x6f, // namespace = "foo" + 0x01, // 1 parameter + 0x02, 0x03, 0x62, 0x61, 0x72, // authorization_info = "bar" + }; + + MoqtSubscribeNamespace subscribe_namespace_ = { + /*track_namespace=*/FullTrackName{"foo"}, + /*parameters=*/ + MoqtSubscribeParameters{"bar", std::nullopt, std::nullopt, std::nullopt}, + }; +}; + +class QUICHE_NO_EXPORT SubscribeNamespaceOkMessage : public TestMessageBase { + public: + SubscribeNamespaceOkMessage() : TestMessageBase() { + SetWireImage(raw_packet_, sizeof(raw_packet_)); + } + + bool EqualFieldValues(MessageStructuredData& values) const override { + auto cast = std::get<MoqtSubscribeNamespaceOk>(values); + if (cast.track_namespace != subscribe_namespace_ok_.track_namespace) { + QUIC_LOG(INFO) << "SUBSCRIBE_NAMESPACE_OK track namespace mismatch"; + return false; + } + return true; + } + + void ExpandVarints() override { ExpandVarintsImpl("vvv---"); } + + MessageStructuredData structured_data() const override { + return TestMessageBase::MessageStructuredData(subscribe_namespace_ok_); + } + + private: + uint8_t raw_packet_[6] = { + 0x12, 0x01, 0x03, 0x66, 0x6f, 0x6f, // namespace = "foo" + }; + + MoqtSubscribeNamespaceOk subscribe_namespace_ok_ = { + /*track_namespace=*/FullTrackName{"foo"}, + }; +}; + +class QUICHE_NO_EXPORT SubscribeNamespaceErrorMessage : public TestMessageBase { + public: + SubscribeNamespaceErrorMessage() : TestMessageBase() { + SetWireImage(raw_packet_, sizeof(raw_packet_)); + } + + bool EqualFieldValues(MessageStructuredData& values) const override { + auto cast = std::get<MoqtSubscribeNamespaceError>(values); + if (cast.track_namespace != subscribe_namespace_error_.track_namespace) { + QUIC_LOG(INFO) << "SUBSCRIBE_NAMESPACE_ERROR track namespace mismatch"; + return false; + } + if (cast.error_code != subscribe_namespace_error_.error_code) { + QUIC_LOG(INFO) << "SUBSCRIBE_NAMESPACE_ERROR error code mismatch"; + return false; + } + if (cast.reason_phrase != subscribe_namespace_error_.reason_phrase) { + QUIC_LOG(INFO) << "SUBSCRIBE_NAMESPACE_ERROR reason phrase mismatch"; + return false; + } + return true; + } + + void ExpandVarints() override { ExpandVarintsImpl("vvv---vv---"); } + + MessageStructuredData structured_data() const override { + return TestMessageBase::MessageStructuredData(subscribe_namespace_error_); + } + + private: + uint8_t raw_packet_[11] = { + 0x13, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x01, // error_code = 1 + 0x03, 0x62, 0x61, 0x72, // reason_phrase = "bar" + }; + + MoqtSubscribeNamespaceError subscribe_namespace_error_ = { + /*track_namespace=*/FullTrackName{"foo"}, + /*error_code=*/MoqtAnnounceErrorCode::kAnnounceNotSupported, + /*reason_phrase=*/"bar", + }; +}; + +class QUICHE_NO_EXPORT UnsubscribeNamespaceMessage : public TestMessageBase { + public: + UnsubscribeNamespaceMessage() : TestMessageBase() { + SetWireImage(raw_packet_, sizeof(raw_packet_)); + } + + bool EqualFieldValues(MessageStructuredData& values) const override { + auto cast = std::get<MoqtUnsubscribeNamespace>(values); + if (cast.track_namespace != unsubscribe_namespace_.track_namespace) { + QUIC_LOG(INFO) << "UNSUBSCRIBE_NAMESPACE track namespace mismatch"; + return false; + } + return true; + } + + void ExpandVarints() override { ExpandVarintsImpl("vvv---"); } + + MessageStructuredData structured_data() const override { + return TestMessageBase::MessageStructuredData(unsubscribe_namespace_); + } + + private: + uint8_t raw_packet_[6] = { + 0x14, 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace + }; + + MoqtUnsubscribeNamespace unsubscribe_namespace_ = { + /*track_namespace=*/FullTrackName{"foo"}, + }; +}; + class QUICHE_NO_EXPORT MaxSubscribeIdMessage : public TestMessageBase { public: MaxSubscribeIdMessage() : TestMessageBase() { @@ -1173,6 +1318,14 @@ return std::make_unique<TrackStatusMessage>(); case MoqtMessageType::kGoAway: return std::make_unique<GoAwayMessage>(); + case MoqtMessageType::kSubscribeNamespace: + return std::make_unique<SubscribeNamespaceMessage>(); + case MoqtMessageType::kSubscribeNamespaceOk: + return std::make_unique<SubscribeNamespaceOkMessage>(); + case MoqtMessageType::kSubscribeNamespaceError: + return std::make_unique<SubscribeNamespaceErrorMessage>(); + case MoqtMessageType::kUnsubscribeNamespace: + return std::make_unique<UnsubscribeNamespaceMessage>(); case MoqtMessageType::kMaxSubscribeId: return std::make_unique<MaxSubscribeIdMessage>(); case MoqtMessageType::kObjectAck: