Parser/Framer for MoQT FETCH Stream messages.

New to draft-07.

PiperOrigin-RevId: 690630869
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index 3e0834d..fe8c47f 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -263,20 +263,21 @@
 }  // namespace
 
 quiche::QuicheBuffer MoqtFramer::SerializeObjectHeader(
-    const MoqtObject& message, bool is_first_in_stream) {
-  if (!ValidateObjectMetadata(message)) {
+    const MoqtObject& message, MoqtDataStreamType message_type,
+    bool is_first_in_stream) {
+  if (!ValidateObjectMetadata(message, message_type)) {
     QUIC_BUG(quic_bug_serialize_object_header_01)
         << "Object metadata is invalid";
     return quiche::QuicheBuffer();
   }
-  if (message.forwarding_preference == MoqtForwardingPreference::kDatagram) {
+  if (message_type == MoqtDataStreamType::kObjectDatagram) {
     QUIC_BUG(quic_bug_serialize_object_header_02)
         << "Datagrams use SerializeObjectDatagram()";
     return quiche::QuicheBuffer();
   }
   if (!is_first_in_stream) {
-    switch (message.forwarding_preference) {
-      case MoqtForwardingPreference::kTrack:
+    switch (message_type) {
+      case MoqtDataStreamType::kStreamHeaderTrack:
         return (message.payload_length == 0)
                    ? Serialize(WireVarInt62(message.group_id),
                                WireVarInt62(message.object_id),
@@ -285,7 +286,7 @@
                    : Serialize(WireVarInt62(message.group_id),
                                WireVarInt62(message.object_id),
                                WireVarInt62(message.payload_length));
-      case MoqtForwardingPreference::kSubgroup:
+      case MoqtDataStreamType::kStreamHeaderSubgroup:
         return (message.payload_length == 0)
                    ? Serialize(WireVarInt62(message.object_id),
                                WireVarInt62(message.payload_length),
@@ -293,15 +294,27 @@
                                    message.object_status)))
                    : Serialize(WireVarInt62(message.object_id),
                                WireVarInt62(message.payload_length));
+      case MoqtDataStreamType::kStreamHeaderFetch:
+        return (message.payload_length == 0)
+                   ? Serialize(WireVarInt62(message.group_id),
+                               WireVarInt62(*message.subgroup_id),
+                               WireVarInt62(message.object_id),
+                               WireUint8(message.publisher_priority),
+                               WireVarInt62(message.payload_length),
+                               WireVarInt62(static_cast<uint64_t>(
+                                   message.object_status)))
+                   : Serialize(WireVarInt62(message.group_id),
+                               WireVarInt62(*message.subgroup_id),
+                               WireVarInt62(message.object_id),
+                               WireUint8(message.publisher_priority),
+                               WireVarInt62(message.payload_length));
       default:
         QUICHE_NOTREACHED();
         return quiche::QuicheBuffer();
     }
   }
-  MoqtDataStreamType message_type =
-      GetMessageTypeForForwardingPreference(message.forwarding_preference);
-  switch (message.forwarding_preference) {
-    case MoqtForwardingPreference::kTrack:
+  switch (message_type) {
+    case MoqtDataStreamType::kStreamHeaderTrack:
       return (message.payload_length == 0)
                  ? Serialize(WireVarInt62(message_type),
                              WireVarInt62(message.track_alias),
@@ -316,7 +329,7 @@
                              WireVarInt62(message.group_id),
                              WireVarInt62(message.object_id),
                              WireVarInt62(message.payload_length));
-    case MoqtForwardingPreference::kSubgroup:
+    case MoqtDataStreamType::kStreamHeaderSubgroup:
       return (message.payload_length == 0)
                  ? Serialize(WireVarInt62(message_type),
                              WireVarInt62(message.track_alias),
@@ -333,7 +346,24 @@
                              WireUint8(message.publisher_priority),
                              WireVarInt62(message.object_id),
                              WireVarInt62(message.payload_length));
-    case MoqtForwardingPreference::kDatagram:
+    case MoqtDataStreamType::kStreamHeaderFetch:
+      return (message.payload_length == 0)
+                 ? Serialize(WireVarInt62(message_type),
+                             WireVarInt62(message.track_alias),
+                             WireVarInt62(message.group_id),
+                             WireVarInt62(*message.subgroup_id),
+                             WireVarInt62(message.object_id),
+                             WireUint8(message.publisher_priority),
+                             WireVarInt62(message.payload_length),
+                             WireVarInt62(message.object_status))
+                 : Serialize(WireVarInt62(message_type),
+                             WireVarInt62(message.track_alias),
+                             WireVarInt62(message.group_id),
+                             WireVarInt62(*message.subgroup_id),
+                             WireVarInt62(message.object_id),
+                             WireUint8(message.publisher_priority),
+                             WireVarInt62(message.payload_length));
+    default:
       QUICHE_NOTREACHED();
       return quiche::QuicheBuffer();
   }
@@ -341,7 +371,7 @@
 
 quiche::QuicheBuffer MoqtFramer::SerializeObjectDatagram(
     const MoqtObject& message, absl::string_view payload) {
-  if (!ValidateObjectMetadata(message)) {
+  if (!ValidateObjectMetadata(message, MoqtDataStreamType::kObjectDatagram)) {
     QUIC_BUG(quic_bug_serialize_object_datagram_01)
         << "Object metadata is invalid";
     return quiche::QuicheBuffer();
@@ -697,12 +727,14 @@
 }
 
 // static
-bool MoqtFramer::ValidateObjectMetadata(const MoqtObject& object) {
+bool MoqtFramer::ValidateObjectMetadata(const MoqtObject& object,
+                                        MoqtDataStreamType message_type) {
   if (object.object_status != MoqtObjectStatus::kNormal &&
       object.payload_length > 0) {
     return false;
   }
-  if ((object.forwarding_preference == MoqtForwardingPreference::kSubgroup) !=
+  if ((message_type == MoqtDataStreamType::kStreamHeaderSubgroup ||
+       message_type == MoqtDataStreamType::kStreamHeaderFetch) !=
       object.subgroup_id.has_value()) {
     return false;
   }
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h
index ae931a9..6969ab4 100644
--- a/quiche/quic/moqt/moqt_framer.h
+++ b/quiche/quic/moqt/moqt_framer.h
@@ -31,6 +31,7 @@
   // Serializes the header for an object, including the appropriate stream
   // header if `is_first_in_stream` is set to true.
   quiche::QuicheBuffer SerializeObjectHeader(const MoqtObject& message,
+                                             MoqtDataStreamType message_type,
                                              bool is_first_in_stream);
   quiche::QuicheBuffer SerializeObjectDatagram(const MoqtObject& message,
                                                absl::string_view payload);
@@ -73,7 +74,8 @@
 
  private:
   // Returns true if the metadata is internally consistent.
-  static bool ValidateObjectMetadata(const MoqtObject& object);
+  static bool ValidateObjectMetadata(const MoqtObject& object,
+                                     MoqtDataStreamType message_type);
 
   quiche::QuicheBufferAllocator* allocator_;
   bool using_webtrans_;
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index b740d64..0a2387b 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -84,13 +84,15 @@
 quiche::QuicheBuffer SerializeObject(MoqtFramer& framer,
                                      const MoqtObject& message,
                                      absl::string_view payload,
+                                     MoqtDataStreamType stream_type,
                                      bool is_first_in_stream) {
   MoqtObject adjusted_message = message;
   adjusted_message.payload_length = payload.size();
   quiche::QuicheBuffer header =
       (message.forwarding_preference == MoqtForwardingPreference::kDatagram)
           ? framer.SerializeObjectDatagram(adjusted_message, payload)
-          : framer.SerializeObjectHeader(adjusted_message, is_first_in_stream);
+          : framer.SerializeObjectHeader(adjusted_message, stream_type,
+                                         is_first_in_stream);
   if (header.empty()) {
     return quiche::QuicheBuffer();
   }
@@ -259,28 +261,48 @@
 
 TEST_F(MoqtFramerSimpleTest, GroupMiddler) {
   auto header = std::make_unique<StreamHeaderSubgroupMessage>();
-  auto buffer1 = SerializeObject(
-      framer_, std::get<MoqtObject>(header->structured_data()), "foo", true);
+  auto buffer1 =
+      SerializeObject(framer_, std::get<MoqtObject>(header->structured_data()),
+                      "foo", MoqtDataStreamType::kStreamHeaderSubgroup, true);
   EXPECT_EQ(buffer1.size(), header->total_message_size());
   EXPECT_EQ(buffer1.AsStringView(), header->PacketSample());
 
   auto middler = std::make_unique<StreamMiddlerSubgroupMessage>();
-  auto buffer2 = SerializeObject(
-      framer_, std::get<MoqtObject>(middler->structured_data()), "bar", false);
+  auto buffer2 =
+      SerializeObject(framer_, std::get<MoqtObject>(middler->structured_data()),
+                      "bar", MoqtDataStreamType::kStreamHeaderSubgroup, false);
   EXPECT_EQ(buffer2.size(), middler->total_message_size());
   EXPECT_EQ(buffer2.AsStringView(), middler->PacketSample());
 }
 
 TEST_F(MoqtFramerSimpleTest, TrackMiddler) {
   auto header = std::make_unique<StreamHeaderTrackMessage>();
-  auto buffer1 = SerializeObject(
-      framer_, std::get<MoqtObject>(header->structured_data()), "foo", true);
+  auto buffer1 =
+      SerializeObject(framer_, std::get<MoqtObject>(header->structured_data()),
+                      "foo", MoqtDataStreamType::kStreamHeaderTrack, true);
   EXPECT_EQ(buffer1.size(), header->total_message_size());
   EXPECT_EQ(buffer1.AsStringView(), header->PacketSample());
 
   auto middler = std::make_unique<StreamMiddlerTrackMessage>();
-  auto buffer2 = SerializeObject(
-      framer_, std::get<MoqtObject>(middler->structured_data()), "bar", false);
+  auto buffer2 =
+      SerializeObject(framer_, std::get<MoqtObject>(middler->structured_data()),
+                      "bar", MoqtDataStreamType::kStreamHeaderTrack, false);
+  EXPECT_EQ(buffer2.size(), middler->total_message_size());
+  EXPECT_EQ(buffer2.AsStringView(), middler->PacketSample());
+}
+
+TEST_F(MoqtFramerSimpleTest, FetchMiddler) {
+  auto header = std::make_unique<StreamHeaderFetchMessage>();
+  auto buffer1 =
+      SerializeObject(framer_, std::get<MoqtObject>(header->structured_data()),
+                      "foo", MoqtDataStreamType::kStreamHeaderFetch, true);
+  EXPECT_EQ(buffer1.size(), header->total_message_size());
+  EXPECT_EQ(buffer1.AsStringView(), header->PacketSample());
+
+  auto middler = std::make_unique<StreamMiddlerFetchMessage>();
+  auto buffer2 =
+      SerializeObject(framer_, std::get<MoqtObject>(middler->structured_data()),
+                      "bar", MoqtDataStreamType::kStreamHeaderFetch, false);
   EXPECT_EQ(buffer2.size(), middler->total_message_size());
   EXPECT_EQ(buffer2.AsStringView(), middler->PacketSample());
 }
@@ -299,28 +321,34 @@
   };
   quiche::QuicheBuffer buffer;
 
-  // SerializeObjectDatagram() only accepts kDatagram.
-  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectDatagram(object, "foo"),
-                  "Only datagrams use SerializeObjectDatagram()");
-  EXPECT_TRUE(buffer.empty());
-
   // kSubgroup must have a subgroup_id.
   object.subgroup_id = std::nullopt;
-  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false),
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(
+                      object, MoqtDataStreamType::kStreamHeaderSubgroup, false),
+                  "Object metadata is invalid");
+  EXPECT_TRUE(buffer.empty());
+  object.subgroup_id = 8;
+
+  // kFetch must have a subgroup_id.
+  object.subgroup_id = std::nullopt;
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(
+                      object, MoqtDataStreamType::kStreamHeaderFetch, false),
                   "Object metadata is invalid");
   EXPECT_TRUE(buffer.empty());
   object.subgroup_id = 8;
 
   // kTrack must not have a subgroup_id.
   object.forwarding_preference = MoqtForwardingPreference::kTrack;
-  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false),
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(
+                      object, MoqtDataStreamType::kStreamHeaderTrack, false),
                   "Object metadata is invalid");
   EXPECT_TRUE(buffer.empty());
   object.forwarding_preference = MoqtForwardingPreference::kSubgroup;
 
   // Non-normal status must have no payload.
   object.object_status = MoqtObjectStatus::kEndOfGroup;
-  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false),
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(
+                      object, MoqtDataStreamType::kStreamHeaderSubgroup, false),
                   "Object metadata is invalid");
   EXPECT_TRUE(buffer.empty());
   // object.object_status = MoqtObjectStatus::kNormal;
@@ -341,7 +369,8 @@
   quiche::QuicheBuffer buffer;
 
   // No datagrams to SerializeObjectHeader().
-  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false),
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(
+                      object, MoqtDataStreamType::kObjectDatagram, false),
                   "Datagrams use SerializeObjectDatagram()")
   EXPECT_TRUE(buffer.empty());
 
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index f9600de..ce6680c 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -129,6 +129,8 @@
       return "STREAM_HEADER_TRACK";
     case MoqtDataStreamType::kStreamHeaderSubgroup:
       return "STREAM_HEADER_SUBGROUP";
