|  | // Copyright (c) 2023 The Chromium Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style license that can be | 
|  | // found in the LICENSE file. | 
|  |  | 
|  | #include "quiche/quic/moqt/moqt_parser.h" | 
|  |  | 
|  | #include <array> | 
|  | #include <cstddef> | 
|  | #include <cstdint> | 
|  | #include <cstring> | 
|  | #include <memory> | 
|  | #include <optional> | 
|  | #include <string> | 
|  | #include <variant> | 
|  | #include <vector> | 
|  |  | 
|  | #include "absl/strings/str_join.h" | 
|  | #include "absl/strings/string_view.h" | 
|  | #include "quiche/quic/core/quic_data_writer.h" | 
|  | #include "quiche/quic/core/quic_time.h" | 
|  | #include "quiche/quic/moqt/moqt_messages.h" | 
|  | #include "quiche/quic/moqt/test_tools/moqt_test_message.h" | 
|  | #include "quiche/quic/platform/api/quic_test.h" | 
|  | #include "quiche/web_transport/test_tools/in_memory_stream.h" | 
|  |  | 
|  | namespace moqt::test { | 
|  |  | 
|  | namespace { | 
|  |  | 
|  | using ::testing::AnyOf; | 
|  | using ::testing::HasSubstr; | 
|  | using ::testing::Optional; | 
|  |  | 
|  | constexpr std::array kMessageTypes{ | 
|  | MoqtMessageType::kSubscribe, | 
|  | MoqtMessageType::kSubscribeOk, | 
|  | MoqtMessageType::kSubscribeError, | 
|  | MoqtMessageType::kSubscribeUpdate, | 
|  | MoqtMessageType::kUnsubscribe, | 
|  | MoqtMessageType::kSubscribeDone, | 
|  | MoqtMessageType::kAnnounceCancel, | 
|  | MoqtMessageType::kTrackStatusRequest, | 
|  | MoqtMessageType::kTrackStatus, | 
|  | MoqtMessageType::kAnnounce, | 
|  | MoqtMessageType::kAnnounceOk, | 
|  | MoqtMessageType::kAnnounceError, | 
|  | MoqtMessageType::kUnannounce, | 
|  | MoqtMessageType::kClientSetup, | 
|  | MoqtMessageType::kServerSetup, | 
|  | MoqtMessageType::kGoAway, | 
|  | MoqtMessageType::kSubscribeAnnounces, | 
|  | MoqtMessageType::kSubscribeAnnouncesOk, | 
|  | MoqtMessageType::kSubscribeAnnouncesError, | 
|  | MoqtMessageType::kUnsubscribeAnnounces, | 
|  | MoqtMessageType::kMaxRequestId, | 
|  | MoqtMessageType::kFetch, | 
|  | MoqtMessageType::kFetchCancel, | 
|  | MoqtMessageType::kFetchOk, | 
|  | MoqtMessageType::kFetchError, | 
|  | MoqtMessageType::kRequestsBlocked, | 
|  | MoqtMessageType::kObjectAck, | 
|  | }; | 
|  | constexpr std::array kDataStreamTypes{ | 
|  | MoqtDataStreamType::kStreamHeaderSubgroup, | 
|  | MoqtDataStreamType::kStreamHeaderFetch, | 
|  | }; | 
|  |  | 
|  | using GeneralizedMessageType = | 
|  | std::variant<MoqtMessageType, MoqtDataStreamType>; | 
|  | }  // namespace | 
|  |  | 
|  | struct MoqtParserTestParams { | 
|  | MoqtParserTestParams(MoqtMessageType message_type, bool uses_web_transport) | 
|  | : message_type(message_type), uses_web_transport(uses_web_transport) {} | 
|  | explicit MoqtParserTestParams(MoqtDataStreamType message_type) | 
|  | : message_type(message_type), uses_web_transport(true) {} | 
|  | GeneralizedMessageType message_type; | 
|  | bool uses_web_transport; | 
|  | }; | 
|  |  | 
|  | std::vector<MoqtParserTestParams> GetMoqtParserTestParams() { | 
|  | std::vector<MoqtParserTestParams> params; | 
|  |  | 
|  | for (MoqtMessageType message_type : kMessageTypes) { | 
|  | if (message_type == MoqtMessageType::kClientSetup) { | 
|  | for (const bool uses_web_transport : {false, true}) { | 
|  | params.push_back( | 
|  | MoqtParserTestParams(message_type, uses_web_transport)); | 
|  | } | 
|  | } else { | 
|  | // All other types are processed the same for either perspective or | 
|  | // transport. | 
|  | params.push_back(MoqtParserTestParams(message_type, true)); | 
|  | } | 
|  | } | 
|  | for (MoqtDataStreamType type : kDataStreamTypes) { | 
|  | params.push_back(MoqtParserTestParams(type)); | 
|  | } | 
|  | return params; | 
|  | } | 
|  |  | 
|  | std::string TypeFormatter(MoqtMessageType type) { | 
|  | return MoqtMessageTypeToString(type); | 
|  | } | 
|  | std::string TypeFormatter(MoqtDataStreamType type) { | 
|  | return MoqtDataStreamTypeToString(type); | 
|  | } | 
|  | std::string ParamNameFormatter( | 
|  | const testing::TestParamInfo<MoqtParserTestParams>& info) { | 
|  | return std::visit([](auto x) { return TypeFormatter(x); }, | 
|  | info.param.message_type) + | 
|  | "_" + (info.param.uses_web_transport ? "WebTransport" : "QUIC"); | 
|  | } | 
|  |  | 
|  | class MoqtParserTestVisitor : public MoqtControlParserVisitor, | 
|  | public MoqtDataParserVisitor { | 
|  | public: | 
|  | ~MoqtParserTestVisitor() = default; | 
|  |  | 
|  | void OnObjectMessage(const MoqtObject& message, absl::string_view payload, | 
|  | bool end_of_message) override { | 
|  | MoqtObject object = message; | 
|  | object_payloads_.push_back(std::string(payload)); | 
|  | end_of_message_ = end_of_message; | 
|  | if (end_of_message) { | 
|  | ++messages_received_; | 
|  | } | 
|  | last_message_ = TestMessageBase::MessageStructuredData(object); | 
|  | } | 
|  | template <typename Message> | 
|  | void OnControlMessage(const Message& message) { | 
|  | end_of_message_ = true; | 
|  | ++messages_received_; | 
|  | last_message_ = TestMessageBase::MessageStructuredData(message); | 
|  | } | 
|  | void OnClientSetupMessage(const MoqtClientSetup& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnServerSetupMessage(const MoqtServerSetup& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnSubscribeMessage(const MoqtSubscribe& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnSubscribeOkMessage(const MoqtSubscribeOk& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnSubscribeDoneMessage(const MoqtSubscribeDone& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnAnnounceMessage(const MoqtAnnounce& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnAnnounceOkMessage(const MoqtAnnounceOk& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnAnnounceErrorMessage(const MoqtAnnounceError& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnAnnounceCancelMessage(const MoqtAnnounceCancel& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnTrackStatusRequestMessage( | 
|  | const MoqtTrackStatusRequest& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnUnannounceMessage(const MoqtUnannounce& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnTrackStatusMessage(const MoqtTrackStatus& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnGoAwayMessage(const MoqtGoAway& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnSubscribeAnnouncesMessage( | 
|  | const MoqtSubscribeAnnounces& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnSubscribeAnnouncesOkMessage( | 
|  | const MoqtSubscribeAnnouncesOk& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnSubscribeAnnouncesErrorMessage( | 
|  | const MoqtSubscribeAnnouncesError& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnUnsubscribeAnnouncesMessage( | 
|  | const MoqtUnsubscribeAnnounces& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnMaxRequestIdMessage(const MoqtMaxRequestId& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnFetchMessage(const MoqtFetch& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnFetchCancelMessage(const MoqtFetchCancel& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnFetchOkMessage(const MoqtFetchOk& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnFetchErrorMessage(const MoqtFetchError& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnRequestsBlockedMessage(const MoqtRequestsBlocked& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnObjectAckMessage(const MoqtObjectAck& message) override { | 
|  | OnControlMessage(message); | 
|  | } | 
|  | void OnParsingError(MoqtError code, absl::string_view reason) override { | 
|  | QUIC_LOG(INFO) << "Parsing error: " << reason; | 
|  | parsing_error_ = reason; | 
|  | parsing_error_code_ = code; | 
|  | } | 
|  |  | 
|  | std::string object_payload() { return absl::StrJoin(object_payloads_, ""); } | 
|  |  | 
|  | std::vector<std::string> object_payloads_; | 
|  | bool end_of_message_ = false; | 
|  | std::optional<std::string> parsing_error_; | 
|  | MoqtError parsing_error_code_; | 
|  | uint64_t messages_received_ = 0; | 
|  | std::optional<TestMessageBase::MessageStructuredData> last_message_; | 
|  | }; | 
|  |  | 
|  | class MoqtParserTest | 
|  | : public quic::test::QuicTestWithParam<MoqtParserTestParams> { | 
|  | public: | 
|  | MoqtParserTest() | 
|  | : message_type_(GetParam().message_type), | 
|  | webtrans_(GetParam().uses_web_transport), | 
|  | control_stream_(/*stream_id=*/0), | 
|  | control_parser_(GetParam().uses_web_transport, &control_stream_, | 
|  | visitor_), | 
|  | data_stream_(/*stream_id=*/0), | 
|  | data_parser_(&data_stream_, &visitor_) {} | 
|  |  | 
|  | bool IsDataStream() { | 
|  | return std::holds_alternative<MoqtDataStreamType>(message_type_); | 
|  | } | 
|  |  | 
|  | std::unique_ptr<TestMessageBase> MakeMessage() { | 
|  | if (IsDataStream()) { | 
|  | return CreateTestDataStream(std::get<MoqtDataStreamType>(message_type_)); | 
|  | } else { | 
|  | return CreateTestMessage(std::get<MoqtMessageType>(message_type_), | 
|  | webtrans_); | 
|  | } | 
|  | } | 
|  |  | 
|  | void ProcessData(absl::string_view data, bool fin) { | 
|  | if (IsDataStream()) { | 
|  | data_stream_.Receive(data, fin); | 
|  | data_parser_.ReadAllData(); | 
|  | } else { | 
|  | control_stream_.Receive(data, /*fin=*/false); | 
|  | control_parser_.ReadAndDispatchMessages(); | 
|  | } | 
|  | } | 
|  |  | 
|  | protected: | 
|  | MoqtParserTestVisitor visitor_; | 
|  | GeneralizedMessageType message_type_; | 
|  | bool webtrans_; | 
|  | webtransport::test::InMemoryStream control_stream_; | 
|  | MoqtControlParser control_parser_; | 
|  | webtransport::test::InMemoryStream data_stream_; | 
|  | MoqtDataParser data_parser_; | 
|  | }; | 
|  |  | 
|  | INSTANTIATE_TEST_SUITE_P(MoqtParserTests, MoqtParserTest, | 
|  | testing::ValuesIn(GetMoqtParserTestParams()), | 
|  | ParamNameFormatter); | 
|  |  | 
|  | TEST_P(MoqtParserTest, OneMessage) { | 
|  | std::unique_ptr<TestMessageBase> message = MakeMessage(); | 
|  | ProcessData(message->PacketSample(), true); | 
|  | ASSERT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_)); | 
|  | EXPECT_TRUE(visitor_.end_of_message_); | 
|  | if (IsDataStream()) { | 
|  | EXPECT_EQ(visitor_.object_payload(), "foo"); | 
|  | } | 
|  | } | 
|  |  | 
|  | TEST_P(MoqtParserTest, OneMessageWithLongVarints) { | 
|  | std::unique_ptr<TestMessageBase> message = MakeMessage(); | 
|  | message->ExpandVarints(); | 
|  | ProcessData(message->PacketSample(), false); | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_)); | 
|  | EXPECT_TRUE(visitor_.end_of_message_); | 
|  | EXPECT_EQ(visitor_.parsing_error_, std::nullopt); | 
|  | if (IsDataStream()) { | 
|  | EXPECT_EQ(visitor_.object_payload(), "foo"); | 
|  | } | 
|  | } | 
|  |  | 
|  | TEST_P(MoqtParserTest, TwoPartMessage) { | 
|  | std::unique_ptr<TestMessageBase> message = MakeMessage(); | 
|  | // The test Object message has payload for less then half the message length, | 
|  | // so splitting the message in half will prevent the first half from being | 
|  | // processed. | 
|  | size_t first_data_size = message->total_message_size() / 2; | 
|  | ProcessData(message->PacketSample().substr(0, first_data_size), false); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | ProcessData( | 
|  | message->PacketSample().substr( | 
|  | first_data_size, message->total_message_size() - first_data_size), | 
|  | true); | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_)); | 
|  | EXPECT_TRUE(visitor_.end_of_message_); | 
|  | EXPECT_FALSE(visitor_.parsing_error_.has_value()); | 
|  | if (IsDataStream()) { | 
|  | EXPECT_EQ(visitor_.object_payload(), "foo"); | 
|  | } | 
|  | } | 
|  |  | 
|  | TEST_P(MoqtParserTest, OneByteAtATime) { | 
|  | std::unique_ptr<TestMessageBase> message = MakeMessage(); | 
|  | for (size_t i = 0; i < message->total_message_size(); ++i) { | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_FALSE(visitor_.end_of_message_); | 
|  | bool last = i == (message->total_message_size() - 1); | 
|  | ProcessData(message->PacketSample().substr(i, 1), last); | 
|  | } | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_)); | 
|  | EXPECT_TRUE(visitor_.end_of_message_); | 
|  | EXPECT_FALSE(visitor_.parsing_error_.has_value()); | 
|  | if (IsDataStream()) { | 
|  | EXPECT_EQ(visitor_.object_payload(), "foo"); | 
|  | } | 
|  | } | 
|  |  | 
|  | TEST_P(MoqtParserTest, OneByteAtATimeLongerVarints) { | 
|  | std::unique_ptr<TestMessageBase> message = MakeMessage(); | 
|  | message->ExpandVarints(); | 
|  | for (size_t i = 0; i < message->total_message_size(); ++i) { | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_FALSE(visitor_.end_of_message_); | 
|  | bool last = i == (message->total_message_size() - 1); | 
|  | ProcessData(message->PacketSample().substr(i, 1), last); | 
|  | } | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_)); | 
|  | EXPECT_TRUE(visitor_.end_of_message_); | 
|  | EXPECT_FALSE(visitor_.parsing_error_.has_value()); | 
|  | if (IsDataStream()) { | 
|  | EXPECT_EQ(visitor_.object_payload(), "foo"); | 
|  | } | 
|  | } | 
|  |  | 
|  | TEST_P(MoqtParserTest, TwoBytesAtATime) { | 
|  | std::unique_ptr<TestMessageBase> message = MakeMessage(); | 
|  | for (size_t i = 0; i < message->total_message_size(); i += 3) { | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_FALSE(visitor_.end_of_message_); | 
|  | bool last = (i + 2) >= message->total_message_size(); | 
|  | ProcessData(message->PacketSample().substr(i, 3), last); | 
|  | } | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_)); | 
|  | EXPECT_TRUE(visitor_.end_of_message_); | 
|  | EXPECT_FALSE(visitor_.parsing_error_.has_value()); | 
|  | if (IsDataStream()) { | 
|  | EXPECT_EQ(visitor_.object_payload(), "foo"); | 
|  | } | 
|  | } | 
|  |  | 
|  | TEST_P(MoqtParserTest, EarlyFin) { | 
|  | if (!IsDataStream()) { | 
|  | return; | 
|  | } | 
|  | std::unique_ptr<TestMessageBase> message = MakeMessage(); | 
|  | size_t first_data_size = message->total_message_size() - 1; | 
|  | ProcessData(message->PacketSample().substr(0, first_data_size), true); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_THAT(visitor_.parsing_error_, | 
|  | AnyOf("FIN after incomplete message", | 
|  | "FIN received at an unexpected point in the stream")); | 
|  | } | 
|  |  | 
|  | TEST_P(MoqtParserTest, SeparateEarlyFin) { | 
|  | if (!IsDataStream()) { | 
|  | return; | 
|  | } | 
|  | std::unique_ptr<TestMessageBase> message = MakeMessage(); | 
|  | size_t first_data_size = message->total_message_size() - 1; | 
|  | ProcessData(message->PacketSample().substr(0, first_data_size), false); | 
|  | ProcessData(absl::string_view(), true); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_THAT(visitor_.parsing_error_, | 
|  | AnyOf("End of stream before complete message", | 
|  | "FIN received at an unexpected point in the stream")); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); | 
|  | } | 
|  |  | 
|  | TEST_P(MoqtParserTest, PayloadLengthTooLong) { | 
|  | if (IsDataStream()) { | 
|  | return; | 
|  | } | 
|  | std::unique_ptr<TestMessageBase> message = MakeMessage(); | 
|  | message->IncreasePayloadLengthByOne(); | 
|  | ProcessData(message->PacketSample(), false); | 
|  | // The parser will actually report a message, because it's all there. | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "Message length does not match payload length"); | 
|  | } | 
|  |  | 
|  | TEST_P(MoqtParserTest, PayloadLengthTooShort) { | 
|  | if (IsDataStream()) { | 
|  | return; | 
|  | } | 
|  | std::unique_ptr<TestMessageBase> message = MakeMessage(); | 
|  | message->DecreasePayloadLengthByOne(); | 
|  | ProcessData(message->PacketSample(), false); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "Message length does not match payload length"); | 
|  | } | 
|  |  | 
|  | // Tests for message-specific error cases, and behaviors for a single message | 
|  | // type. | 
|  | class MoqtMessageSpecificTest : public quic::test::QuicTest { | 
|  | public: | 
|  | MoqtMessageSpecificTest() {} | 
|  |  | 
|  | MoqtParserTestVisitor visitor_; | 
|  |  | 
|  | static constexpr bool kWebTrans = true; | 
|  | static constexpr bool kRawQuic = false; | 
|  | }; | 
|  |  | 
|  | // Send the header + some payload, pure payload, then pure payload to end the | 
|  | // message. | 
|  | TEST_F(MoqtMessageSpecificTest, ThreePartObject) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtDataParser parser(&stream, &visitor_); | 
|  | auto message = std::make_unique<StreamHeaderSubgroupMessage>(); | 
|  | EXPECT_TRUE(message->SetPayloadLength(14)); | 
|  | stream.Receive(message->PacketSample(), false); | 
|  | parser.ReadAllData(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_)); | 
|  | EXPECT_FALSE(visitor_.end_of_message_); | 
|  | EXPECT_EQ(visitor_.object_payload(), "foo"); | 
|  |  | 
|  | // second part | 
|  | stream.Receive("bar", false); | 
|  | parser.ReadAllData(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_)); | 
|  | EXPECT_FALSE(visitor_.end_of_message_); | 
|  | EXPECT_EQ(visitor_.object_payload(), "foobar"); | 
|  |  | 
|  | // third part includes FIN | 
|  | stream.Receive("deadbeef", true); | 
|  | parser.ReadAllData(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_)); | 
|  | EXPECT_TRUE(visitor_.end_of_message_); | 
|  | EXPECT_EQ(visitor_.object_payload(), "foobardeadbeef"); | 
|  | EXPECT_FALSE(visitor_.parsing_error_.has_value()); | 
|  | } | 
|  |  | 
|  | // Send the part of header, rest of header + payload, plus payload. | 
|  | TEST_F(MoqtMessageSpecificTest, ThreePartObjectFirstIncomplete) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtDataParser parser(&stream, &visitor_); | 
|  | auto message = std::make_unique<StreamHeaderSubgroupMessage>(); | 
|  | EXPECT_TRUE(message->SetPayloadLength(51)); | 
|  |  | 
|  | // first part | 
|  | stream.Receive(message->PacketSample().substr(0, 4), false); | 
|  | parser.ReadAllData(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  |  | 
|  | // second part. Add padding to it. | 
|  | message->set_wire_image_size(63); | 
|  | stream.Receive( | 
|  | message->PacketSample().substr(4, message->total_message_size() - 4), | 
|  | false); | 
|  | parser.ReadAllData(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_)); | 
|  | EXPECT_FALSE(visitor_.end_of_message_); | 
|  | // The value "48" is the overall wire image size of 63 minus the non-payload | 
|  | // part of the message. | 
|  | EXPECT_EQ(visitor_.object_payload().length(), 48); | 
|  |  | 
|  | // third part includes FIN | 
|  | stream.Receive("bar", true); | 
|  | parser.ReadAllData(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_)); | 
|  | EXPECT_TRUE(visitor_.end_of_message_); | 
|  | EXPECT_EQ(*visitor_.object_payloads_.crbegin(), "bar"); | 
|  | EXPECT_FALSE(visitor_.parsing_error_.has_value()); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, ObjectSplitInExtension) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtDataParser parser(&stream, &visitor_); | 
|  | auto message = std::make_unique<StreamHeaderSubgroupMessage>(); | 
|  |  | 
|  | // first part | 
|  | stream.Receive(message->PacketSample().substr(0, 10), false); | 
|  | parser.ReadAllData(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  |  | 
|  | // second part | 
|  | stream.Receive( | 
|  | message->PacketSample().substr(10, sizeof(message->total_message_size())), | 
|  | false); | 
|  | parser.ReadAllData(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_TRUE(visitor_.last_message_.has_value() && | 
|  | message->EqualFieldValues(*visitor_.last_message_)); | 
|  | EXPECT_TRUE(visitor_.end_of_message_); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, StreamHeaderSubgroupFollowOn) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtDataParser parser(&stream, &visitor_); | 
|  | // first part | 
|  | auto message1 = std::make_unique<StreamHeaderSubgroupMessage>(); | 
|  | stream.Receive(message1->PacketSample(), false); | 
|  | parser.ReadAllData(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_TRUE(message1->EqualFieldValues(*visitor_.last_message_)); | 
|  | EXPECT_TRUE(visitor_.end_of_message_); | 
|  | EXPECT_EQ(visitor_.object_payload(), "foo"); | 
|  | EXPECT_FALSE(visitor_.parsing_error_.has_value()); | 
|  | // second part | 
|  | visitor_.object_payloads_.clear(); | 
|  | auto message2 = std::make_unique<StreamMiddlerSubgroupMessage>(); | 
|  | stream.Receive(message2->PacketSample(), false); | 
|  | parser.ReadAllData(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 2); | 
|  | EXPECT_TRUE(message2->EqualFieldValues(*visitor_.last_message_)); | 
|  | EXPECT_TRUE(visitor_.end_of_message_); | 
|  | EXPECT_EQ(visitor_.object_payload(), "bar"); | 
|  | EXPECT_FALSE(visitor_.parsing_error_.has_value()); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, ClientSetupMaxRequestIdAppearsTwice) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char setup[] = { | 
|  | 0x20, 0x00, 0x0d, 0x02, 0x01, 0x02,  // versions | 
|  | 0x03,                                // 3 params | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,        // path = "foo" | 
|  | 0x02, 0x32,                          // max_request_id = 50 | 
|  | 0x02, 0x32,                          // max_request_id = 50 | 
|  | }; | 
|  | stream.Receive(absl::string_view(setup, sizeof(setup)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "Client SETUP contains invalid parameters"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kKeyValueFormattingError); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SetupPathFromServer) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char setup[] = { | 
|  | 0x21, 0x00, 0x07, | 
|  | 0x01,                          // version = 1 | 
|  | 0x01,                          // 1 param | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,  // path = "foo" | 
|  | }; | 
|  | stream.Receive(absl::string_view(setup, sizeof(setup)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "Server SETUP contains invalid parameters"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kInvalidPath); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SetupPathAppearsTwice) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char setup[] = { | 
|  | 0x20, 0x00, 0x0e, 0x02, 0x01, 0x02,  // versions = 1, 2 | 
|  | 0x02,                                // 2 params | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,        // path = "foo" | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,        // path = "foo" | 
|  | }; | 
|  | stream.Receive(absl::string_view(setup, sizeof(setup)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "Client SETUP contains invalid parameters"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kKeyValueFormattingError); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SetupPathOverWebtrans) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kWebTrans, &stream, visitor_); | 
|  | char setup[] = { | 
|  | 0x20, 0x00, 0x09, 0x02, 0x01, 0x02,  // versions = 1, 2 | 
|  | 0x01,                                // 1 param | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,        // path = "foo" | 
|  | }; | 
|  | stream.Receive(absl::string_view(setup, sizeof(setup)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "Client SETUP contains invalid parameters"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kInvalidPath); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SetupPathMissing) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char setup[] = { | 
|  | 0x20, 0x00, 0x04, 0x02, 0x01, 0x02,  // versions = 1, 2 | 
|  | 0x00,                                // no param | 
|  | }; | 
|  | stream.Receive(absl::string_view(setup, sizeof(setup)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "Client SETUP contains invalid parameters"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kInvalidPath); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, ServerSetupMaxRequestIdAppearsTwice) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char setup[] = { | 
|  | 0x20, 0x00, 0x0d, 0x02, 0x01, 0x02,  // versions = 1, 2 | 
|  | 0x03,                                // 4 params | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,        // path = "foo" | 
|  | 0x02, 0x32,                          // max_request_id = 50 | 
|  | 0x02, 0x32,                          // max_request_id = 50 | 
|  | }; | 
|  | stream.Receive(absl::string_view(setup, sizeof(setup)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "Client SETUP contains invalid parameters"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kKeyValueFormattingError); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, UnknownParameterTwiceIsOk) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kWebTrans, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x1b, 0x01, 0x02, | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd" | 
|  | 0x20, 0x02, 0x01,              // priority, order, forward | 
|  | 0x02,                          // filter_type = kLatestObject | 
|  | 0x02,                          // two params | 
|  | 0x1f, 0x03, 0x62, 0x61, 0x72,  // 0x1f = "bar" | 
|  | 0x1f, 0x03, 0x62, 0x61, 0x72,  // 0x1f = "bar" | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SubscribeDeliveryTimeoutTwice) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x17, 0x01, 0x02, | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd" | 
|  | 0x20, 0x02, 0x01,              // priority, order, forward | 
|  | 0x02,                          // filter_type = kLatestObject | 
|  | 0x02,                          // two params | 
|  | 0x02, 0x67, 0x10,              // delivery_timeout = 10000 | 
|  | 0x02, 0x67, 0x10,              // delivery_timeout = 10000 | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "SUBSCRIBE contains invalid parameters"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SubscribeMaxCacheDurationTwice) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x17, 0x01, 0x02, | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd" | 
|  | 0x20, 0x02, 0x01,              // priority, order, forward | 
|  | 0x02,                          // filter_type = kLatestObject | 
|  | 0x02,                          // two params | 
|  | 0x04, 0x67, 0x10,              // max_cache_duration = 10000 | 
|  | 0x04, 0x67, 0x10,              // max_cache_duration = 10000 | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "SUBSCRIBE contains invalid parameters"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SubscribeAuthorizationTokenTagDelete) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x15, 0x01, 0x02, | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd" | 
|  | 0x20, 0x02, 0x01,              // priority, order, forward | 
|  | 0x02,                          // filter_type = kLatestObject | 
|  | 0x01,                          // one param | 
|  | 0x01, 0x02, 0x00, 0x00,        // authorization_token = DELETE 0; | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "Unknown Auth Token Alias"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kKeyValueFormattingError); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SubscribeAuthorizationTokenTagRegister) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x19, 0x01, 0x02, 0x01, 0x03, 0x66, | 
|  | 0x6f, 0x6f,                    // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd" | 
|  | 0x20, 0x02, 0x01,              // priority, order, forward | 
|  | 0x02,                          // filter_type = kLatestObject | 
|  | 0x01,                          // one param | 
|  | 0x01, 0x06, 0x01, 0x10, 0x00, 0x62, 0x61, 0x72,  // REGISTER 0x01 | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "Too many authorization token tags"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kAuthTokenCacheOverflow); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SubscribeAuthorizationTokenTagUseAlias) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x15, 0x01, 0x02, | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd" | 
|  | 0x20, 0x02, 0x01,              // priority, order, forward | 
|  | 0x02,                          // filter_type = kLatestObject | 
|  | 0x01,                          // one param | 
|  | 0x01, 0x02, 0x02, 0x07,        // authorization_token = USE 7; | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "Unknown Auth Token Alias"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kKeyValueFormattingError); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, | 
|  | SubscribeAuthorizationTokenTagUnknownAliasType) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x15, 0x01, 0x02, | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd" | 
|  | 0x20, 0x02, 0x01,              // priority, order, forward | 
|  | 0x02,                          // filter_type = kLatestObject | 
|  | 0x01,                          // one param | 
|  | 0x01, 0x02, 0x04, 0x07,        // authorization_token type 4 | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "Invalid Authorization Token Alias type"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kKeyValueFormattingError); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, | 
|  | SubscribeAuthorizationTokenTagUnknownTokenType) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x17, 0x01, 0x02, 0x01, | 
|  | 0x03, 0x66, 0x6f, 0x6f,             // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,       // track_name = "abcd" | 
|  | 0x20, 0x02, 0x01,                   // priority, order, forward | 
|  | 0x02,                               // filter_type = kLatestObject | 
|  | 0x01,                               // one param | 
|  | 0x01, 0x04, 0x03, 0x01, 0x00, 0x00  // authorization_token type 1 | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "Invalid Authorization Token Type"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kKeyValueFormattingError); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SubscribeInvalidGroupOrder) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x1d, 0x01, 0x02,  // id and alias | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd" | 
|  | 0x20,                          // subscriber priority = 0x20 | 
|  | 0x03,                          // group order = invalid | 
|  | 0x01,                          // forward = true | 
|  | 0x03,                          // Filter type: Absolute Start | 
|  | 0x04,                          // start_group = 4 (relative previous) | 
|  | 0x01,                          // start_object = 1 (absolute) | 
|  | // No EndGroup or EndObject | 
|  | 0x02,                                      // 2 parameters | 
|  | 0x02, 0x67, 0x10,                          // delivery_timeout = 10000 ms | 
|  | 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72,  // authorization_tag = "bar" | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "Invalid group order value in SUBSCRIBE"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SubscribeInvalidForward) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x1d, 0x01, 0x02,  // id and alias | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd" | 
|  | 0x20,                          // subscriber priority = 0x20 | 
|  | 0x02,                          // group order = descending | 
|  | 0x02,                          // forward = invalid | 
|  | 0x03,                          // Filter type: Absolute Start | 
|  | 0x04,                          // start_group = 4 (relative previous) | 
|  | 0x01,                          // start_object = 1 (absolute) | 
|  | // No EndGroup or EndObject | 
|  | 0x02,                                      // 2 parameters | 
|  | 0x02, 0x67, 0x10,                          // delivery_timeout = 10000 ms | 
|  | 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72,  // authorization_tag = "bar" | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "Invalid forward value in SUBSCRIBE"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SubscribeInvalidFilter) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x1d, 0x01, 0x02,  // id and alias | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd" | 
|  | 0x20,                          // subscriber priority = 0x20 | 
|  | 0x02,                          // group order = descending | 
|  | 0x01,                          // forward = true | 
|  | 0x05,                          // Filter type: Absolute Start | 
|  | 0x04,                          // start_group = 4 (relative previous) | 
|  | 0x01,                          // start_object = 1 (absolute) | 
|  | // No EndGroup or EndObject | 
|  | 0x02,                                      // 2 parameters | 
|  | 0x02, 0x67, 0x10,                          // delivery_timeout = 10000 ms | 
|  | 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72,  // authorization_tag = "bar" | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "Invalid filter type"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SubscribeOkHasAuthorizationToken) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kWebTrans, &stream, visitor_); | 
|  | char subscribe_ok[] = { | 
|  | 0x04, 0x00, 0x11, 0x01, 0x03,  // subscribe_id = 1, expires = 3 | 
|  | 0x02, 0x01,                    // group_order = 2, content exists | 
|  | 0x0c, 0x14,        // largest_group_id = 12, largest_object_id = 20, | 
|  | 0x02,              // 2 parameters | 
|  | 0x02, 0x67, 0x10,  // delivery_timeout = 10000 | 
|  | 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72,  // authorization_token = "bar" | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe_ok, sizeof(subscribe_ok)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "SUBSCRIBE_OK contains invalid parameters"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SubscribeUpdateHasAuthorizationToken) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kWebTrans, &stream, visitor_); | 
|  | char subscribe_update[] = { | 
|  | 0x02, 0x00, 0x0e, 0x02, 0x03, 0x01, 0x05,  // start and end sequences | 
|  | 0xaa, 0x01,                                // priority, forward | 
|  | 0x01,                                      // 1 parameter | 
|  | 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72,  // authorization_token = "bar" | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe_update, sizeof(subscribe_update)), | 
|  | false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "SUBSCRIBE_UPDATE contains invalid parameters"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, AnnounceAuthorizationTokenTwice) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kWebTrans, &stream, visitor_); | 
|  | char announce[] = { | 
|  | 0x06, 0x00, 0x14, 0x01, 0x03, 0x66, 0x6f, | 
|  | 0x6f,                                      // track_namespace = "foo" | 
|  | 0x02,                                      // 2 params | 
|  | 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72,  // authorization = "bar" | 
|  | 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72,  // authorization = "bar" | 
|  | }; | 
|  | stream.Receive(absl::string_view(announce, sizeof(announce)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, AnnounceHasDeliveryTimeout) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kWebTrans, &stream, visitor_); | 
|  | char announce[] = { | 
|  | 0x06, 0x00, 0x10, 0x01, 0x03, 0x66, 0x6f, | 
|  | 0x6f,                                      // track_namespace = "foo" | 
|  | 0x02,                                      // 2 params | 
|  | 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72,  // authorization_info = "bar" | 
|  | 0x02, 0x67, 0x10,                          // delivery_timeout = 10000 | 
|  | }; | 
|  | stream.Receive(absl::string_view(announce, sizeof(announce)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "ANNOUNCE contains invalid parameters"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, FinMidPayload) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtDataParser parser(&stream, &visitor_); | 
|  | auto message = std::make_unique<StreamHeaderSubgroupMessage>(); | 
|  | stream.Receive( | 
|  | message->PacketSample().substr(0, message->total_message_size() - 1), | 
|  | true); | 
|  | parser.ReadAllData(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "FIN received at an unexpected point in the stream"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, PartialPayloadThenFin) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtDataParser parser(&stream, &visitor_); | 
|  | auto message = std::make_unique<StreamHeaderSubgroupMessage>(); | 
|  | stream.Receive( | 
|  | message->PacketSample().substr(0, message->total_message_size() - 1), | 
|  | false); | 
|  | parser.ReadAllData(); | 
|  | stream.Receive(absl::string_view(), true); | 
|  | parser.ReadAllData(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "FIN received at an unexpected point in the stream"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, ControlStreamFin) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | stream.Receive(absl::string_view(), true);  // Find FIN | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "FIN on control stream"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, InvalidObjectStatus) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtDataParser parser(&stream, &visitor_); | 
|  | char stream_header_subgroup[] = { | 
|  | 0x04,                    // type field | 
|  | 0x04, 0x05, 0x08,        // varints | 
|  | 0x07,                    // publisher priority | 
|  | 0x06, 0x00, 0x00, 0x0f,  // object middler; status = 0x0f | 
|  | }; | 
|  | stream.Receive( | 
|  | absl::string_view(stream_header_subgroup, sizeof(stream_header_subgroup)), | 
|  | false); | 
|  | parser.ReadAllData(); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "Invalid object status provided"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, Setup2KB) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char big_message[2 * kMaxMessageHeaderSize]; | 
|  | quic::QuicDataWriter writer(sizeof(big_message), big_message); | 
|  | writer.WriteVarInt62(static_cast<uint64_t>(MoqtMessageType::kServerSetup)); | 
|  | writer.WriteUInt16(8 + kMaxMessageHeaderSize); | 
|  | writer.WriteVarInt62(0x1);                    // version | 
|  | writer.WriteVarInt62(0x1);                    // num_params | 
|  | writer.WriteVarInt62(0xbeef);                 // unknown param | 
|  | writer.WriteVarInt62(kMaxMessageHeaderSize);  // very long parameter | 
|  | writer.WriteRepeatedByte(0x04, kMaxMessageHeaderSize); | 
|  | // Send incomplete message | 
|  | stream.Receive(absl::string_view(big_message, writer.length() - 1), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "Cannot parse control messages more than 2048 bytes"); | 
|  | EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kInternalError); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, UnknownMessageType) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char message[7]; | 
|  | quic::QuicDataWriter writer(sizeof(message), message); | 
|  | writer.WriteVarInt62(0xbeef);  // unknown message type | 
|  | writer.WriteUInt16(0x1);       // length | 
|  | writer.WriteVarInt62(0x1);     // payload | 
|  | stream.Receive(absl::string_view(message, writer.length()), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "Unknown message type"); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, LatestObject) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x18, 0x01, 0x02,  // id and alias | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd" | 
|  | 0x20, 0x02, 0x01,              // priority = 0x20, group order, forward | 
|  | 0x02,                          // filter_type = kLatestObject | 
|  | 0x01,                          // 1 parameter | 
|  | 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72,  // authorization_tag = "bar" | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | ASSERT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_FALSE(visitor_.parsing_error_.has_value()); | 
|  | MoqtSubscribe message = | 
|  | std::get<MoqtSubscribe>(visitor_.last_message_.value()); | 
|  | EXPECT_FALSE(message.start.has_value()); | 
|  | EXPECT_FALSE(message.end_group.has_value()); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, InvalidDeliveryOrder) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x18, 0x01, 0x02,  // id and alias | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd" | 
|  | 0x20, 0x08, 0x01,              // priority, invalid order, forward | 
|  | 0x01,                          // filter_type = kNextGroupStart | 
|  | 0x01,                          // 1 parameter | 
|  | 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72,  // authorization_tag = "bar" | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "Invalid group order value in SUBSCRIBE"); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, AbsoluteStart) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x18, 0x01, 0x02,  // id and alias | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd" | 
|  | 0x20, 0x02, 0x01,              // priority, order, forward | 
|  | 0x03,                          // filter_type = kAbsoluteStart | 
|  | 0x04,                          // start_group = 4 | 
|  | 0x01,                          // start_object = 1 | 
|  | 0x01,                          // 1 parameter | 
|  | 0x03, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar" | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | ASSERT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_FALSE(visitor_.parsing_error_.has_value()); | 
|  | MoqtSubscribe message = | 
|  | std::get<MoqtSubscribe>(visitor_.last_message_.value()); | 
|  | EXPECT_TRUE(message.start.has_value() && message.start->group == 4); | 
|  | EXPECT_TRUE(message.start.has_value() && message.start->object == 1); | 
|  | EXPECT_FALSE(message.end_group.has_value()); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, AbsoluteRange) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x19, 0x01, 0x02,  // id and alias | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd" | 
|  | 0x20, 0x02, 0x01,              // priority, order, forward | 
|  | 0x04,                          // filter_type = kAbsoluteRange | 
|  | 0x04,                          // start_group = 4 | 
|  | 0x01,                          // start_object = 1 | 
|  | 0x07,                          // end_group = 7 | 
|  | 0x01,                          // 1 parameter | 
|  | 0x03, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar" | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | ASSERT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_FALSE(visitor_.parsing_error_.has_value()); | 
|  | MoqtSubscribe message = | 
|  | std::get<MoqtSubscribe>(visitor_.last_message_.value()); | 
|  | EXPECT_TRUE(message.start.has_value() && message.start->group == 4); | 
|  | EXPECT_TRUE(message.start.has_value() && message.start->object == 1); | 
|  | EXPECT_EQ(message.end_group.value(), 7); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, AbsoluteRangeEndGroupTooLow) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x19, 0x01, 0x02,  // id and alias | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd" | 
|  | 0x20, 0x02, 0x01,              // priority, order, forward | 
|  | 0x04,                          // filter_type = kAbsoluteRange | 
|  | 0x04,                          // start_group = 4 | 
|  | 0x01,                          // start_object = 1 | 
|  | 0x03,                          // end_group = 3 | 
|  | 0x01,                          // 1 parameter | 
|  | 0x03, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar" | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "End group is less than start group"); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, AbsoluteRangeExactlyOneObject) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe[] = { | 
|  | 0x03, 0x00, 0x14, 0x01, 0x02,  // id and alias | 
|  | 0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo" | 
|  | 0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd" | 
|  | 0x20, 0x02, 0x01,              // priority, order, forward | 
|  | 0x04,                          // filter_type = kAbsoluteRange | 
|  | 0x04,                          // start_group = 4 | 
|  | 0x01,                          // start_object = 1 | 
|  | 0x04,                          // end_group = 4 | 
|  | 0x00,                          // no parameters | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SubscribeUpdateExactlyOneObject) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe_update[] = { | 
|  | 0x02, 0x00, 0x07, 0x02, 0x03, 0x01, 0x04,  // start and end sequences | 
|  | 0x20, 0x01,                                // priority, forward | 
|  | 0x00,                                      // No parameters | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe_update, sizeof(subscribe_update)), | 
|  | false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SubscribeUpdateEndGroupTooLow) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char subscribe_update[] = { | 
|  | 0x02, 0x00, 0x09, 0x02, 0x03, 0x01, 0x03,  // start and end sequences | 
|  | 0x20, 0x01,                                // priority, forward | 
|  | 0x01,                                      // 1 parameter | 
|  | 0x02, 0x20,                                // delivery_timeout = 32 ms | 
|  | }; | 
|  | stream.Receive(absl::string_view(subscribe_update, sizeof(subscribe_update)), | 
|  | false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "End group is less than start group"); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, ObjectAckNegativeDelta) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char object_ack[] = { | 
|  | 0x71, 0x84, 0x00, 0x05,  // type | 
|  | 0x01, 0x10, 0x20,        // subscribe ID, group, object | 
|  | 0x40, 0x81,              // -0x40 time delta | 
|  | }; | 
|  | stream.Receive(absl::string_view(object_ack, sizeof(object_ack)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.parsing_error_, std::nullopt); | 
|  | ASSERT_EQ(visitor_.messages_received_, 1); | 
|  | MoqtObjectAck message = | 
|  | std::get<MoqtObjectAck>(visitor_.last_message_.value()); | 
|  | EXPECT_EQ(message.subscribe_id, 0x01); | 
|  | EXPECT_EQ(message.group_id, 0x10); | 
|  | EXPECT_EQ(message.object_id, 0x20); | 
|  | EXPECT_EQ(message.delta_from_deadline, | 
|  | quic::QuicTimeDelta::FromMicroseconds(-0x40)); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, AllMessagesTogether) { | 
|  | char buffer[5000]; | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | size_t write = 0; | 
|  | size_t read = 0; | 
|  | int fully_received = 0; | 
|  | std::unique_ptr<TestMessageBase> prev_message = nullptr; | 
|  | for (MoqtMessageType type : kMessageTypes) { | 
|  | // Each iteration, process from the halfway point of one message to the | 
|  | // halfway point of the next. | 
|  | std::unique_ptr<TestMessageBase> message = | 
|  | CreateTestMessage(type, kRawQuic); | 
|  | memcpy(buffer + write, message->PacketSample().data(), | 
|  | message->total_message_size()); | 
|  | size_t new_read = write + message->total_message_size() / 2; | 
|  | stream.Receive(absl::string_view(buffer + read, new_read - read), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | ASSERT_EQ(visitor_.messages_received_, fully_received); | 
|  | if (prev_message != nullptr) { | 
|  | EXPECT_TRUE(prev_message->EqualFieldValues(*visitor_.last_message_)); | 
|  | } | 
|  | fully_received++; | 
|  | read = new_read; | 
|  | write += message->total_message_size(); | 
|  | prev_message = std::move(message); | 
|  | } | 
|  | // Deliver the rest | 
|  | stream.Receive(absl::string_view(buffer + read, write - read), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, fully_received); | 
|  | EXPECT_TRUE(prev_message->EqualFieldValues(*visitor_.last_message_)); | 
|  | EXPECT_FALSE(visitor_.parsing_error_.has_value()); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, DatagramSuccessful) { | 
|  | ObjectDatagramMessage message; | 
|  | MoqtObject object; | 
|  | std::optional<absl::string_view> payload = | 
|  | ParseDatagram(message.PacketSample(), object); | 
|  | ASSERT_TRUE(payload.has_value()); | 
|  | TestMessageBase::MessageStructuredData object_metadata = | 
|  | TestMessageBase::MessageStructuredData(object); | 
|  | EXPECT_TRUE(message.EqualFieldValues(object_metadata)); | 
|  | EXPECT_EQ(payload, "foo"); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, DatagramStatusSuccessful) { | 
|  | ObjectStatusDatagramMessage message; | 
|  | MoqtObject object; | 
|  | std::optional<absl::string_view> payload = | 
|  | ParseDatagram(message.PacketSample(), object); | 
|  | ASSERT_TRUE(payload.has_value()); | 
|  | TestMessageBase::MessageStructuredData object_metadata = | 
|  | TestMessageBase::MessageStructuredData(object); | 
|  | EXPECT_TRUE(message.EqualFieldValues(object_metadata)); | 
|  | EXPECT_TRUE(payload.has_value() && payload->empty()); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, WrongMessageInDatagram) { | 
|  | StreamHeaderSubgroupMessage message; | 
|  | MoqtObject object; | 
|  | std::optional<absl::string_view> payload = | 
|  | ParseDatagram(message.PacketSample(), object); | 
|  | EXPECT_EQ(payload, std::nullopt); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, TruncatedDatagram) { | 
|  | ObjectDatagramMessage message; | 
|  | message.set_wire_image_size(4); | 
|  | MoqtObject object; | 
|  | std::optional<absl::string_view> payload = | 
|  | ParseDatagram(message.PacketSample(), object); | 
|  | EXPECT_EQ(payload, std::nullopt); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, VeryTruncatedDatagram) { | 
|  | char message = 0x40; | 
|  | MoqtObject object; | 
|  | std::optional<absl::string_view> payload = | 
|  | ParseDatagram(absl::string_view(&message, sizeof(message)), object); | 
|  | EXPECT_EQ(payload, std::nullopt); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SubscribeOkInvalidContentExists) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | SubscribeOkMessage subscribe_ok; | 
|  | subscribe_ok.SetInvalidContentExists(); | 
|  | stream.Receive(subscribe_ok.PacketSample(), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "SUBSCRIBE_OK ContentExists has invalid value"); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, SubscribeOkInvalidDeliveryOrder) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | SubscribeOkMessage subscribe_ok; | 
|  | subscribe_ok.SetInvalidDeliveryOrder(); | 
|  | stream.Receive(subscribe_ok.PacketSample(), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "Invalid group order value in SUBSCRIBE_OK"); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, FetchInvalidRange) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | FetchMessage fetch; | 
|  | fetch.SetEndObject(1, 1); | 
|  | stream.Receive(fetch.PacketSample(), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "End object comes before start object in FETCH"); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, FetchInvalidRange2) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | FetchMessage fetch; | 
|  | fetch.SetEndObject(0, std::nullopt); | 
|  | stream.Receive(fetch.PacketSample(), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "End object comes before start object in FETCH"); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, FetchInvalidGroupOrder) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | FetchMessage fetch; | 
|  | fetch.SetGroupOrder(3); | 
|  | stream.Receive(fetch.PacketSample(), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(visitor_.parsing_error_, | 
|  | "Invalid group order value in FETCH message"); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, PaddingStream) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtDataParser parser(&stream, &visitor_); | 
|  | std::string buffer(32, '\0'); | 
|  | quic::QuicDataWriter writer(buffer.size(), buffer.data()); | 
|  | ASSERT_TRUE(writer.WriteVarInt62( | 
|  | static_cast<uint64_t>(MoqtDataStreamType::kPadding))); | 
|  | for (int i = 0; i < 100; ++i) { | 
|  | stream.Receive(buffer, false); | 
|  | parser.ReadAllData(); | 
|  | ASSERT_EQ(visitor_.messages_received_, 0); | 
|  | ASSERT_EQ(visitor_.parsing_error_, std::nullopt); | 
|  | } | 
|  | } | 
|  |  | 
|  | // All messages with TrackNamespace use ReadTrackNamespace too check this. Use | 
|  | // ANNOUNCE_OK for the test because it's small. | 
|  | TEST_F(MoqtMessageSpecificTest, NamespaceTooSmall) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char announce_ok[] = { | 
|  | 0x07, 0x00, 0x03,  // type, length | 
|  | 0x01, 0x01, 'a',   // 1 namespace element | 
|  | }; | 
|  | stream.Receive(absl::string_view(announce_ok, sizeof(announce_ok)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_EQ(visitor_.parsing_error_, std::nullopt); | 
|  | announce_ok[2] -= 2;  // Remove one element. | 
|  | announce_ok[3] = 0x00; | 
|  | stream.Receive(absl::string_view(announce_ok, sizeof(announce_ok) - 2), | 
|  | false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "Invalid number of namespace elements"); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, NamespaceTooLarge) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | char announce_ok[70] = { | 
|  | 0x07, 0x00, 0x41,  // type, length = 65 | 
|  | 0x20,              // 32 namespace elements. This is the maximum. | 
|  | }; | 
|  | for (size_t i = 4; i < sizeof(announce_ok); i = i + 2) { | 
|  | announce_ok[i] = 0x01; | 
|  | announce_ok[i + 1] = 'a' + i; | 
|  | } | 
|  | stream.Receive(absl::string_view(announce_ok, sizeof(announce_ok) - 2), | 
|  | false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_EQ(visitor_.parsing_error_, std::nullopt); | 
|  | announce_ok[2] += 2;  // Add one element. | 
|  | ++announce_ok[3]; | 
|  | stream.Receive(absl::string_view(announce_ok, sizeof(announce_ok)), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_EQ(visitor_.parsing_error_, "Invalid number of namespace elements"); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtMessageSpecificTest, JoiningFetch) { | 
|  | webtransport::test::InMemoryStream stream(/*stream_id=*/0); | 
|  | MoqtControlParser parser(kRawQuic, &stream, visitor_); | 
|  | JoiningFetchMessage message; | 
|  | stream.Receive(message.PacketSample(), false); | 
|  | parser.ReadAndDispatchMessages(); | 
|  | EXPECT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_EQ(visitor_.parsing_error_, std::nullopt); | 
|  | EXPECT_TRUE(visitor_.last_message_.has_value() && | 
|  | message.EqualFieldValues(*visitor_.last_message_)); | 
|  | } | 
|  |  | 
|  | class MoqtDataParserStateMachineTest : public quic::test::QuicTest { | 
|  | protected: | 
|  | MoqtDataParserStateMachineTest() | 
|  | : stream_(/*stream_id=*/0), parser_(&stream_, &visitor_) {} | 
|  |  | 
|  | webtransport::test::InMemoryStream stream_; | 
|  | MoqtParserTestVisitor visitor_; | 
|  | MoqtDataParser parser_; | 
|  | }; | 
|  |  | 
|  | TEST_F(MoqtDataParserStateMachineTest, ReadAll) { | 
|  | stream_.Receive(StreamHeaderSubgroupMessage().PacketSample()); | 
|  | stream_.Receive(StreamMiddlerSubgroupMessage().PacketSample()); | 
|  | parser_.ReadAllData(); | 
|  | ASSERT_EQ(visitor_.messages_received_, 2); | 
|  | EXPECT_EQ(visitor_.object_payloads_[0], "foo"); | 
|  | EXPECT_EQ(visitor_.object_payloads_[1], "bar"); | 
|  | stream_.Receive("", /*fin=*/true); | 
|  | parser_.ReadAllData(); | 
|  | EXPECT_EQ(visitor_.parsing_error_, std::nullopt); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtDataParserStateMachineTest, ReadObjects) { | 
|  | stream_.Receive(StreamHeaderSubgroupMessage().PacketSample()); | 
|  | stream_.Receive(StreamMiddlerSubgroupMessage().PacketSample(), /*fin=*/true); | 
|  | parser_.ReadAtMostOneObject(); | 
|  | ASSERT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_EQ(visitor_.object_payloads_[0], "foo"); | 
|  | parser_.ReadAtMostOneObject(); | 
|  | ASSERT_EQ(visitor_.messages_received_, 2); | 
|  | EXPECT_EQ(visitor_.object_payloads_[1], "bar"); | 
|  | EXPECT_EQ(visitor_.parsing_error_, std::nullopt); | 
|  | } | 
|  |  | 
|  | TEST_F(MoqtDataParserStateMachineTest, ReadTypeThenObjects) { | 
|  | stream_.Receive(StreamHeaderSubgroupMessage().PacketSample()); | 
|  | stream_.Receive(StreamMiddlerSubgroupMessage().PacketSample(), /*fin=*/true); | 
|  | parser_.ReadStreamType(); | 
|  | ASSERT_EQ(visitor_.messages_received_, 0); | 
|  | EXPECT_EQ(parser_.stream_type(), MoqtDataStreamType::kStreamHeaderSubgroup); | 
|  | parser_.ReadAtMostOneObject(); | 
|  | ASSERT_EQ(visitor_.messages_received_, 1); | 
|  | EXPECT_EQ(visitor_.object_payloads_[0], "foo"); | 
|  | parser_.ReadAtMostOneObject(); | 
|  | ASSERT_EQ(visitor_.messages_received_, 2); | 
|  | EXPECT_EQ(visitor_.object_payloads_[1], "bar"); | 
|  | EXPECT_EQ(visitor_.parsing_error_, std::nullopt); | 
|  | } | 
|  |  | 
|  | }  // namespace moqt::test |