Framer and Parser for MoQT FETCH family messages in draft-07. PiperOrigin-RevId: 689397465
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index d2d34f8..f15f380 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -649,6 +649,50 @@ WireVarInt62(message.max_subscribe_id)); } +quiche::QuicheBuffer MoqtFramer::SerializeFetch(const MoqtFetch& message) { + if (message.end_group < message.start_object.group || + (message.end_group == message.start_object.group && + message.end_object.has_value() && + *message.end_object < message.start_object.object)) { + QUICHE_BUG(MoqtFramer_invalid_fetch) << "Invalid FETCH object range"; + return quiche::QuicheBuffer(); + } + return SerializeControlMessage( + MoqtMessageType::kFetch, WireVarInt62(message.subscribe_id), + WireFullTrackName(message.full_track_name, true), + WireUint8(message.subscriber_priority), + WireDeliveryOrder(message.group_order), + WireVarInt62(message.start_object.group), + WireVarInt62(message.start_object.object), + WireVarInt62(message.end_group), + WireVarInt62(message.end_object.has_value() ? *message.end_object + 1 + : 0), + WireSubscribeParameterList(message.parameters)); +} + +quiche::QuicheBuffer MoqtFramer::SerializeFetchCancel( + const MoqtFetchCancel& message) { + return SerializeControlMessage(MoqtMessageType::kFetchCancel, + WireVarInt62(message.subscribe_id)); +} + +quiche::QuicheBuffer MoqtFramer::SerializeFetchOk(const MoqtFetchOk& message) { + return SerializeControlMessage( + MoqtMessageType::kFetchOk, WireVarInt62(message.subscribe_id), + WireDeliveryOrder(message.group_order), + WireVarInt62(message.largest_id.group), + WireVarInt62(message.largest_id.object), + WireSubscribeParameterList(message.parameters)); +} + +quiche::QuicheBuffer MoqtFramer::SerializeFetchError( + const MoqtFetchError& message) { + return SerializeControlMessage( + MoqtMessageType::kFetchError, WireVarInt62(message.subscribe_id), + WireVarInt62(message.error_code), + WireStringWithVarInt62Length(message.reason_phrase)); +} + quiche::QuicheBuffer MoqtFramer::SerializeObjectAck( const MoqtObjectAck& message) { return SerializeControlMessage(
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h index 6bf9454..ae931a9 100644 --- a/quiche/quic/moqt/moqt_framer.h +++ b/quiche/quic/moqt/moqt_framer.h
@@ -65,6 +65,10 @@ const MoqtUnsubscribeAnnounces& message); quiche::QuicheBuffer SerializeMaxSubscribeId( const MoqtMaxSubscribeId& message); + quiche::QuicheBuffer SerializeFetch(const MoqtFetch& message); + quiche::QuicheBuffer SerializeFetchCancel(const MoqtFetchCancel& message); + quiche::QuicheBuffer SerializeFetchOk(const MoqtFetchOk& message); + quiche::QuicheBuffer SerializeFetchError(const MoqtFetchError& message); quiche::QuicheBuffer SerializeObjectAck(const MoqtObjectAck& message); private:
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc index e8a8cce..e73142f 100644 --- a/quiche/quic/moqt/moqt_framer_test.cc +++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -14,6 +14,7 @@ #include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/test_tools/moqt_test_message.h" #include "quiche/quic/platform/api/quic_expect_bug.h" #include "quiche/quic/platform/api/quic_test.h" @@ -51,6 +52,10 @@ MoqtMessageType::kSubscribeAnnouncesError, MoqtMessageType::kUnsubscribeAnnounces, MoqtMessageType::kMaxSubscribeId, + MoqtMessageType::kFetch, + MoqtMessageType::kFetchCancel, + MoqtMessageType::kFetchOk, + MoqtMessageType::kFetchError, MoqtMessageType::kObjectAck, MoqtMessageType::kClientSetup, MoqtMessageType::kServerSetup, @@ -182,6 +187,22 @@ auto data = std::get<MoqtMaxSubscribeId>(structured_data); return framer_.SerializeMaxSubscribeId(data); } + case moqt::MoqtMessageType::kFetch: { + auto data = std::get<MoqtFetch>(structured_data); + return framer_.SerializeFetch(data); + } + case moqt::MoqtMessageType::kFetchCancel: { + auto data = std::get<MoqtFetchCancel>(structured_data); + return framer_.SerializeFetchCancel(data); + } + case moqt::MoqtMessageType::kFetchOk: { + auto data = std::get<MoqtFetchOk>(structured_data); + return framer_.SerializeFetchOk(data); + } + case moqt::MoqtMessageType::kFetchError: { + auto data = std::get<MoqtFetchError>(structured_data); + return framer_.SerializeFetchError(data); + } case moqt::MoqtMessageType::kObjectAck: { auto data = std::get<MoqtObjectAck>(structured_data); return framer_.SerializeObjectAck(data); @@ -447,6 +468,29 @@ EXPECT_EQ(buffer.size(), 0); } +TEST_F(MoqtFramerSimpleTest, FetchEndBeforeStart) { + MoqtFetch fetch = { + /*subscribe_id =*/1, + /*full_track_name=*/FullTrackName{"foo", "bar"}, + /*subscriber_priority=*/2, + /*group_order=*/MoqtDeliveryOrder::kAscending, + /*start_object=*/FullSequence{1, 2}, + /*end_group=*/1, + /*end_object=*/1, + /*parameters=*/ + MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt}, + }; + quiche::QuicheBuffer buffer; + EXPECT_QUIC_BUG(buffer = framer_.SerializeFetch(fetch), + "Invalid FETCH object range"); + EXPECT_EQ(buffer.size(), 0); + fetch.end_group = 0; + fetch.end_object = std::nullopt; + EXPECT_QUIC_BUG(buffer = framer_.SerializeFetch(fetch), + "Invalid FETCH object range"); + EXPECT_EQ(buffer.size(), 0); +} + TEST_F(MoqtFramerSimpleTest, SubscribeLatestGroupNonzeroObject) { MoqtSubscribe subscribe = { /*subscribe_id=*/3,
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc index a406a70..f9600de 100644 --- a/quiche/quic/moqt/moqt_messages.cc +++ b/quiche/quic/moqt/moqt_messages.cc
@@ -107,6 +107,14 @@ return "UNSUBSCRIBE_NAMESPACE"; case MoqtMessageType::kMaxSubscribeId: return "MAX_SUBSCRIBE_ID"; + case MoqtMessageType::kFetch: + return "FETCH"; + case MoqtMessageType::kFetchCancel: + return "FETCH_CANCEL"; + case MoqtMessageType::kFetchOk: + return "FETCH_OK"; + case MoqtMessageType::kFetchError: + return "FETCH_ERROR"; case MoqtMessageType::kObjectAck: return "OBJECT_ACK"; }
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index 83502bc..eaca3d6 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -93,6 +93,10 @@ kSubscribeAnnouncesError = 0x13, kUnsubscribeAnnounces = 0x14, kMaxSubscribeId = 0x15, + kFetch = 0x16, + kFetchCancel = 0x17, + kFetchOk = 0x18, + kFetchError = 0x19, kClientSetup = 0x40, kServerSetup = 0x41, @@ -528,6 +532,34 @@ uint64_t max_subscribe_id; }; +struct QUICHE_EXPORT MoqtFetch { + uint64_t subscribe_id; + FullTrackName full_track_name; + MoqtPriority subscriber_priority; + std::optional<MoqtDeliveryOrder> group_order; + FullSequence start_object; // subgroup is ignored + uint64_t end_group; + std::optional<uint64_t> end_object; + MoqtSubscribeParameters parameters; +}; + +struct QUICHE_EXPORT MoqtFetchCancel { + uint64_t subscribe_id; +}; + +struct QUICHE_EXPORT MoqtFetchOk { + uint64_t subscribe_id; + MoqtDeliveryOrder group_order; + FullSequence largest_id; // subgroup is ignored + MoqtSubscribeParameters parameters; +}; + +struct QUICHE_EXPORT MoqtFetchError { + uint64_t subscribe_id; + SubscribeErrorCode error_code; + std::string reason_phrase; +}; + // All of the four values in this message are encoded as varints. // `delta_from_deadline` is encoded as an absolute value, with the lowest bit // indicating the sign (0 if positive).
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index aa50535..1af9f74 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -264,6 +264,18 @@ case MoqtMessageType::kMaxSubscribeId: bytes_read = ProcessMaxSubscribeId(reader); break; + case MoqtMessageType::kFetch: + bytes_read = ProcessFetch(reader); + break; + case MoqtMessageType::kFetchCancel: + bytes_read = ProcessFetchCancel(reader); + break; + case MoqtMessageType::kFetchOk: + bytes_read = ProcessFetchOk(reader); + break; + case MoqtMessageType::kFetchError: + bytes_read = ProcessFetchError(reader); + break; case moqt::MoqtMessageType::kObjectAck: bytes_read = ProcessObjectAck(reader); break; @@ -803,6 +815,83 @@ return reader.PreviouslyReadPayload().length(); } +size_t MoqtControlParser::ProcessFetch(quic::QuicDataReader& reader) { + MoqtFetch fetch; + absl::string_view track_name; + uint8_t group_order; + uint64_t end_object; + if (!reader.ReadVarInt62(&fetch.subscribe_id) || + !ReadTrackNamespace(reader, fetch.full_track_name) || + !reader.ReadStringPieceVarInt62(&track_name) || + !reader.ReadUInt8(&fetch.subscriber_priority) || + !reader.ReadUInt8(&group_order) || + !reader.ReadVarInt62(&fetch.start_object.group) || + !reader.ReadVarInt62(&fetch.start_object.object) || + !reader.ReadVarInt62(&fetch.end_group) || + !reader.ReadVarInt62(&end_object) || + !ReadSubscribeParameters(reader, fetch.parameters)) { + return 0; + } + // Elements that have to be translated from the literal value. + fetch.full_track_name.AddElement(track_name); + if (!ParseDeliveryOrder(group_order, fetch.group_order)) { + ParseError("Invalid group order value in FETCH message"); + return 0; + } + fetch.end_object = + end_object == 0 ? std::optional<uint64_t>() : (end_object - 1); + if (fetch.end_group < fetch.start_object.group || + (fetch.end_group == fetch.start_object.group && + fetch.end_object.has_value() && + *fetch.end_object < fetch.start_object.object)) { + ParseError("End object comes before start object in FETCH"); + return 0; + } + visitor_.OnFetchMessage(fetch); + return reader.PreviouslyReadPayload().length(); +} + +size_t MoqtControlParser::ProcessFetchCancel(quic::QuicDataReader& reader) { + MoqtFetchCancel fetch_cancel; + if (!reader.ReadVarInt62(&fetch_cancel.subscribe_id)) { + return 0; + } + visitor_.OnFetchCancelMessage(fetch_cancel); + return reader.PreviouslyReadPayload().length(); +} + +size_t MoqtControlParser::ProcessFetchOk(quic::QuicDataReader& reader) { + MoqtFetchOk fetch_ok; + uint8_t group_order; + if (!reader.ReadVarInt62(&fetch_ok.subscribe_id) || + !reader.ReadUInt8(&group_order) || + !reader.ReadVarInt62(&fetch_ok.largest_id.group) || + !reader.ReadVarInt62(&fetch_ok.largest_id.object) || + !ReadSubscribeParameters(reader, fetch_ok.parameters)) { + return 0; + } + if (group_order != 0x01 && group_order != 0x02) { + ParseError("Invalid group order value in FETCH_OK"); + return 0; + } + fetch_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order); + visitor_.OnFetchOkMessage(fetch_ok); + return reader.PreviouslyReadPayload().length(); +} + +size_t MoqtControlParser::ProcessFetchError(quic::QuicDataReader& reader) { + MoqtFetchError fetch_error; + uint64_t error_code; + if (!reader.ReadVarInt62(&fetch_error.subscribe_id) || + !reader.ReadVarInt62(&error_code) || + !reader.ReadStringVarInt62(fetch_error.reason_phrase)) { + return 0; + } + fetch_error.error_code = static_cast<SubscribeErrorCode>(error_code); + visitor_.OnFetchErrorMessage(fetch_error); + return reader.PreviouslyReadPayload().length(); +} + size_t MoqtControlParser::ProcessObjectAck(quic::QuicDataReader& reader) { MoqtObjectAck object_ack; uint64_t raw_delta;
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h index 4ebaf85..58795c4 100644 --- a/quiche/quic/moqt/moqt_parser.h +++ b/quiche/quic/moqt/moqt_parser.h
@@ -52,6 +52,10 @@ virtual void OnUnsubscribeAnnouncesMessage( const MoqtUnsubscribeAnnounces& message) = 0; virtual void OnMaxSubscribeIdMessage(const MoqtMaxSubscribeId& message) = 0; + virtual void OnFetchMessage(const MoqtFetch& message) = 0; + virtual void OnFetchCancelMessage(const MoqtFetchCancel& message) = 0; + virtual void OnFetchOkMessage(const MoqtFetchOk& message) = 0; + virtual void OnFetchErrorMessage(const MoqtFetchError& message) = 0; virtual void OnObjectAckMessage(const MoqtObjectAck& message) = 0; virtual void OnParsingError(MoqtError code, absl::string_view reason) = 0; @@ -121,6 +125,10 @@ size_t ProcessSubscribeAnnouncesError(quic::QuicDataReader& reader); size_t ProcessUnsubscribeAnnounces(quic::QuicDataReader& reader); size_t ProcessMaxSubscribeId(quic::QuicDataReader& reader); + size_t ProcessFetch(quic::QuicDataReader& reader); + size_t ProcessFetchCancel(quic::QuicDataReader& reader); + size_t ProcessFetchOk(quic::QuicDataReader& reader); + size_t ProcessFetchError(quic::QuicDataReader& reader); size_t ProcessObjectAck(quic::QuicDataReader& reader); // If |error| is not provided, assumes kProtocolViolation.
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index 84644bc..fb9ed3c 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -52,6 +52,10 @@ MoqtMessageType::kSubscribeAnnouncesError, MoqtMessageType::kUnsubscribeAnnounces, MoqtMessageType::kMaxSubscribeId, + MoqtMessageType::kFetch, + MoqtMessageType::kFetchCancel, + MoqtMessageType::kFetchOk, + MoqtMessageType::kFetchError, MoqtMessageType::kObjectAck, }; constexpr std::array kDataStreamTypes{ @@ -194,6 +198,18 @@ void OnMaxSubscribeIdMessage(const MoqtMaxSubscribeId& message) override { OnControlMessage(message); } + void OnFetchMessage(const MoqtFetch& message) override { + OnControlMessage(message); + } + void OnFetchCancelMessage(const MoqtFetchCancel& message) override { + OnControlMessage(message); + } + void OnFetchOkMessage(const MoqtFetchOk& message) override { + OnControlMessage(message); + } + void OnFetchErrorMessage(const MoqtFetchError& message) override { + OnControlMessage(message); + } void OnObjectAckMessage(const MoqtObjectAck& message) override { OnControlMessage(message); } @@ -1310,6 +1326,39 @@ "SUBSCRIBE_DONE ContentExists has invalid value"); } +TEST_F(MoqtMessageSpecificTest, FetchInvalidRange) { + MoqtControlParser parser(kRawQuic, visitor_); + FetchMessage fetch; + fetch.SetEndObject(1, 1); + parser.ProcessData(fetch.PacketSample(), false); + EXPECT_EQ(visitor_.messages_received_, 0); + EXPECT_TRUE(visitor_.parsing_error_.has_value()); + EXPECT_EQ(*visitor_.parsing_error_, + "End object comes before start object in FETCH"); +} + +TEST_F(MoqtMessageSpecificTest, FetchInvalidRange2) { + MoqtControlParser parser(kRawQuic, visitor_); + FetchMessage fetch; + fetch.SetEndObject(0, std::nullopt); + parser.ProcessData(fetch.PacketSample(), false); + EXPECT_EQ(visitor_.messages_received_, 0); + EXPECT_TRUE(visitor_.parsing_error_.has_value()); + EXPECT_EQ(*visitor_.parsing_error_, + "End object comes before start object in FETCH"); +} + +TEST_F(MoqtMessageSpecificTest, FetchInvalidGroupOrder) { + MoqtControlParser parser(kRawQuic, visitor_); + FetchMessage fetch; + fetch.SetGroupOrder(3); + parser.ProcessData(fetch.PacketSample(), false); + EXPECT_EQ(visitor_.messages_received_, 0); + EXPECT_TRUE(visitor_.parsing_error_.has_value()); + EXPECT_EQ(*visitor_.parsing_error_, + "Invalid group order value in FETCH message"); +} + TEST_F(MoqtMessageSpecificTest, PaddingStream) { MoqtDataParser parser(&visitor_); std::string buffer(32, '\0');
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index eeb9324..34e0aaf 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -213,6 +213,10 @@ void OnUnsubscribeAnnouncesMessage( const MoqtUnsubscribeAnnounces& message) override {} void OnMaxSubscribeIdMessage(const MoqtMaxSubscribeId& message) override; + void OnFetchMessage(const MoqtFetch& message) override {} + void OnFetchCancelMessage(const MoqtFetchCancel& message) override {} + void OnFetchOkMessage(const MoqtFetchOk& message) override {} + void OnFetchErrorMessage(const MoqtFetchError& message) override {} void OnObjectAckMessage(const MoqtObjectAck& message) override { auto subscription_it = session_->published_subscriptions_.find(message.subscribe_id);
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index 23d34a9..d0318bd 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -39,7 +39,8 @@ MoqtAnnounceCancel, MoqtTrackStatusRequest, MoqtUnannounce, MoqtTrackStatus, MoqtGoAway, MoqtSubscribeAnnounces, MoqtSubscribeAnnouncesOk, MoqtSubscribeAnnouncesError, - MoqtUnsubscribeAnnounces, MoqtMaxSubscribeId, MoqtObjectAck>; + MoqtUnsubscribeAnnounces, MoqtMaxSubscribeId, MoqtFetch, MoqtFetchCancel, + MoqtFetchOk, MoqtFetchError, MoqtObjectAck>; // The total actual size of the message. size_t total_message_size() const { return wire_image_size_; } @@ -1293,6 +1294,222 @@ }; }; +class QUICHE_NO_EXPORT FetchMessage : public TestMessageBase { + public: + FetchMessage() : TestMessageBase() { + SetWireImage(raw_packet_, sizeof(raw_packet_)); + } + bool EqualFieldValues(MessageStructuredData& values) const override { + auto cast = std::get<MoqtFetch>(values); + if (cast.subscribe_id != fetch_.subscribe_id) { + QUIC_LOG(INFO) << "FETCH subscribe_id mismatch"; + return false; + } + if (cast.full_track_name != fetch_.full_track_name) { + QUIC_LOG(INFO) << "FETCH full_track_name mismatch"; + return false; + } + if (cast.subscriber_priority != fetch_.subscriber_priority) { + QUIC_LOG(INFO) << "FETCH subscriber_priority mismatch"; + return false; + } + if (cast.group_order != fetch_.group_order) { + QUIC_LOG(INFO) << "FETCH group_order mismatch"; + return false; + } + if (cast.start_object != fetch_.start_object) { + QUIC_LOG(INFO) << "FETCH start_object mismatch"; + return false; + } + if (cast.end_group != fetch_.end_group) { + QUIC_LOG(INFO) << "FETCH end_group mismatch"; + return false; + } + if (cast.end_object != fetch_.end_object) { + QUIC_LOG(INFO) << "FETCH end_object mismatch"; + return false; + } + if (cast.parameters != fetch_.parameters) { + QUIC_LOG(INFO) << "FETCH parameters mismatch"; + return false; + } + return true; + } + + void ExpandVarints() override { + ExpandVarintsImpl("vvvv---v-----vvvvvvv---"); + } + + MessageStructuredData structured_data() const override { + return TestMessageBase::MessageStructuredData(fetch_); + } + + void SetEndObject(uint64_t group, std::optional<uint64_t> object) { + // Avoid varint nonsense. + QUICHE_CHECK(group < 64); + QUICHE_CHECK(!object.has_value() || *object < 64); + fetch_.end_group = group; + fetch_.end_object = object; + raw_packet_[16] = group; + raw_packet_[17] = object.has_value() ? (*object + 1) : 0; + SetWireImage(raw_packet_, sizeof(raw_packet_)); + } + + void SetGroupOrder(uint8_t group_order) { + raw_packet_[13] = static_cast<uint8_t>(group_order); + SetWireImage(raw_packet_, sizeof(raw_packet_)); + } + + private: + uint8_t raw_packet_[24] = { + 0x16, 0x16, + 0x01, // subscribe_id = 1 + 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo" + 0x03, 0x62, 0x61, 0x72, // track_name = "bar" + 0x02, // priority = kHigh + 0x01, // group_order = kAscending + 0x01, 0x02, // start_object = 1, 2 + 0x05, 0x07, // end_object = 5, 6 + 0x01, 0x02, 0x03, 0x62, 0x61, 0x7a, // parameters = "baz" + }; + + MoqtFetch fetch_ = { + /*subscribe_id =*/1, + /*full_track_name=*/FullTrackName{"foo", "bar"}, + /*subscriber_priority=*/2, + /*group_order=*/MoqtDeliveryOrder::kAscending, + /*start_object=*/FullSequence{1, 2}, + /*end_group=*/5, + /*end_object=*/6, + /*parameters=*/ + MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt}, + }; +}; + +class QUICHE_NO_EXPORT FetchCancelMessage : public TestMessageBase { + public: + FetchCancelMessage() : TestMessageBase() { + SetWireImage(raw_packet_, sizeof(raw_packet_)); + } + bool EqualFieldValues(MessageStructuredData& values) const override { + auto cast = std::get<MoqtFetchCancel>(values); + if (cast.subscribe_id != fetch_cancel_.subscribe_id) { + QUIC_LOG(INFO) << "FETCH_CANCEL subscribe_id mismatch"; + return false; + } + return true; + } + + void ExpandVarints() override { ExpandVarintsImpl("vvv"); } + + MessageStructuredData structured_data() const override { + return TestMessageBase::MessageStructuredData(fetch_cancel_); + } + + private: + uint8_t raw_packet_[3] = { + 0x17, 0x01, + 0x01, // subscribe_id = 1 + }; + + MoqtFetchCancel fetch_cancel_ = { + /*subscribe_id =*/1, + }; +}; + +class QUICHE_NO_EXPORT FetchOkMessage : public TestMessageBase { + public: + FetchOkMessage() : TestMessageBase() { + SetWireImage(raw_packet_, sizeof(raw_packet_)); + } + bool EqualFieldValues(MessageStructuredData& values) const override { + auto cast = std::get<MoqtFetchOk>(values); + if (cast.subscribe_id != fetch_ok_.subscribe_id) { + QUIC_LOG(INFO) << "FETCH_OK subscribe_id mismatch"; + return false; + } + if (cast.group_order != fetch_ok_.group_order) { + QUIC_LOG(INFO) << "FETCH_OK group_order mismatch"; + return false; + } + if (cast.largest_id != fetch_ok_.largest_id) { + QUIC_LOG(INFO) << "FETCH_OK start_object mismatch"; + return false; + } + if (cast.parameters != fetch_ok_.parameters) { + QUIC_LOG(INFO) << "FETCH_OK parameters mismatch"; + return false; + } + return true; + } + + void ExpandVarints() override { ExpandVarintsImpl("vvv-vvvvv---"); } + + MessageStructuredData structured_data() const override { + return TestMessageBase::MessageStructuredData(fetch_ok_); + } + + private: + uint8_t raw_packet_[12] = { + 0x18, 0x0a, + 0x01, // subscribe_id = 1 + 0x01, // group_order = kAscending + 0x05, 0x04, // largest_object = 5, 4 + 0x01, 0x02, 0x03, 0x62, 0x61, 0x7a, // parameters = "baz" + }; + + MoqtFetchOk fetch_ok_ = { + /*subscribe_id =*/1, + /*group_order=*/MoqtDeliveryOrder::kAscending, + /*start_object=*/FullSequence{5, 4}, + /*parameters=*/ + MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt}, + }; +}; + +class QUICHE_NO_EXPORT FetchErrorMessage : public TestMessageBase { + public: + FetchErrorMessage() : TestMessageBase() { + SetWireImage(raw_packet_, sizeof(raw_packet_)); + } + bool EqualFieldValues(MessageStructuredData& values) const override { + auto cast = std::get<MoqtFetchError>(values); + if (cast.subscribe_id != fetch_error_.subscribe_id) { + QUIC_LOG(INFO) << "FETCH_ERROR subscribe_id mismatch"; + return false; + } + if (cast.error_code != fetch_error_.error_code) { + QUIC_LOG(INFO) << "FETCH_ERROR group_order mismatch"; + return false; + } + if (cast.reason_phrase != fetch_error_.reason_phrase) { + QUIC_LOG(INFO) << "FETCH_ERROR reason_phrase mismatch"; + return false; + } + return true; + } + + void ExpandVarints() override { ExpandVarintsImpl("vvvvv---"); } + + MessageStructuredData structured_data() const override { + return TestMessageBase::MessageStructuredData(fetch_error_); + } + + private: + uint8_t raw_packet_[8] = { + 0x19, 0x06, + 0x01, // subscribe_id = 1 + 0x04, // error_code = kUnauthorized + 0x03, 0x62, 0x61, 0x72, // reason_phrase = "bar" + }; + + MoqtFetchError fetch_error_ = { + /*subscribe_id =*/1, + /*error_code=*/SubscribeErrorCode::kUnauthorized, + /*reason_phrase=*/"bar", + }; +}; + class QUICHE_NO_EXPORT ObjectAckMessage : public TestMessageBase { public: ObjectAckMessage() : TestMessageBase() { @@ -1383,6 +1600,14 @@ return std::make_unique<UnsubscribeAnnouncesMessage>(); case MoqtMessageType::kMaxSubscribeId: return std::make_unique<MaxSubscribeIdMessage>(); + case MoqtMessageType::kFetch: + return std::make_unique<FetchMessage>(); + case MoqtMessageType::kFetchCancel: + return std::make_unique<FetchCancelMessage>(); + case MoqtMessageType::kFetchOk: + return std::make_unique<FetchOkMessage>(); + case MoqtMessageType::kFetchError: + return std::make_unique<FetchErrorMessage>(); case MoqtMessageType::kObjectAck: return std::make_unique<ObjectAckMessage>(); case MoqtMessageType::kClientSetup: