Parser/Framer for MoQT FETCH Stream messages. New to draft-07. PiperOrigin-RevId: 690630869
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index 3e0834d..fe8c47f 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -263,20 +263,21 @@ } // namespace quiche::QuicheBuffer MoqtFramer::SerializeObjectHeader( - const MoqtObject& message, bool is_first_in_stream) { - if (!ValidateObjectMetadata(message)) { + const MoqtObject& message, MoqtDataStreamType message_type, + bool is_first_in_stream) { + if (!ValidateObjectMetadata(message, message_type)) { QUIC_BUG(quic_bug_serialize_object_header_01) << "Object metadata is invalid"; return quiche::QuicheBuffer(); } - if (message.forwarding_preference == MoqtForwardingPreference::kDatagram) { + if (message_type == MoqtDataStreamType::kObjectDatagram) { QUIC_BUG(quic_bug_serialize_object_header_02) << "Datagrams use SerializeObjectDatagram()"; return quiche::QuicheBuffer(); } if (!is_first_in_stream) { - switch (message.forwarding_preference) { - case MoqtForwardingPreference::kTrack: + switch (message_type) { + case MoqtDataStreamType::kStreamHeaderTrack: return (message.payload_length == 0) ? Serialize(WireVarInt62(message.group_id), WireVarInt62(message.object_id), @@ -285,7 +286,7 @@ : Serialize(WireVarInt62(message.group_id), WireVarInt62(message.object_id), WireVarInt62(message.payload_length)); - case MoqtForwardingPreference::kSubgroup: + case MoqtDataStreamType::kStreamHeaderSubgroup: return (message.payload_length == 0) ? Serialize(WireVarInt62(message.object_id), WireVarInt62(message.payload_length), @@ -293,15 +294,27 @@ message.object_status))) : Serialize(WireVarInt62(message.object_id), WireVarInt62(message.payload_length)); + case MoqtDataStreamType::kStreamHeaderFetch: + return (message.payload_length == 0) + ? Serialize(WireVarInt62(message.group_id), + WireVarInt62(*message.subgroup_id), + WireVarInt62(message.object_id), + WireUint8(message.publisher_priority), + WireVarInt62(message.payload_length), + WireVarInt62(static_cast<uint64_t>( + message.object_status))) + : Serialize(WireVarInt62(message.group_id), + WireVarInt62(*message.subgroup_id), + WireVarInt62(message.object_id), + WireUint8(message.publisher_priority), + WireVarInt62(message.payload_length)); default: QUICHE_NOTREACHED(); return quiche::QuicheBuffer(); } } - MoqtDataStreamType message_type = - GetMessageTypeForForwardingPreference(message.forwarding_preference); - switch (message.forwarding_preference) { - case MoqtForwardingPreference::kTrack: + switch (message_type) { + case MoqtDataStreamType::kStreamHeaderTrack: return (message.payload_length == 0) ? Serialize(WireVarInt62(message_type), WireVarInt62(message.track_alias), @@ -316,7 +329,7 @@ WireVarInt62(message.group_id), WireVarInt62(message.object_id), WireVarInt62(message.payload_length)); - case MoqtForwardingPreference::kSubgroup: + case MoqtDataStreamType::kStreamHeaderSubgroup: return (message.payload_length == 0) ? Serialize(WireVarInt62(message_type), WireVarInt62(message.track_alias), @@ -333,7 +346,24 @@ WireUint8(message.publisher_priority), WireVarInt62(message.object_id), WireVarInt62(message.payload_length)); - case MoqtForwardingPreference::kDatagram: + case MoqtDataStreamType::kStreamHeaderFetch: + return (message.payload_length == 0) + ? Serialize(WireVarInt62(message_type), + WireVarInt62(message.track_alias), + WireVarInt62(message.group_id), + WireVarInt62(*message.subgroup_id), + WireVarInt62(message.object_id), + WireUint8(message.publisher_priority), + WireVarInt62(message.payload_length), + WireVarInt62(message.object_status)) + : Serialize(WireVarInt62(message_type), + WireVarInt62(message.track_alias), + WireVarInt62(message.group_id), + WireVarInt62(*message.subgroup_id), + WireVarInt62(message.object_id), + WireUint8(message.publisher_priority), + WireVarInt62(message.payload_length)); + default: QUICHE_NOTREACHED(); return quiche::QuicheBuffer(); } @@ -341,7 +371,7 @@ quiche::QuicheBuffer MoqtFramer::SerializeObjectDatagram( const MoqtObject& message, absl::string_view payload) { - if (!ValidateObjectMetadata(message)) { + if (!ValidateObjectMetadata(message, MoqtDataStreamType::kObjectDatagram)) { QUIC_BUG(quic_bug_serialize_object_datagram_01) << "Object metadata is invalid"; return quiche::QuicheBuffer(); @@ -697,12 +727,14 @@ } // static -bool MoqtFramer::ValidateObjectMetadata(const MoqtObject& object) { +bool MoqtFramer::ValidateObjectMetadata(const MoqtObject& object, + MoqtDataStreamType message_type) { if (object.object_status != MoqtObjectStatus::kNormal && object.payload_length > 0) { return false; } - if ((object.forwarding_preference == MoqtForwardingPreference::kSubgroup) != + if ((message_type == MoqtDataStreamType::kStreamHeaderSubgroup || + message_type == MoqtDataStreamType::kStreamHeaderFetch) != object.subgroup_id.has_value()) { return false; }
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h index ae931a9..6969ab4 100644 --- a/quiche/quic/moqt/moqt_framer.h +++ b/quiche/quic/moqt/moqt_framer.h
@@ -31,6 +31,7 @@ // Serializes the header for an object, including the appropriate stream // header if `is_first_in_stream` is set to true. quiche::QuicheBuffer SerializeObjectHeader(const MoqtObject& message, + MoqtDataStreamType message_type, bool is_first_in_stream); quiche::QuicheBuffer SerializeObjectDatagram(const MoqtObject& message, absl::string_view payload); @@ -73,7 +74,8 @@ private: // Returns true if the metadata is internally consistent. - static bool ValidateObjectMetadata(const MoqtObject& object); + static bool ValidateObjectMetadata(const MoqtObject& object, + MoqtDataStreamType message_type); quiche::QuicheBufferAllocator* allocator_; bool using_webtrans_;
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc index b740d64..0a2387b 100644 --- a/quiche/quic/moqt/moqt_framer_test.cc +++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -84,13 +84,15 @@ quiche::QuicheBuffer SerializeObject(MoqtFramer& framer, const MoqtObject& message, absl::string_view payload, + MoqtDataStreamType stream_type, bool is_first_in_stream) { MoqtObject adjusted_message = message; adjusted_message.payload_length = payload.size(); quiche::QuicheBuffer header = (message.forwarding_preference == MoqtForwardingPreference::kDatagram) ? framer.SerializeObjectDatagram(adjusted_message, payload) - : framer.SerializeObjectHeader(adjusted_message, is_first_in_stream); + : framer.SerializeObjectHeader(adjusted_message, stream_type, + is_first_in_stream); if (header.empty()) { return quiche::QuicheBuffer(); } @@ -259,28 +261,48 @@ TEST_F(MoqtFramerSimpleTest, GroupMiddler) { auto header = std::make_unique<StreamHeaderSubgroupMessage>(); - auto buffer1 = SerializeObject( - framer_, std::get<MoqtObject>(header->structured_data()), "foo", true); + auto buffer1 = + SerializeObject(framer_, std::get<MoqtObject>(header->structured_data()), + "foo", MoqtDataStreamType::kStreamHeaderSubgroup, true); EXPECT_EQ(buffer1.size(), header->total_message_size()); EXPECT_EQ(buffer1.AsStringView(), header->PacketSample()); auto middler = std::make_unique<StreamMiddlerSubgroupMessage>(); - auto buffer2 = SerializeObject( - framer_, std::get<MoqtObject>(middler->structured_data()), "bar", false); + auto buffer2 = + SerializeObject(framer_, std::get<MoqtObject>(middler->structured_data()), + "bar", MoqtDataStreamType::kStreamHeaderSubgroup, false); EXPECT_EQ(buffer2.size(), middler->total_message_size()); EXPECT_EQ(buffer2.AsStringView(), middler->PacketSample()); } TEST_F(MoqtFramerSimpleTest, TrackMiddler) { auto header = std::make_unique<StreamHeaderTrackMessage>(); - auto buffer1 = SerializeObject( - framer_, std::get<MoqtObject>(header->structured_data()), "foo", true); + auto buffer1 = + SerializeObject(framer_, std::get<MoqtObject>(header->structured_data()), + "foo", MoqtDataStreamType::kStreamHeaderTrack, true); EXPECT_EQ(buffer1.size(), header->total_message_size()); EXPECT_EQ(buffer1.AsStringView(), header->PacketSample()); auto middler = std::make_unique<StreamMiddlerTrackMessage>(); - auto buffer2 = SerializeObject( - framer_, std::get<MoqtObject>(middler->structured_data()), "bar", false); + auto buffer2 = + SerializeObject(framer_, std::get<MoqtObject>(middler->structured_data()), + "bar", MoqtDataStreamType::kStreamHeaderTrack, false); + EXPECT_EQ(buffer2.size(), middler->total_message_size()); + EXPECT_EQ(buffer2.AsStringView(), middler->PacketSample()); +} + +TEST_F(MoqtFramerSimpleTest, FetchMiddler) { + auto header = std::make_unique<StreamHeaderFetchMessage>(); + auto buffer1 = + SerializeObject(framer_, std::get<MoqtObject>(header->structured_data()), + "foo", MoqtDataStreamType::kStreamHeaderFetch, true); + EXPECT_EQ(buffer1.size(), header->total_message_size()); + EXPECT_EQ(buffer1.AsStringView(), header->PacketSample()); + + auto middler = std::make_unique<StreamMiddlerFetchMessage>(); + auto buffer2 = + SerializeObject(framer_, std::get<MoqtObject>(middler->structured_data()), + "bar", MoqtDataStreamType::kStreamHeaderFetch, false); EXPECT_EQ(buffer2.size(), middler->total_message_size()); EXPECT_EQ(buffer2.AsStringView(), middler->PacketSample()); } @@ -299,28 +321,34 @@ }; quiche::QuicheBuffer buffer; - // SerializeObjectDatagram() only accepts kDatagram. - EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectDatagram(object, "foo"), - "Only datagrams use SerializeObjectDatagram()"); - EXPECT_TRUE(buffer.empty()); - // kSubgroup must have a subgroup_id. object.subgroup_id = std::nullopt; - EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false), + EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader( + object, MoqtDataStreamType::kStreamHeaderSubgroup, false), + "Object metadata is invalid"); + EXPECT_TRUE(buffer.empty()); + object.subgroup_id = 8; + + // kFetch must have a subgroup_id. + object.subgroup_id = std::nullopt; + EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader( + object, MoqtDataStreamType::kStreamHeaderFetch, false), "Object metadata is invalid"); EXPECT_TRUE(buffer.empty()); object.subgroup_id = 8; // kTrack must not have a subgroup_id. object.forwarding_preference = MoqtForwardingPreference::kTrack; - EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false), + EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader( + object, MoqtDataStreamType::kStreamHeaderTrack, false), "Object metadata is invalid"); EXPECT_TRUE(buffer.empty()); object.forwarding_preference = MoqtForwardingPreference::kSubgroup; // Non-normal status must have no payload. object.object_status = MoqtObjectStatus::kEndOfGroup; - EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false), + EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader( + object, MoqtDataStreamType::kStreamHeaderSubgroup, false), "Object metadata is invalid"); EXPECT_TRUE(buffer.empty()); // object.object_status = MoqtObjectStatus::kNormal; @@ -341,7 +369,8 @@ quiche::QuicheBuffer buffer; // No datagrams to SerializeObjectHeader(). - EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false), + EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader( + object, MoqtDataStreamType::kObjectDatagram, false), "Datagrams use SerializeObjectDatagram()") EXPECT_TRUE(buffer.empty());
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc index f9600de..ce6680c 100644 --- a/quiche/quic/moqt/moqt_messages.cc +++ b/quiche/quic/moqt/moqt_messages.cc
@@ -129,6 +129,8 @@ return "STREAM_HEADER_TRACK"; case MoqtDataStreamType::kStreamHeaderSubgroup: return "STREAM_HEADER_SUBGROUP"; + case MoqtDataStreamType::kStreamHeaderFetch: + return "STREAM_HEADER_FETCH"; case MoqtDataStreamType::kPadding: return "PADDING"; } @@ -158,6 +160,8 @@ return MoqtForwardingPreference::kTrack; case MoqtDataStreamType::kStreamHeaderSubgroup: return MoqtForwardingPreference::kSubgroup; + case MoqtDataStreamType::kStreamHeaderFetch: + return MoqtForwardingPreference::kTrack; // This is a placeholder. default: break; }
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index 5a4064a..0b09438 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -68,6 +68,7 @@ kObjectDatagram = 0x01, kStreamHeaderTrack = 0x02, kStreamHeaderSubgroup = 0x04, + kStreamHeaderFetch = 0x05, // Currently QUICHE-specific. All data on a kPadding stream is ignored. kPadding = 0x26d3, @@ -324,7 +325,7 @@ // The data contained in every Object message, although the message type // implies some of the values. struct QUICHE_EXPORT MoqtObject { - uint64_t track_alias; + uint64_t track_alias; // For FETCH, this is the subscribe ID. uint64_t group_id; uint64_t object_id; MoqtPriority publisher_priority;
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index 2c63356..eae5b48 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -52,7 +52,8 @@ bool IsAllowedStreamType(uint64_t value) { constexpr std::array kAllowedStreamTypes = { MoqtDataStreamType::kStreamHeaderSubgroup, - MoqtDataStreamType::kStreamHeaderTrack, MoqtDataStreamType::kPadding}; + MoqtDataStreamType::kStreamHeaderTrack, + MoqtDataStreamType::kStreamHeaderFetch, MoqtDataStreamType::kPadding}; for (MoqtDataStreamType type : kAllowedStreamTypes) { if (static_cast<uint64_t>(type) == value) { return true; @@ -67,6 +68,7 @@ return 0; } if (type != MoqtDataStreamType::kStreamHeaderTrack && + type != MoqtDataStreamType::kStreamHeaderFetch && !reader.ReadVarInt62(&object.group_id)) { return 0; } @@ -81,7 +83,8 @@ !reader.ReadVarInt62(&object.object_id)) { return 0; } - if (!reader.ReadUInt8(&object.publisher_priority)) { + if (type != MoqtDataStreamType::kStreamHeaderFetch && + !reader.ReadUInt8(&object.publisher_priority)) { return 0; } uint64_t status = static_cast<uint64_t>(MoqtObjectStatus::kNormal); @@ -99,14 +102,28 @@ MoqtDataStreamType type) { switch (type) { case MoqtDataStreamType::kStreamHeaderTrack: + case MoqtDataStreamType::kStreamHeaderFetch: if (!reader.ReadVarInt62(&object.group_id)) { return 0; } + if (type == MoqtDataStreamType::kStreamHeaderFetch) { + uint64_t value; + if (!reader.ReadVarInt62(&value)) { + return 0; + } + object.subgroup_id = value; + } [[fallthrough]]; case MoqtDataStreamType::kStreamHeaderSubgroup: { - if (!reader.ReadVarInt62(&object.object_id) || - !reader.ReadVarInt62(&object.payload_length)) { + if (!reader.ReadVarInt62(&object.object_id)) { + return 0; + } + if (type == MoqtDataStreamType::kStreamHeaderFetch && + !reader.ReadUInt8(&object.publisher_priority)) { + return 0; + } + if (!reader.ReadVarInt62(&object.payload_length)) { return 0; } uint64_t status = static_cast<uint64_t>(MoqtObjectStatus::kNormal);
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index 3e6a0d9..a7c7c74 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -60,7 +60,9 @@ }; constexpr std::array kDataStreamTypes{ MoqtDataStreamType::kStreamHeaderTrack, - MoqtDataStreamType::kStreamHeaderSubgroup}; + MoqtDataStreamType::kStreamHeaderSubgroup, + MoqtDataStreamType::kStreamHeaderFetch, +}; using GeneralizedMessageType = absl::variant<MoqtMessageType, MoqtDataStreamType>;
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 3236bae..7c52c68 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -1288,7 +1288,9 @@ header.payload_length = object.payload.length(); quiche::QuicheBuffer serialized_header = - session_->framer_.SerializeObjectHeader(header, !stream_header_written_); + session_->framer_.SerializeObjectHeader( + header, GetMessageTypeForForwardingPreference(forwarding_preference), + !stream_header_written_); bool fin = false; switch (forwarding_preference) { case MoqtForwardingPreference::kTrack:
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index 1138024..c26261f 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -326,6 +326,56 @@ }; }; +class QUICHE_NO_EXPORT StreamHeaderFetchMessage : public ObjectMessage { + public: + StreamHeaderFetchMessage() : ObjectMessage() { + SetWireImage(raw_packet_, sizeof(raw_packet_)); + object_.subgroup_id = 8; + } + + void ExpandVarints() override { ExpandVarintsImpl("vvvvv-v---", false); } + + bool SetPayloadLength(uint8_t payload_length) { + if (payload_length > 63) { + // This only supports one-byte varints. + return false; + } + object_.payload_length = payload_length; + raw_packet_[6] = payload_length; + SetWireImage(raw_packet_, sizeof(raw_packet_)); + return true; + } + + private: + uint8_t raw_packet_[10] = { + 0x05, // type field + 0x04, // subscribe ID + // object middler: + 0x05, 0x08, 0x06, // sequence + 0x07, // publisher priority + 0x03, 0x66, 0x6f, 0x6f, // payload = "foo" + }; +}; + +// Used only for tests that process multiple objects on one stream. +class QUICHE_NO_EXPORT StreamMiddlerFetchMessage : public ObjectMessage { + public: + StreamMiddlerFetchMessage() : ObjectMessage() { + SetWireImage(raw_packet_, sizeof(raw_packet_)); + object_.forwarding_preference = MoqtForwardingPreference::kTrack; + object_.subgroup_id = 8; + object_.object_id = 9; + } + + void ExpandVarints() override { ExpandVarintsImpl("vvv-v---", false); } + + private: + uint8_t raw_packet_[8] = { + 0x05, 0x08, 0x09, 0x07, // Object metadata + 0x03, 0x62, 0x61, 0x72, // Payload = "bar" + }; +}; + class QUICHE_NO_EXPORT ClientSetupMessage : public TestMessageBase { public: explicit ClientSetupMessage(bool webtrans) : TestMessageBase() { @@ -1623,6 +1673,8 @@ return std::make_unique<StreamHeaderTrackMessage>(); case MoqtDataStreamType::kStreamHeaderSubgroup: return std::make_unique<StreamHeaderSubgroupMessage>(); + case MoqtDataStreamType::kStreamHeaderFetch: + return std::make_unique<StreamHeaderFetchMessage>(); case MoqtDataStreamType::kPadding: return nullptr; }