+    case MoqtDataStreamType::kStreamHeaderFetch:
+      return "STREAM_HEADER_FETCH";
     case MoqtDataStreamType::kPadding:
       return "PADDING";
   }
@@ -158,6 +160,8 @@
       return MoqtForwardingPreference::kTrack;
     case MoqtDataStreamType::kStreamHeaderSubgroup:
       return MoqtForwardingPreference::kSubgroup;
+    case MoqtDataStreamType::kStreamHeaderFetch:
+      return MoqtForwardingPreference::kTrack;  // This is a placeholder.
     default:
       break;
   }
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 5a4064a..0b09438 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -68,6 +68,7 @@
   kObjectDatagram = 0x01,
   kStreamHeaderTrack = 0x02,
   kStreamHeaderSubgroup = 0x04,
+  kStreamHeaderFetch = 0x05,
 
   // Currently QUICHE-specific.  All data on a kPadding stream is ignored.
   kPadding = 0x26d3,
@@ -324,7 +325,7 @@
 // The data contained in every Object message, although the message type
 // implies some of the values.
 struct QUICHE_EXPORT MoqtObject {
-  uint64_t track_alias;
+  uint64_t track_alias;  // For FETCH, this is the subscribe ID.
   uint64_t group_id;
   uint64_t object_id;
   MoqtPriority publisher_priority;
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 2c63356..eae5b48 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -52,7 +52,8 @@
 bool IsAllowedStreamType(uint64_t value) {
   constexpr std::array kAllowedStreamTypes = {
       MoqtDataStreamType::kStreamHeaderSubgroup,
-      MoqtDataStreamType::kStreamHeaderTrack, MoqtDataStreamType::kPadding};
+      MoqtDataStreamType::kStreamHeaderTrack,
+      MoqtDataStreamType::kStreamHeaderFetch, MoqtDataStreamType::kPadding};
   for (MoqtDataStreamType type : kAllowedStreamTypes) {
     if (static_cast<uint64_t>(type) == value) {
       return true;
@@ -67,6 +68,7 @@
     return 0;
   }
   if (type != MoqtDataStreamType::kStreamHeaderTrack &&
+      type != MoqtDataStreamType::kStreamHeaderFetch &&
       !reader.ReadVarInt62(&object.group_id)) {
     return 0;
   }
@@ -81,7 +83,8 @@
       !reader.ReadVarInt62(&object.object_id)) {
     return 0;
   }
-  if (!reader.ReadUInt8(&object.publisher_priority)) {
+  if (type != MoqtDataStreamType::kStreamHeaderFetch &&
+      !reader.ReadUInt8(&object.publisher_priority)) {
     return 0;
   }
   uint64_t status = static_cast<uint64_t>(MoqtObjectStatus::kNormal);
@@ -99,14 +102,28 @@
                             MoqtDataStreamType type) {
   switch (type) {
     case MoqtDataStreamType::kStreamHeaderTrack:
+    case MoqtDataStreamType::kStreamHeaderFetch:
       if (!reader.ReadVarInt62(&object.group_id)) {
         return 0;
       }
+      if (type == MoqtDataStreamType::kStreamHeaderFetch) {
+        uint64_t value;
+        if (!reader.ReadVarInt62(&value)) {
+          return 0;
+        }
+        object.subgroup_id = value;
+      }
       [[fallthrough]];
 
     case MoqtDataStreamType::kStreamHeaderSubgroup: {
-      if (!reader.ReadVarInt62(&object.object_id) ||
-          !reader.ReadVarInt62(&object.payload_length)) {
+      if (!reader.ReadVarInt62(&object.object_id)) {
+        return 0;
+      }
+      if (type == MoqtDataStreamType::kStreamHeaderFetch &&
+          !reader.ReadUInt8(&object.publisher_priority)) {
+        return 0;
+      }
+      if (!reader.ReadVarInt62(&object.payload_length)) {
         return 0;
       }
       uint64_t status = static_cast<uint64_t>(MoqtObjectStatus::kNormal);
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index 3e6a0d9..a7c7c74 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -60,7 +60,9 @@
 };
 constexpr std::array kDataStreamTypes{
     MoqtDataStreamType::kStreamHeaderTrack,
-    MoqtDataStreamType::kStreamHeaderSubgroup};
+    MoqtDataStreamType::kStreamHeaderSubgroup,
+    MoqtDataStreamType::kStreamHeaderFetch,
+};
 
 using GeneralizedMessageType =
     absl::variant<MoqtMessageType, MoqtDataStreamType>;
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 3236bae..7c52c68 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -1288,7 +1288,9 @@
   header.payload_length = object.payload.length();
 
   quiche::QuicheBuffer serialized_header =
-      session_->framer_.SerializeObjectHeader(header, !stream_header_written_);
+      session_->framer_.SerializeObjectHeader(
+          header, GetMessageTypeForForwardingPreference(forwarding_preference),
+          !stream_header_written_);
   bool fin = false;
   switch (forwarding_preference) {
     case MoqtForwardingPreference::kTrack:
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 1138024..c26261f 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -326,6 +326,56 @@
   };
 };
 
+class QUICHE_NO_EXPORT StreamHeaderFetchMessage : public ObjectMessage {
+ public:
+  StreamHeaderFetchMessage() : ObjectMessage() {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+    object_.subgroup_id = 8;
+  }
+
+  void ExpandVarints() override { ExpandVarintsImpl("vvvvv-v---", false); }
+
+  bool SetPayloadLength(uint8_t payload_length) {
+    if (payload_length > 63) {
+      // This only supports one-byte varints.
+      return false;
+    }
+    object_.payload_length = payload_length;
+    raw_packet_[6] = payload_length;
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+    return true;
+  }
+
+ private:
+  uint8_t raw_packet_[10] = {
+      0x05,                    // type field
+      0x04,                    // subscribe ID
+                               // object middler:
+      0x05, 0x08, 0x06,        // sequence
+      0x07,                    // publisher priority
+      0x03, 0x66, 0x6f, 0x6f,  // payload = "foo"
+  };
+};
+
+// Used only for tests that process multiple objects on one stream.
+class QUICHE_NO_EXPORT StreamMiddlerFetchMessage : public ObjectMessage {
+ public:
+  StreamMiddlerFetchMessage() : ObjectMessage() {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+    object_.forwarding_preference = MoqtForwardingPreference::kTrack;
+    object_.subgroup_id = 8;
+    object_.object_id = 9;
+  }
+
+  void ExpandVarints() override { ExpandVarintsImpl("vvv-v---", false); }
+
+ private:
+  uint8_t raw_packet_[8] = {
+      0x05, 0x08, 0x09, 0x07,  // Object metadata
+      0x03, 0x62, 0x61, 0x72,  // Payload = "bar"
+  };
+};
+
 class QUICHE_NO_EXPORT ClientSetupMessage : public TestMessageBase {
  public:
   explicit ClientSetupMessage(bool webtrans) : TestMessageBase() {
@@ -1623,6 +1673,8 @@
       return std::make_unique<StreamHeaderTrackMessage>();
     case MoqtDataStreamType::kStreamHeaderSubgroup:
       return std::make_unique<StreamHeaderSubgroupMessage>();
+    case MoqtDataStreamType::kStreamHeaderFetch:
+      return std::make_unique<StreamHeaderFetchMessage>();
     case MoqtDataStreamType::kPadding:
       return nullptr;
   }