Implement FETCH object serialization flags.

Datagrams now no longer have a subgroup_id, so most Object data structures make the subgroup std::optional.

FETCH no longer returns non-normal objects.

Due to worsening dependency loops, added moqt_types.h, which will hopefully expand to cover many simple data types.

Concludes MOQT draft-16 update except for filed bugs.

PiperOrigin-RevId: 879779659
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 4c3283f..58ed467 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1601,6 +1601,7 @@
     "quic/moqt/moqt_subscribe_windows.h",
     "quic/moqt/moqt_trace_recorder.h",
     "quic/moqt/moqt_track.h",
+    "quic/moqt/moqt_types.h",
     "quic/moqt/relay_namespace_tree.h",
     "quic/moqt/session_namespace_tree.h",
     "quic/moqt/tools/chat_client.h",
diff --git a/build/source_list.gni b/build/source_list.gni
index f4b0dd1..42458e6 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1605,6 +1605,7 @@
     "src/quiche/quic/moqt/moqt_subscribe_windows.h",
     "src/quiche/quic/moqt/moqt_trace_recorder.h",
     "src/quiche/quic/moqt/moqt_track.h",
+    "src/quiche/quic/moqt/moqt_types.h",
     "src/quiche/quic/moqt/relay_namespace_tree.h",
     "src/quiche/quic/moqt/session_namespace_tree.h",
     "src/quiche/quic/moqt/tools/chat_client.h",
diff --git a/build/source_list.json b/build/source_list.json
index 66a8052..86a6e76 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1604,6 +1604,7 @@
     "quiche/quic/moqt/moqt_subscribe_windows.h",
     "quiche/quic/moqt/moqt_trace_recorder.h",
     "quiche/quic/moqt/moqt_track.h",
+    "quiche/quic/moqt/moqt_types.h",
     "quiche/quic/moqt/relay_namespace_tree.h",
     "quiche/quic/moqt/session_namespace_tree.h",
     "quiche/quic/moqt/tools/chat_client.h",
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index bded8ff..740d30f 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -24,7 +24,9 @@
 #include "quiche/quic/moqt/moqt_key_value_pair.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_names.h"
+#include "quiche/quic/moqt/moqt_object.h"
 #include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/quiche_buffer_allocator.h"
@@ -355,72 +357,93 @@
 
 quiche::QuicheBuffer MoqtFramer::SerializeObjectHeader(
     const MoqtObject& message, MoqtDataStreamType message_type,
-    std::optional<uint64_t> previous_object_in_stream) {
-  if (!ValidateObjectMetadata(message, /*is_datagram=*/false)) {
+    std::optional<PublishedObjectMetadata>& previous_object_in_stream) {
+  if (!ValidateObjectMetadata(message)) {
     QUICHE_BUG(QUICHE_BUG_serialize_object_header_01)
         << "Object metadata is invalid";
     return quiche::QuicheBuffer();
   }
+  // Many fields are optional because the stream type or Fetch serialization
+  // omits them.
+  std::optional<uint64_t> stream_type;
+  std::optional<uint64_t> track_id;  // Track alias or FETCH ID.
+  std::optional<uint64_t> group_id;
+  std::optional<uint64_t> subgroup_id;
+  std::optional<uint64_t> object_id;
+  std::optional<uint8_t> publisher_priority;
+  std::optional<absl::string_view> extension_headers;
+  uint64_t payload_length = message.payload_length;
   bool is_first_in_stream = !previous_object_in_stream.has_value();
-  // Not all fields will be written to the wire. Keep optional ones in
-  // std::optional so that they can be excluded.
-  // Three fields are always optional.
-  std::optional<uint64_t> stream_type =
-      is_first_in_stream ? std::optional<uint64_t>(message_type.value())
-                         : std::nullopt;
-  std::optional<uint64_t> track_alias =
-      is_first_in_stream ? std::optional<uint64_t>(message.track_alias)
-                         : std::nullopt;
-  std::optional<uint64_t> object_status =
-      (message.payload_length == 0)
-          ? std::optional<uint64_t>(
-                static_cast<uint64_t>(message.object_status))
-          : std::nullopt;
+  if (is_first_in_stream) {
+    stream_type = message_type.value();
+    track_id = message.track_alias;
+  }
   if (message_type.IsFetch()) {
+    MoqtFetchSerialization serialization;
+    if (is_first_in_stream) {
+      serialization = MoqtFetchSerialization(message);
+    } else {
+      serialization =
+          MoqtFetchSerialization(message, *previous_object_in_stream);
+    }
+    if (serialization.has_group_id()) {
+      group_id = message.group_id;
+    }
+    if (serialization.has_subgroup_id()) {
+      subgroup_id = message.subgroup_id;
+    }
+    if (serialization.has_object_id()) {
+      object_id = message.object_id;
+    }
+    if (serialization.has_priority()) {
+      publisher_priority = message.publisher_priority;
+    }
+    if (serialization.has_extensions()) {
+      extension_headers = message.extension_headers;
+    }
     return Serialize(
         WireOptional<WireVarInt62>(stream_type),
-        WireOptional<WireVarInt62>(track_alias), WireVarInt62(message.group_id),
-        WireVarInt62(message.subgroup_id), WireVarInt62(message.object_id),
-        WireUint8(message.publisher_priority),
-        WireStringWithVarInt62Length(message.extension_headers),
-        WireVarInt62(message.payload_length),
-        WireOptional<WireVarInt62>(object_status));
+        WireOptional<WireVarInt62>(track_id),
+        WireVarInt62(serialization.value()),
+        WireOptional<WireVarInt62>(group_id),
+        WireOptional<WireVarInt62>(subgroup_id),
+        WireOptional<WireVarInt62>(object_id),
+        WireOptional<WireUint8>(publisher_priority),
+        WireOptional<WireStringWithVarInt62Length>(extension_headers),
+        WireVarInt62(payload_length));
   }
-  if (previous_object_in_stream.has_value() &&
-      message.object_id <= *previous_object_in_stream) {
+  // Subgroup stream.
+  if (!message.subgroup_id.has_value()) {
     QUICHE_BUG(QUICHE_BUG_serialize_object_header_02)
-        << "Object ID is not increasing";
+        << "Subgroup ID is missing";
     return quiche::QuicheBuffer();
   }
-  // Subgroup headers have more optional fields.
-  QUICHE_CHECK(message_type.IsSubgroup());
-  std::optional<uint64_t> group_id =
-      previous_object_in_stream.has_value()
-          ? std::nullopt
-          : std::optional<uint64_t>(message.group_id);
-  uint64_t object_id = message.object_id;
-  if (!is_first_in_stream) {
-    // The value is actually an object ID delta, not the absolute object ID.
-    object_id -= (*previous_object_in_stream + 1);
+  if (is_first_in_stream) {
+    group_id = message.group_id;
+    if (message_type.IsSubgroupPresent()) {
+      subgroup_id = message.subgroup_id;
+    }
+    if (!message_type.HasDefaultPriority()) {
+      publisher_priority = message.publisher_priority;
+    }
   }
-  std::optional<uint64_t> subgroup_id =
-      (is_first_in_stream && message_type.IsSubgroupPresent())
-          ? std::optional<uint64_t>(message.subgroup_id)
-          : std::nullopt;
-  std::optional<uint8_t> publisher_priority =
-      (is_first_in_stream && !message_type.HasDefaultPriority())
-          ? std::optional<uint8_t>(message.publisher_priority)
-          : std::nullopt;
-  std::optional<absl::string_view> extension_headers =
-      (message_type.AreExtensionHeadersPresent())
-          ? std::optional<absl::string_view>(message.extension_headers)
-          : std::nullopt;
+  object_id = message.object_id;
+  if (!is_first_in_stream) {
+    *object_id -= (previous_object_in_stream->location.object + 1);
+  }
+  if (message_type.AreExtensionHeadersPresent()) {
+    extension_headers = message.extension_headers;
+  }
+  std::optional<uint64_t> object_status;
+  if (payload_length == 0) {
+    object_status = static_cast<uint64_t>(message.object_status);
+  }
   return Serialize(
       WireOptional<WireVarInt62>(stream_type),
-      WireOptional<WireVarInt62>(track_alias),
+      WireOptional<WireVarInt62>(track_id),
       WireOptional<WireVarInt62>(group_id),
       WireOptional<WireVarInt62>(subgroup_id),
-      WireOptional<WireUint8>(publisher_priority), WireVarInt62(object_id),
+      WireOptional<WireUint8>(publisher_priority), WireVarInt62(*object_id),
       WireOptional<WireStringWithVarInt62Length>(extension_headers),
       WireVarInt62(message.payload_length),
       WireOptional<WireVarInt62>(object_status));
@@ -429,7 +452,7 @@
 quiche::QuicheBuffer MoqtFramer::SerializeObjectDatagram(
     const MoqtObject& message, absl::string_view payload,
     MoqtPriority default_priority) {
-  if (!ValidateObjectMetadata(message, /*is_datagram=*/true)) {
+  if (!ValidateObjectMetadata(message) || message.subgroup_id.has_value()) {
     QUICHE_BUG(QUICHE_BUG_serialize_object_datagram_01)
         << "Object metadata is invalid";
     return quiche::QuicheBuffer();
@@ -723,17 +746,10 @@
 }
 
 // static
-bool MoqtFramer::ValidateObjectMetadata(const MoqtObject& object,
-                                        bool is_datagram) {
-  if (object.object_status != MoqtObjectStatus::kNormal &&
-      object.object_status != MoqtObjectStatus::kEndOfGroup &&
-      object.payload_length > 0) {
-    return false;
-  }
-  if (is_datagram && object.subgroup_id != object.object_id) {
-    return false;
-  }
-  return true;
+bool MoqtFramer::ValidateObjectMetadata(const MoqtObject& object) {
+  return (object.object_status == MoqtObjectStatus::kNormal ||
+          object.object_status == MoqtObjectStatus::kEndOfGroup ||
+          object.payload_length == 0);
 }
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h
index 62ae609..f1e56e0 100644
--- a/quiche/quic/moqt/moqt_framer.h
+++ b/quiche/quic/moqt/moqt_framer.h
@@ -5,12 +5,12 @@
 #ifndef QUICHE_QUIC_MOQT_MOQT_FRAMER_H_
 #define QUICHE_QUIC_MOQT_MOQT_FRAMER_H_
 
-#include <cstdint>
 #include <optional>
 
 #include "absl/strings/string_view.h"
 #include "quiche/quic/moqt/moqt_key_value_pair.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_object.h"
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/common/platform/api/quiche_export.h"
 #include "quiche/common/quiche_buffer_allocator.h"
@@ -37,7 +37,7 @@
   // one otherwise.
   quiche::QuicheBuffer SerializeObjectHeader(
       const MoqtObject& message, MoqtDataStreamType message_type,
-      std::optional<uint64_t> previous_object_in_stream);
+      std::optional<PublishedObjectMetadata>& previous_object_in_stream);
   // Serializes both OBJECT and OBJECT_STATUS datagrams.
   quiche::QuicheBuffer SerializeObjectDatagram(const MoqtObject& message,
                                                absl::string_view payload,
@@ -86,8 +86,7 @@
                                       const SetupParameters& parameters,
                                       KeyValuePairList& out);
   // Returns true if the metadata is internally consistent.
-  static bool ValidateObjectMetadata(const MoqtObject& object,
-                                     bool is_datagram);
+  static bool ValidateObjectMetadata(const MoqtObject& object);
   const bool using_webtrans_;
 };
 
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index f9aa333..d63e2b6 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -16,7 +16,8 @@
 #include "quiche/quic/moqt/moqt_key_value_pair.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_names.h"
-#include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_object.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/quic/moqt/test_tools/moqt_test_message.h"
 #include "quiche/quic/platform/api/quic_expect_bug.h"
 #include "quiche/quic/platform/api/quic_test.h"
@@ -91,11 +92,18 @@
   MoqtObject adjusted_message = message;
   adjusted_message.payload_length = payload.size();
   QUICHE_DCHECK(message.object_id > change_in_object_id);
+  std::optional<PublishedObjectMetadata> previous_object;
+  if (change_in_object_id > 0) {
+    previous_object.emplace();
+    previous_object->location =
+        Location(message.group_id, message.object_id - change_in_object_id);
+    previous_object->subgroup = message.subgroup_id;
+    previous_object->extensions = message.extension_headers;
+    previous_object->status = message.object_status;
+    previous_object->publisher_priority = message.publisher_priority;
+  }
   quiche::QuicheBuffer header = framer.SerializeObjectHeader(
-      adjusted_message, stream_type,
-      change_in_object_id == 0
-          ? std::nullopt
-          : std::optional<uint64_t>(message.object_id - change_in_object_id));
+      adjusted_message, stream_type, previous_object);
   if (header.empty()) {
     return quiche::QuicheBuffer();
   }
@@ -269,43 +277,58 @@
 }
 
 TEST_F(MoqtFramerSimpleTest, FetchMiddler) {
-  auto header = std::make_unique<StreamHeaderFetchMessage>();
-  auto buffer1 =
-      SerializeObject(framer_, std::get<MoqtObject>(header->structured_data()),
-                      "foo", MoqtDataStreamType::Fetch(), 0);
-  EXPECT_EQ(buffer1.size(), header->total_message_size());
-  EXPECT_EQ(buffer1.AsStringView(), header->PacketSample());
+  for (const MoqtFetchSerialization& flags : AllMoqtFetchSerializations()) {
+    SCOPED_TRACE(testing::Message() << "flags: " << flags.value());
+    if (flags.is_datagram() && !flags.zero_subgroup_id()) {
+      // The framer will not encode these, although they are legal.
+      continue;
+    }
+    auto header = std::make_unique<StreamHeaderFetchMessage>();
+    MoqtObject object = std::get<MoqtObject>(header->structured_data());
+    std::optional<PublishedObjectMetadata> previous;
+    auto buffer1 = framer_.SerializeObjectHeader(
+        object, MoqtDataStreamType::Fetch(), previous);
+    auto whole_object =
+        quiche::QuicheBuffer::Copy(quiche::SimpleBufferAllocator::Get(),
+                                   absl::StrCat(buffer1.AsStringView(), "foo"));
+    EXPECT_EQ(whole_object.size(), header->total_message_size());
+    EXPECT_EQ(whole_object.AsStringView(), header->PacketSample());
 
-  auto middler = std::make_unique<StreamMiddlerFetchMessage>();
-  auto buffer2 =
-      SerializeObject(framer_, std::get<MoqtObject>(middler->structured_data()),
-                      "bar", MoqtDataStreamType::Fetch(), 3);
-  EXPECT_EQ(buffer2.size(), middler->total_message_size());
-  EXPECT_EQ(buffer2.AsStringView(), middler->PacketSample());
+    auto middler = std::make_unique<StreamMiddlerFetchMessage>(flags);
+    // Populate previous object metadata.
+    previous.emplace(Location(object.group_id, object.object_id),
+                     object.subgroup_id, object.extension_headers,
+                     object.object_status, object.publisher_priority);
+    auto buffer2 = framer_.SerializeObjectHeader(
+        std::get<MoqtObject>(middler->structured_data()),
+        MoqtDataStreamType::Fetch(), previous);
+    whole_object =
+        quiche::QuicheBuffer::Copy(quiche::SimpleBufferAllocator::Get(),
+                                   absl::StrCat(buffer2.AsStringView(), "bar"));
+    EXPECT_EQ(whole_object.size(), middler->total_message_size());
+    EXPECT_EQ(whole_object.AsStringView(), middler->PacketSample());
+  }
 }
 
 TEST_F(MoqtFramerSimpleTest, BadObjectInput) {
   MoqtObject object = {
-      // This is a valid object.
+      // Invalid: DoesNotExist with non-zero payload length.
       /*track_alias=*/4,
       /*group_id=*/5,
       /*object_id=*/6,
       /*publisher_priority=*/7,
       std::string(kDefaultExtensionBlob.data(), kDefaultExtensionBlob.size()),
-      /*object_status=*/MoqtObjectStatus::kNormal,
+      /*object_status=*/MoqtObjectStatus::kObjectDoesNotExist,
       /*subgroup_id=*/8,
       /*payload_length=*/3,
   };
   quiche::QuicheBuffer buffer;
-
-  // Non-normal status must have no payload.
-  object.object_status = MoqtObjectStatus::kObjectDoesNotExist;
+  std::optional<PublishedObjectMetadata> previous;
   EXPECT_QUIC_BUG(
       buffer = framer_.SerializeObjectHeader(
-          object, MoqtDataStreamType::Subgroup(8, 0, false, false), false),
+          object, MoqtDataStreamType::Subgroup(8, 0, false, false), previous),
       "Object metadata is invalid");
   EXPECT_TRUE(buffer.empty());
-  // object.object_status = MoqtObjectStatus::kNormal;
 }
 
 TEST_F(MoqtFramerSimpleTest, BadDatagramInput) {
@@ -317,7 +340,7 @@
       /*publisher_priority=*/7,
       std::string(kDefaultExtensionBlob),
       /*object_status=*/MoqtObjectStatus::kNormal,
-      /*subgroup_id=*/6,
+      /*subgroup_id=*/std::nullopt,
       /*payload_length=*/3,
   };
   quiche::QuicheBuffer buffer;
@@ -334,7 +357,7 @@
                       object, "foo", kDefaultPublisherPriority),
                   "Object metadata is invalid");
   EXPECT_TRUE(buffer.empty());
-  object.subgroup_id = 6;
+  object.subgroup_id = std::nullopt;
 
   EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectDatagram(
                       object, "foobar", kDefaultPublisherPriority),
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 526d54b..6202316 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -32,6 +32,7 @@
 #include "quiche/quic/moqt/moqt_session_callbacks.h"
 #include "quiche/quic/moqt/moqt_session_interface.h"
 #include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h"
 #include "quiche/quic/moqt/test_tools/moqt_session_peer.h"
 #include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
@@ -497,18 +498,12 @@
     }
     EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kSuccess);
     EXPECT_EQ(object.metadata.location, expected);
-    if (object.metadata.location.object == 1) {
-      EXPECT_EQ(object.metadata.status, MoqtObjectStatus::kEndOfGroup);
-      expected.object = 0;
-      ++expected.group;
-    } else {
-      EXPECT_EQ(object.metadata.status, MoqtObjectStatus::kNormal);
-      EXPECT_EQ(object.payload.AsStringView(), "object");
-      ++expected.object;
-    }
+    EXPECT_EQ(object.metadata.status, MoqtObjectStatus::kNormal);
+    EXPECT_EQ(object.payload.AsStringView(), "object");
+    ++expected.group;
   } while (result == MoqtFetchTask::GetNextObjectResult::kSuccess);
   EXPECT_EQ(result, MoqtFetchTask::GetNextObjectResult::kEof);
-  EXPECT_EQ(expected, Location(99, 1));
+  EXPECT_EQ(expected, Location(100, 0));
 }
 
 TEST_F(MoqtIntegrationTest, PublishNamespaceFailure) {
diff --git a/quiche/quic/moqt/moqt_key_value_pair.h b/quiche/quic/moqt/moqt_key_value_pair.h
index 09dac22..b0fc0f6 100644
--- a/quiche/quic/moqt/moqt_key_value_pair.h
+++ b/quiche/quic/moqt/moqt_key_value_pair.h
@@ -17,6 +17,7 @@
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/moqt/moqt_error.h"
 #include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/common/platform/api/quiche_export.h"
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/quiche_callbacks.h"
@@ -54,40 +55,6 @@
   absl::btree_multimap<uint64_t, std::variant<uint64_t, std::string>> map_;
 };
 
-inline constexpr uint64_t kMaxGroupId = quiche::kVarInt62MaxValue;
-inline constexpr uint64_t kMaxObjectId = quiche::kVarInt62MaxValue;
-// Location as defined in
-// https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html#location-structure
-struct Location {
-  uint64_t group = 0;
-  uint64_t object = 0;
-
-  Location() = default;
-  Location(uint64_t group, uint64_t object) : group(group), object(object) {}
-
-  // Location order as described in
-  // https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html#location-structure
-  auto operator<=>(const Location&) const = default;
-
-  Location Next() const {
-    if (object == kMaxObjectId) {
-      if (group == kMaxObjectId) {
-        return Location(0, 0);
-      }
-      return Location(group + 1, 0);
-    }
-    return Location(group, object + 1);
-  }
-
-  template <typename H>
-  friend H AbslHashValue(H h, const Location& m);
-
-  template <typename Sink>
-  friend void AbslStringify(Sink& sink, const Location& sequence) {
-    absl::Format(&sink, "(%d; %d)", sequence.group, sequence.object);
-  }
-};
-
 enum AuthTokenType : uint64_t {
   kOutOfBand = 0x0,
 
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index 3e75e58..013b1f2 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -164,6 +164,10 @@
                       type.has_extension() ? "_EXTENSION" : "");
 }
 
+std::string MoqtFetchSerializationToString(MoqtFetchSerialization type) {
+  return absl::StrCat("FETCH_SERIALIZATION_", type.value());
+}
+
 std::string MoqtForwardingPreferenceToString(
     MoqtForwardingPreference preference) {
   switch (preference) {
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index fdb35e5..c956c8f 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -8,6 +8,7 @@
 #define QUICHE_QUIC_MOQT_MOQT_MESSAGES_H_
 
 #include <algorithm>
+#include <bit>
 #include <cstddef>
 #include <cstdint>
 #include <initializer_list>
@@ -17,7 +18,6 @@
 #include <variant>
 #include <vector>
 
-#include "absl/strings/str_format.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/core/quic_types.h"
@@ -25,9 +25,11 @@
 #include "quiche/quic/moqt/moqt_error.h"
 #include "quiche/quic/moqt/moqt_key_value_pair.h"
 #include "quiche/quic/moqt/moqt_names.h"
+#include "quiche/quic/moqt/moqt_object.h"
 #include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/common/platform/api/quiche_export.h"
-#include "quiche/common/platform/api/quiche_logging.h"
+#include "quiche/common/quiche_endian.h"
 
 namespace moqt {
 
@@ -99,6 +101,7 @@
   static constexpr uint64_t kFirstObjectId = 0x02;
   static constexpr uint64_t kSubgroupId = 0x04;
 
+  MoqtDataStreamType() : value_(0) {}
   // Factory functions.
   static std::optional<MoqtDataStreamType> FromValue(uint64_t value) {
     MoqtDataStreamType stream_type(value);
@@ -169,11 +172,12 @@
   }
 
   uint64_t value() const { return value_; }
+  MoqtDataStreamType& operator=(const MoqtDataStreamType& other) = default;
   bool operator==(const MoqtDataStreamType& other) const = default;
 
  private:
   explicit MoqtDataStreamType(uint64_t value) : value_(value) {}
-  const uint64_t value_;
+  uint64_t value_;
 };
 
 class QUICHE_EXPORT MoqtDatagramType {
@@ -301,11 +305,6 @@
   auto operator<=>(const SubgroupPriority&) const = default;
 };
 
-template <typename H>
-H AbslHashValue(H h, const Location& m) {
-  return H::combine(std::move(h), m.group, m.object);
-}
-
 // TODO(martinduke): Collapse both Setup messages into SetupParameters.
 struct QUICHE_EXPORT MoqtClientSetup {
   SetupParameters parameters;
@@ -321,14 +320,6 @@
   kDatagram,
 };
 
-enum class QUICHE_EXPORT MoqtObjectStatus : uint64_t {
-  kNormal = 0x0,
-  kObjectDoesNotExist = 0x1,
-  kEndOfGroup = 0x3,
-  kEndOfTrack = 0x4,
-  kInvalidObjectStatus = 0x5,
-};
-
 MoqtObjectStatus IntegerToObjectStatus(uint64_t integer);
 
 // The data contained in every Object message, although the message type
@@ -340,10 +331,114 @@
   MoqtPriority publisher_priority;
   std::string extension_headers;  // Raw, unparsed extension headers.
   MoqtObjectStatus object_status;
-  uint64_t subgroup_id;
+  std::optional<uint64_t> subgroup_id;  // Only for subgroup objects.
   uint64_t payload_length;
 };
 
+class QUICHE_EXPORT MoqtFetchSerialization {
+ public:
+  static constexpr uint64_t kSubgroupIdMask = 0x03;
+  static constexpr uint64_t kSubgroupIdZero = 0x00;
+  static constexpr uint64_t kPriorSubgroupId = 0x01;
+  static constexpr uint64_t kPriorSubgroupIdPlusOne = 0x02;
+  static constexpr uint64_t kHasSubgroupId = 0x03;
+  static constexpr uint64_t kHasObjectId = 0x04;
+  static constexpr uint64_t kHasGroupId = 0x08;
+  static constexpr uint64_t kHasPriority = 0x10;
+  static constexpr uint64_t kHasExtensions = 0x20;
+  static constexpr uint64_t kIsDatagram = 0x40;
+
+  static constexpr uint64_t kMaxFetchSerialization =
+      kIsDatagram | kHasExtensions | kHasPriority | kHasGroupId | kHasObjectId |
+      kSubgroupIdMask;
+
+  static constexpr uint64_t kEndOfNonExistentRange = 0x8c;
+  static constexpr uint64_t kEndOfUnknownRange = 0x10c;
+
+  MoqtFetchSerialization() = default;
+  // Serialization for the first object in a stream.
+  MoqtFetchSerialization(const MoqtObject& object) {
+    if (!object.subgroup_id.has_value()) {
+      value_ |= kIsDatagram;
+    } else {
+      if (*object.subgroup_id == 0) {
+        value_ |= kSubgroupIdZero;
+      } else {
+        value_ |= kHasSubgroupId;
+      }
+    }
+    value_ |= (kHasGroupId | kHasObjectId | kHasPriority);
+    if (!object.extension_headers.empty()) {
+      value_ |= kHasExtensions;
+    }
+  }
+  // Serialization for a subsequent object in a stream.
+  MoqtFetchSerialization(const MoqtObject& object,
+                         const PublishedObjectMetadata& previous_object) {
+    uint64_t value = 0;
+    if (!object.subgroup_id.has_value()) {
+      value |= kIsDatagram;
+    } else if (object.subgroup_id == 0) {
+      value |= kSubgroupIdZero;
+    } else if (!previous_object.subgroup.has_value()) {
+      value |= kHasSubgroupId;  // Can't use previous value.
+    } else if (object.subgroup_id == previous_object.subgroup) {
+      value |= kPriorSubgroupId;
+    } else if (*object.subgroup_id == *previous_object.subgroup + 1) {
+      value |= kPriorSubgroupIdPlusOne;
+    } else {
+      value |= kHasSubgroupId;
+    }
+    if (object.object_id != previous_object.location.object + 1) {
+      value |= kHasObjectId;
+    }
+    if (object.group_id != previous_object.location.group) {
+      value |= kHasGroupId;
+    }
+    if (object.publisher_priority != previous_object.publisher_priority) {
+      value |= kHasPriority;
+    }
+    if (!object.extension_headers.empty()) {
+      value |= kHasExtensions;
+    }
+    value_ = value;
+  }
+  static std::optional<MoqtFetchSerialization> FromValue(uint64_t value) {
+    if (value > kMaxFetchSerialization && value != kEndOfUnknownRange &&
+        value != kEndOfNonExistentRange) {
+      return std::nullopt;
+    }
+    return MoqtFetchSerialization(value);
+  }
+  bool has_subgroup_id() const {
+    return ((value_ & kSubgroupIdMask) == kHasSubgroupId) && !is_datagram();
+  }
+  bool zero_subgroup_id() const {
+    return (value_ & kSubgroupIdMask) == kSubgroupIdZero && !is_datagram();
+  }
+  bool prior_subgroup_id() const {
+    return (value_ & kSubgroupIdMask) == kPriorSubgroupId && !is_datagram();
+  }
+  bool prior_subgroup_id_plus_one() const {
+    return (value_ & kSubgroupIdMask) == kPriorSubgroupIdPlusOne &&
+           !is_datagram();
+  }
+  bool has_object_id() const { return value_ & kHasObjectId; }
+  bool has_group_id() const { return value_ & kHasGroupId; }
+  bool has_priority() const { return value_ & kHasPriority; }
+  bool has_extensions() const { return value_ & kHasExtensions; }
+  bool is_datagram() const { return value_ & kIsDatagram; }
+  bool end_of_non_existent_range() const {
+    return value_ == kEndOfNonExistentRange;
+  }
+  bool end_of_unknown_range() const { return value_ == kEndOfUnknownRange; }
+  uint64_t value() const { return value_; }
+
+ private:
+  MoqtFetchSerialization(uint64_t value) : value_(value) {}
+  uint64_t value_ = 0;
+};
+
 struct QUICHE_EXPORT MoqtRequestError {
   uint64_t request_id;
   RequestErrorCode error_code;
@@ -534,6 +629,7 @@
 
 std::string MoqtMessageTypeToString(MoqtMessageType message_type);
 std::string MoqtDataStreamTypeToString(MoqtDataStreamType type);
+std::string MoqtFetchSerializationToString(MoqtFetchSerialization type);
 std::string MoqtDatagramTypeToString(MoqtDatagramType type);
 
 std::string MoqtForwardingPreferenceToString(
diff --git a/quiche/quic/moqt/moqt_object.h b/quiche/quic/moqt/moqt_object.h
index e708e46..87469d9 100644
--- a/quiche/quic/moqt/moqt_object.h
+++ b/quiche/quic/moqt/moqt_object.h
@@ -7,29 +7,28 @@
 
 #include <cstdint>
 #include <memory>
+#include <optional>
 #include <string>
 
 #include "quiche/quic/core/quic_time.h"
-#include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/common/quiche_mem_slice.h"
 
 namespace moqt {
 
 struct PublishedObjectMetadata {
   Location location;
-  uint64_t subgroup;  // Equal to object_id for datagrams.
+  std::optional<uint64_t> subgroup;  // nullopt for datagrams.
   std::string extensions;
   MoqtObjectStatus status;
   MoqtPriority publisher_priority;
-  MoqtForwardingPreference forwarding_preference;
   quic::QuicTime arrival_time = quic::QuicTime::Zero();
   bool IsMalformed(const PublishedObjectMetadata& other) const {
     // It's OK for arrival_time to be different when checking immutables.
     return (location != other.location || subgroup != other.subgroup ||
             status != other.status ||
-            publisher_priority != other.publisher_priority ||
-            forwarding_preference != other.forwarding_preference);
+            publisher_priority != other.publisher_priority);
   }
 };
 
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.cc b/quiche/quic/moqt/moqt_outgoing_queue.cc
index d03954b..00cbc1c 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue.cc
@@ -14,12 +14,11 @@
 #include "absl/algorithm/container.h"
 #include "absl/status/status.h"
 #include "quiche/quic/moqt/moqt_fetch_task.h"
-#include "quiche/quic/moqt/moqt_key_value_pair.h"
-#include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_object.h"
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/quiche_mem_slice.h"
 
@@ -77,29 +76,20 @@
   Location sequence{current_group_id_, queue_.back().size()};
   bool fin = status == MoqtObjectStatus::kEndOfGroup;
   queue_.back().push_back(CachedObject{
-      PublishedObjectMetadata{
-          sequence, 0, "", status, default_publisher_priority(),
-          MoqtForwardingPreference::kSubgroup, clock_->ApproximateNow()},
+      PublishedObjectMetadata{sequence, 0, "", status,
+                              default_publisher_priority(),
+                              clock_->ApproximateNow()},
       std::make_shared<quiche::QuicheMemSlice>(std::move(payload)), fin});
   for (MoqtObjectListener* listener : listeners_) {
     listener->OnNewObjectAvailable(sequence, /*subgroup=*/0,
-                                   default_publisher_priority(),
-                                   MoqtForwardingPreference::kSubgroup);
+                                   default_publisher_priority());
   }
 }
 
 std::optional<PublishedObject> MoqtOutgoingQueue::GetCachedObject(
-    uint64_t group, uint64_t subgroup, uint64_t object) const {
-  QUICHE_DCHECK_EQ(subgroup, 0u);
+    uint64_t group, std::optional<uint64_t> subgroup, uint64_t object) const {
+  QUICHE_DCHECK(subgroup.has_value() && subgroup == 0u);
   if (group < first_group_in_queue()) {
-    if (object == 0) {
-      return PublishedObject{
-          PublishedObjectMetadata{
-              Location(group, object), /*subgroup=*/0, "",
-              MoqtObjectStatus::kEndOfGroup, default_publisher_priority(),
-              MoqtForwardingPreference::kSubgroup, clock_->ApproximateNow()},
-          quiche::QuicheMemSlice{}};
-    }
     return std::nullopt;
   }
   if (group > current_group_id_) {
@@ -211,45 +201,20 @@
 
 MoqtFetchTask::GetNextObjectResult MoqtOutgoingQueue::FetchTask::GetNextObject(
     PublishedObject& object) {
-  MoqtFetchTask::GetNextObjectResult result;
-  do {
-    result = GetNextObjectInner(object);
-    // The specification for FETCH requires that all missing objects are simply
-    // skipped.
-  } while (result == MoqtFetchTask::GetNextObjectResult::kSuccess &&
-           object.metadata.status == MoqtObjectStatus::kObjectDoesNotExist);
-  return result;
-}
-
-MoqtFetchTask::GetNextObjectResult
-MoqtOutgoingQueue::FetchTask::GetNextObjectInner(PublishedObject& object) {
   if (!status_.ok()) {
     return kError;
   }
-  if (objects_.empty()) {
-    return kEof;
+  while (!objects_.empty()) {
+    std::optional<PublishedObject> new_object = queue_->GetCachedObject(
+        objects_.front().group, 0, objects_.front().object);
+    objects_.pop_front();
+    if (new_object.has_value() &&
+        new_object->metadata.status == MoqtObjectStatus::kNormal) {
+      object = *std::move(new_object);
+      return kSuccess;
+    }
   }
-
-  std::optional<PublishedObject> result = queue_->GetCachedObject(
-      objects_.front().group, 0, objects_.front().object);
-  if (!result.has_value()) {
-    // Create a synthetic object of status kEndOfGroup (if the object ID is
-    // zero) or kObjectDoesNotExist, which will result in the Fetch response
-    // skipping it.
-    object.metadata.location = objects_.front();
-    object.metadata.subgroup = 0;
-    object.metadata.publisher_priority = queue_->default_publisher_priority();
-    object.metadata.status = object.metadata.location.object == 0
-                                 ? MoqtObjectStatus::kEndOfGroup
-                                 : MoqtObjectStatus::kObjectDoesNotExist;
-    object.metadata.arrival_time = queue_->clock_->ApproximateNow();
-    object.payload = quiche::QuicheMemSlice();
-    object.fin_after_this = false;
-  } else {
-    object = *std::move(result);
-  }
-  objects_.pop_front();
-  return kSuccess;
+  return kEof;
 }
 
 void MoqtOutgoingQueue::Close() {
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h
index 7dbc7ee..d1dca0f 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.h
+++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -27,6 +27,7 @@
 #include "quiche/quic/moqt/moqt_object.h"
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/moqt_publisher.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/common/quiche_circular_deque.h"
 #include "quiche/common/quiche_mem_slice.h"
 
@@ -57,7 +58,8 @@
   // MoqtTrackPublisher implementation.
   const FullTrackName& GetTrackName() const override { return track_; }
   std::optional<PublishedObject> GetCachedObject(
-      uint64_t group, uint64_t subgroup, uint64_t min_object) const override;
+      uint64_t group, std::optional<uint64_t> subgroup,
+      uint64_t min_object) const override;
   void AddObjectListener(MoqtObjectListener* listener) override {
     listeners_.insert(listener);
     listener->OnSubscribeAccepted();
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
index 333961c..dd79dcb 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -24,6 +24,7 @@
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h"
 #include "quiche/common/platform/api/quiche_expect_bug.h"
 #include "quiche/common/platform/api/quiche_test.h"
@@ -49,11 +50,10 @@
     AddObjectListener(this);
   }
 
-  void OnNewObjectAvailable(
-      Location sequence, uint64_t subgroup, MoqtPriority publisher_priority,
-      MoqtForwardingPreference forwarding_preference) override {
+  void OnNewObjectAvailable(Location sequence, std::optional<uint64_t> subgroup,
+                            MoqtPriority publisher_priority) override {
     // MoqtOutgoingQueue does not create datagrams.
-    ASSERT_EQ(forwarding_preference, MoqtForwardingPreference::kSubgroup);
+    ASSERT_THAT(subgroup, testing::Optional(0));
     std::optional<PublishedObject> object =
         GetCachedObject(sequence.group, subgroup, sequence.object);
     ASSERT_THAT(object,
@@ -79,8 +79,7 @@
         GetCachedObjectsInRange(Location(0, 0), *largest_location());
     for (Location object : objects) {
       if (window.InWindow(object)) {
-        OnNewObjectAvailable(object, 0, default_publisher_priority(),
-                             MoqtForwardingPreference::kSubgroup);
+        OnNewObjectAvailable(object, 0, default_publisher_priority());
       }
     }
   }
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index e1c663b..3a874e1 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -30,6 +30,7 @@
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_names.h"
 #include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/quiche_data_reader.h"
@@ -1146,7 +1147,7 @@
   } else {
     object_metadata.object_id = 0;
   }
-  object_metadata.subgroup_id = object_metadata.object_id;
+  object_metadata.subgroup_id = std::nullopt;
   use_default_priority = datagram_type->has_default_priority();
   if (!use_default_priority &&
       !reader.ReadUInt8(&object_metadata.publisher_priority)) {
@@ -1214,75 +1215,124 @@
   return absl::bit_cast<uint8_t>(buffer[0]);
 }
 
-void MoqtDataParser::AdvanceParserState() {
-  if (next_input_ != kStreamType && !type_.has_value()) {
-    QUICHE_BUG(quic_bug_advance_parser_state_no_type)
-        << "Advancing parser state without a stream type";
-    return;
+MoqtDataParser::NextInput MoqtDataParser::AdvanceParserState() {
+  if (type_.IsFetch()) {
+    switch (next_input_) {
+      case kStreamType:
+        return kRequestId;
+      case kRequestId:
+        return kSerializationFlags;
+      case kSerializationFlags:
+        if (fetch_serialization_.has_group_id()) {
+          return kGroupId;
+        }
+        [[fallthrough]];
+      case kGroupId:
+        if (fetch_serialization_.is_datagram()) {
+          metadata_.subgroup_id = std::nullopt;
+        } else {
+          if (fetch_serialization_.has_subgroup_id()) {
+            return kSubgroupId;
+          }
+          if (fetch_serialization_.prior_subgroup_id_plus_one()) {
+            if (!metadata_.subgroup_id.has_value()) {
+              ParseError("reference to subgroup ID of prior datagram");
+              return kFailed;
+            }
+            ++(*metadata_.subgroup_id);
+          } else if (fetch_serialization_.zero_subgroup_id()) {
+            metadata_.subgroup_id = 0;
+          } else if (!metadata_.subgroup_id.has_value()) {
+            QUICHE_DCHECK(fetch_serialization_.prior_subgroup_id());
+            ParseError("reference to subgroup ID of prior datagram");
+            return kFailed;
+          }
+        }
+        [[fallthrough]];
+      case kSubgroupId:
+        if (fetch_serialization_.has_object_id()) {
+          return kObjectId;
+        }
+        ++metadata_.object_id;
+        [[fallthrough]];
+      case kObjectId:
+        if (fetch_serialization_.end_of_non_existent_range() ||
+            fetch_serialization_.end_of_unknown_range()) {
+          return kSerializationFlags;
+        }
+        if (fetch_serialization_.has_priority()) {
+          return kPublisherPriority;
+        }
+        [[fallthrough]];
+      case kPublisherPriority:
+        if (fetch_serialization_.has_extensions()) {
+          return kExtensionSize;
+        }
+        metadata_.extension_headers = "";
+        return kObjectPayloadLength;
+      case kExtensionBody:
+        return kObjectPayloadLength;
+      case kData:
+        return kSerializationFlags;
+      case kTrackAlias:
+      case kObjectPayloadLength:
+      case kAwaitingNextByte:
+      case kStatus:
+      case kFailed:
+      case kExtensionSize:
+      case kPadding:
+        QUICHE_NOTREACHED();
+        return next_input_;
+    }
   }
   switch (next_input_) {
     // The state table is factored into a separate function (rather than
     // inlined) in order to separate the order of elements from the way they are
     // parsed.
     case kStreamType:
-      next_input_ = kTrackAlias;
-      break;
+      return kTrackAlias;
     case kTrackAlias:
-      next_input_ = kGroupId;
-      break;
+      return kGroupId;
     case kGroupId:
-      QUICHE_CHECK(type_.has_value());
-      if (type_->IsFetch() || type_->IsSubgroupPresent()) {
-        next_input_ = kSubgroupId;
-        break;
+      if (type_.IsSubgroupPresent()) {
+        return kSubgroupId;
       }
-      if (type_->SubgroupIsZero()) {
+      if (type_.SubgroupIsZero()) {
         metadata_.subgroup_id = 0;
       }
-      next_input_ =
-          type_->HasDefaultPriority() ? kObjectId : kPublisherPriority;
-      break;
+      [[fallthrough]];
     case kSubgroupId:
-      QUICHE_CHECK(type_.has_value());
-      next_input_ = (type_->IsFetch() || type_->HasDefaultPriority())
-                        ? kObjectId
-                        : kPublisherPriority;
-      break;
-    case kPublisherPriority:
-      QUICHE_CHECK(type_.has_value());
-      next_input_ = type_->IsFetch() ? kExtensionSize : kObjectId;
-      break;
-    case kObjectId:
-      QUICHE_CHECK(type_.has_value());
-      if (type_->HasDefaultPriority()) {
-        metadata_.publisher_priority = default_publisher_priority_;
+      if (!type_.HasDefaultPriority()) {
+        return kPublisherPriority;
       }
-      if (num_objects_read_ == 0 && type_->SubgroupIsFirstObjectId()) {
+      metadata_.publisher_priority = default_publisher_priority_;
+      [[fallthrough]];
+    case kPublisherPriority:
+      return kObjectId;
+    case kObjectId:
+      if (num_objects_read_ == 0 && type_.SubgroupIsFirstObjectId()) {
         metadata_.subgroup_id = metadata_.object_id;
       }
-      if (type_->IsFetch()) {
-        next_input_ = kPublisherPriority;
-      } else if (type_->AreExtensionHeadersPresent()) {
-        next_input_ = kExtensionSize;
-      } else {
-        next_input_ = kObjectPayloadLength;
+      if (type_.AreExtensionHeadersPresent()) {
+        return kExtensionSize;
       }
-      break;
+      [[fallthrough]];
     case kExtensionBody:
-      next_input_ = kObjectPayloadLength;
-      break;
+      return kObjectPayloadLength;
     case kStatus:
     case kData:
     case kAwaitingNextByte:
-      next_input_ = type_->IsFetch() ? kGroupId : kObjectId;
-      break;
-    case kExtensionSize:        // Either kExtensionBody or
-                                // kObjectPayloadLength.
-    case kObjectPayloadLength:  // Either kStatus or kData depending on length.
-    case kPadding:              // Handled separately.
-    case kFailed:               // Should cause parsing to cease.
+      return kObjectId;
+    case kRequestId:
+    case kSerializationFlags:
+    case kExtensionSize:
+    case kObjectPayloadLength:
+    case kPadding:
+    case kFailed:
+      // Other transitions are either Fetch-only or handled in
+      // ParseNextItemFromStream.
       QUICHE_NOTREACHED();
-      break;
+      return next_input_;
   }
 }
 
@@ -1302,23 +1352,48 @@
         ParseError("Invalid stream type supplied");
         return;
       }
-      type_.emplace(std::move(*type));
-      if (type_->IsPadding()) {
+      type_ = *type;
+      if (type_.IsPadding()) {
         next_input_ = kPadding;
         return;
       }
-      if (type_->EndOfGroupInStream()) {
+      if (type_.EndOfGroupInStream()) {
         contains_end_of_group_ = true;
       }
-      AdvanceParserState();
+      next_input_ = AdvanceParserState();
       return;
     }
 
+    case kRequestId:
     case kTrackAlias: {
       std::optional<uint64_t> value_read = ReadVarInt62NoFin();
       if (value_read.has_value()) {
         metadata_.track_alias = *value_read;
-        AdvanceParserState();
+        next_input_ = AdvanceParserState();
+      }
+      return;
+    }
+
+    case kSerializationFlags: {
+      std::optional<uint64_t> value_read = ReadVarInt62NoFin();
+      if (value_read.has_value()) {
+        std::optional<MoqtFetchSerialization> serialization =
+            MoqtFetchSerialization::FromValue(*value_read);
+        if (!serialization.has_value()) {
+          ParseError("Invalid serialization flags");
+          return;
+        }
+        if (num_objects_read_ == 0 &&
+            (serialization->prior_subgroup_id() ||
+             serialization->prior_subgroup_id_plus_one() ||
+             !serialization->has_object_id() ||
+             !serialization->has_group_id() ||
+             !serialization->has_priority())) {
+          ParseError("Invalid serialization flags for first object");
+          return;
+        }
+        fetch_serialization_ = *serialization;
+        next_input_ = AdvanceParserState();
       }
       return;
     }
@@ -1326,8 +1401,14 @@
     case kGroupId: {
       std::optional<uint64_t> value_read = ReadVarInt62NoFin();
       if (value_read.has_value()) {
-        metadata_.group_id = *value_read;
-        AdvanceParserState();
+        if (type_.IsFetch() ||
+            !fetch_serialization_.end_of_non_existent_range() ||
+            !fetch_serialization_.end_of_unknown_range()) {
+          // Do not record range indicator group IDs because it will corrupt
+          // references to the previous object.
+          metadata_.group_id = *value_read;
+        }
+        next_input_ = AdvanceParserState();
       }
       return;
     }
@@ -1336,7 +1417,7 @@
       std::optional<uint64_t> value_read = ReadVarInt62NoFin();
       if (value_read.has_value()) {
         metadata_.subgroup_id = *value_read;
-        AdvanceParserState();
+        next_input_ = AdvanceParserState();
       }
       return;
     }
@@ -1345,7 +1426,7 @@
       std::optional<uint8_t> value_read = ReadUint8NoFin();
       if (value_read.has_value()) {
         metadata_.publisher_priority = *value_read;
-        AdvanceParserState();
+        next_input_ = AdvanceParserState();
       }
       return;
     }
@@ -1353,15 +1434,22 @@
     case kObjectId: {
       std::optional<uint64_t> value_read = ReadVarInt62NoFin();
       if (value_read.has_value()) {
-        if (type_.has_value() && type_->IsSubgroup() &&
-            last_object_id_.has_value()) {
-          metadata_.object_id = *value_read + *last_object_id_ + 1;
-        } else {
-          metadata_.object_id = *value_read;
+        if (type_.IsFetch() ||
+            !fetch_serialization_.end_of_non_existent_range() ||
+            !fetch_serialization_.end_of_unknown_range()) {
+          // Do not record range indicator object IDs because it will corrupt
+          // references to the previous object.
+          if (type_.IsSubgroup() && last_object_id_.has_value()) {
+            metadata_.object_id = *value_read + *last_object_id_ + 1;
+          } else {
+            metadata_.object_id = *value_read;
+          }
         }
         last_object_id_ = metadata_.object_id;
-        AdvanceParserState();
+        next_input_ = AdvanceParserState();
       }
+      // TODO(martinduke): Report something if the fetch serialization is an end
+      // of range indicator.
       return;
     }
 
@@ -1409,7 +1497,7 @@
         // stream was supposed to conclude with kEndOfGroup and end it with the
         // encoded status instead.
         visitor_.OnObjectMessage(metadata_, "", /*end_of_message=*/true);
-        AdvanceParserState();
+        next_input_ = AdvanceParserState();
       }
       if (fin_read) {
         visitor_.OnFin();
@@ -1454,7 +1542,7 @@
               visitor_.OnFin();
             }
             ++num_objects_read_;
-            AdvanceParserState();
+            next_input_ = AdvanceParserState();
           }
           if (stream_.SkipBytes(chunk_size) && !no_more_data_) {
             // Although there was no FIN, SkipBytes() can return true if the
@@ -1474,7 +1562,7 @@
             return;
           }
           if (done) {
-            AdvanceParserState();
+            next_input_ = AdvanceParserState();
           }
         }
       }
@@ -1500,12 +1588,11 @@
 }
 
 void MoqtDataParser::ReadStreamType() {
-  return ReadDataUntil([this]() { return type_.has_value(); });
+  return ReadDataUntil([this]() { return next_input_ != kStreamType; });
 }
 
 void MoqtDataParser::ReadTrackAlias() {
-  return ReadDataUntil(
-      [this]() { return type_.has_value() && next_input_ != kTrackAlias; });
+  return ReadDataUntil([this]() { return next_input_ > kTrackAlias; });
 }
 
 void MoqtDataParser::ReadAtMostOneObject() {
@@ -1519,17 +1606,17 @@
     if (next_input_ == kAwaitingNextByte) {
       // Data arrived; the last object was not EndOfGroup.
       visitor_.OnObjectMessage(metadata_, "", /*end_of_message=*/true);
-      AdvanceParserState();
+      next_input_ = AdvanceParserState();
       ++num_objects_read_;
     }
     return false;
   }
   no_more_data_ = true;
-  const bool valid_state = type_.has_value() &&
-                           payload_length_remaining_ == 0 &&
-                           ((type_->IsSubgroup() && next_input_ == kObjectId) ||
-                            (type_->IsFetch() && next_input_ == kGroupId));
-  if (!valid_state || num_objects_read_ == 0) {
+  const bool valid_state =
+      payload_length_remaining_ == 0 &&
+      ((type_.IsSubgroup() && next_input_ == kObjectId) ||
+       (type_.IsFetch() && next_input_ == kSerializationFlags));
+  if (!valid_state) {
     ParseError("FIN received at an unexpected point in the stream");
     return true;
   }
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index d0585ea..49248a7 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -218,11 +218,17 @@
   void ReadAtMostOneObject();
 
   // Returns the type of the unidirectional stream, if already known.
-  std::optional<MoqtDataStreamType> stream_type() const { return type_; }
+  std::optional<MoqtDataStreamType> stream_type() const {
+    if (next_input_ == kStreamType) {
+      return std::nullopt;
+    }
+    return type_;
+  }
 
   // Returns the track alias, if already known.
   std::optional<uint64_t> track_alias() const {
-    return (next_input_ == kStreamType || next_input_ == kTrackAlias)
+    return (next_input_ == kStreamType || next_input_ == kTrackAlias ||
+            next_input_ == kRequestId)
                ? std::optional<uint64_t>()
                : metadata_.track_alias;
   }
@@ -237,7 +243,9 @@
   // Current state of the parser.
   enum NextInput {
     kStreamType,
-    kTrackAlias,
+    kTrackAlias,          // SUBSCRIBE/PUBLISH only.
+    kRequestId,           // FETCH only.
+    kSerializationFlags,  // FETCH only.
     kGroupId,
     kSubgroupId,
     kPublisherPriority,
@@ -273,7 +281,7 @@
   std::optional<uint8_t> ReadUint8NoFin();
 
   // Advances the state machine of the parser to the next expected state.
-  void AdvanceParserState();
+  [[nodiscard]] NextInput AdvanceParserState();
   // Reads the next available item from the stream.
   void ParseNextItemFromStream();
   // Checks if we have encountered a FIN without data.  If so, processes it and
@@ -293,7 +301,8 @@
 
   std::string buffered_message_;
 
-  std::optional<MoqtDataStreamType> type_ = std::nullopt;
+  MoqtDataStreamType type_;
+  MoqtFetchSerialization fetch_serialization_;
   NextInput next_input_ = kStreamType;
   MoqtObject metadata_;
   std::optional<uint64_t> last_object_id_;
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index bf0aa9d..659006c 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -21,6 +21,7 @@
 #include "quiche/quic/moqt/moqt_error.h"
 #include "quiche/quic/moqt/moqt_key_value_pair.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h"
 #include "quiche/quic/moqt/test_tools/moqt_test_message.h"
 #include "quiche/quic/platform/api/quic_test.h"
@@ -1512,4 +1513,101 @@
   EXPECT_TRUE(visitor_.fin_received_);
 }
 
+TEST_F(MoqtDataParserStateMachineTest, ReadTypeThenObjectsFetch) {
+  for (MoqtFetchSerialization serialization : AllMoqtFetchSerializations()) {
+    SCOPED_TRACE(testing::Message() << "flags: " << serialization.value());
+    MoqtParserTestVisitor visitor;
+    webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+    MoqtDataParser parser(&stream, &visitor);
+    StreamHeaderFetchMessage header;
+    StreamMiddlerFetchMessage middler(serialization);
+    stream.Receive(header.PacketSample());
+    stream.Receive(middler.PacketSample(), /*fin=*/true);
+    parser.ReadStreamType();
+    ASSERT_EQ(visitor.messages_received_, 0);
+    parser.ReadAtMostOneObject();
+    ASSERT_EQ(visitor.messages_received_, 1);
+    EXPECT_TRUE(header.EqualFieldValues(visitor.last_message_.value()));
+    EXPECT_EQ(visitor.object_payloads_[0], "foo");
+    parser.ReadAtMostOneObject();
+    ASSERT_EQ(visitor.messages_received_, 2);
+    EXPECT_TRUE(middler.EqualFieldValues(visitor.last_message_.value()));
+    EXPECT_EQ(visitor.object_payloads_[1], "bar");
+    EXPECT_EQ(visitor.parsing_error_, std::nullopt);
+    EXPECT_TRUE(visitor.fin_received_);
+  }
+}
+
+TEST_F(MoqtDataParserStateMachineTest, StreamHeaderFetchRefersToPrior) {
+  char data[] = {0x05, 0x01, 0x00};
+  // Iterate through the 5 serializations that refer to the prior object.
+  for (char value : {0x0f, 0x17, 0x1b, 0x1d, 0x1e}) {
+    data[2] = value;
+    MoqtParserTestVisitor visitor;
+    webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+    MoqtDataParser parser(&stream, &visitor);
+    stream.Receive(absl::string_view(data, sizeof(data)));
+    parser.ReadStreamType();
+    parser.ReadAtMostOneObject();
+    EXPECT_EQ(visitor.parsing_error_,
+              "Invalid serialization flags for first object");
+    EXPECT_EQ(visitor.parsing_error_code_, MoqtError::kProtocolViolation);
+  }
+}
+
+TEST_F(MoqtDataParserStateMachineTest, DatagramThenPriorSubgroupId) {
+  char data[] = {0x05, 0x01, 0x40, 0x5c, 0x05, 0x01,  // datagram (5, 1)
+                 0x80, 0x03, 0x61, 0x61, 0x61,        // priority, payload
+                 0xff};  // serialization flag to be overwritten
+  // Iterate through the 2 serializations that refer to the prior subgroup.
+  for (char value : {0x01, 0x02}) {
+    data[11] = value;
+    MoqtParserTestVisitor visitor;
+    webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+    MoqtDataParser parser(&stream, &visitor);
+    stream.Receive(absl::string_view(data, sizeof(data)));
+    parser.ReadStreamType();
+    parser.ReadAtMostOneObject();
+    parser.ReadAtMostOneObject();
+    EXPECT_EQ(visitor.parsing_error_,
+              "reference to subgroup ID of prior datagram");
+    EXPECT_EQ(visitor.parsing_error_code_, MoqtError::kProtocolViolation);
+  }
+}
+
+TEST_F(MoqtDataParserStateMachineTest, InvalidNonexistentRange) {
+  char data[] = {0x05, 0x01, 0x40, 0x80};
+  stream_.Receive(absl::string_view(data, sizeof(data)));
+  parser_.ReadStreamType();
+  parser_.ReadAtMostOneObject();
+  EXPECT_EQ(visitor_.parsing_error_, "Invalid serialization flags");
+  EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
+}
+
+TEST_F(MoqtDataParserStateMachineTest, InvalidNonexistentRangeUnknownRange) {
+  char data[] = {0x05, 0x01, 0x41, 0x8c};
+  stream_.Receive(absl::string_view(data, sizeof(data)));
+  parser_.ReadStreamType();
+  parser_.ReadAtMostOneObject();
+  EXPECT_EQ(visitor_.parsing_error_, "Invalid serialization flags");
+  EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
+}
+
+TEST_F(MoqtDataParserStateMachineTest, IgnoresEndRangeIndicators) {
+  // Header, Range Indicator, Middler
+  stream_.Receive(StreamHeaderFetchMessage().PacketSample());
+  char data[] = {0x40, 0x8c, 0x05, 0x07,   // non-existent range
+                 0x41, 0x0c, 0x05, 0x09};  // unknown range
+  stream_.Receive(absl::string_view(data, sizeof(data)));
+  std::optional<MoqtFetchSerialization> serialization =
+      MoqtFetchSerialization::FromValue(0x40);  // Datagram + explicit object ID
+  ASSERT_TRUE(serialization.has_value());
+  StreamMiddlerFetchMessage middler(*serialization);
+  stream_.Receive(middler.PacketSample(), /*fin=*/true);
+  parser_.ReadAllData();
+  EXPECT_EQ(visitor_.messages_received_, 2);
+  // TODO(martinduke): Once Issue #1506 is resolved, check that the values
+  // are reported correctly.
+}
+
 }  // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h
index 66f3d7a..25d75ea 100644
--- a/quiche/quic/moqt/moqt_publisher.h
+++ b/quiche/quic/moqt/moqt_publisher.h
@@ -14,10 +14,10 @@
 #include "quiche/quic/moqt/moqt_error.h"
 #include "quiche/quic/moqt/moqt_fetch_task.h"
 #include "quiche/quic/moqt/moqt_key_value_pair.h"
-#include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_names.h"
 #include "quiche/quic/moqt/moqt_object.h"
 #include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
@@ -38,10 +38,10 @@
 
   // Notifies that a new object is available on the track.  The object payload
   // itself may be retrieved via GetCachedObject method of the associated track
-  // publisher.
-  virtual void OnNewObjectAvailable(
-      Location sequence, uint64_t subgroup, MoqtPriority publisher_priority,
-      MoqtForwardingPreference forwarding_preference) = 0;
+  // publisher. If |subgroup| is nullopt, the object is a datagram.
+  virtual void OnNewObjectAvailable(Location sequence,
+                                    std::optional<uint64_t> subgroup,
+                                    MoqtPriority publisher_priority) = 0;
   // Notifies that a pure FIN has arrived following |sequence|. Should not be
   // called unless all objects have already been delivered. If not delivered,
   // instead set the fin_after_this flag in the PublishedObject.
@@ -82,13 +82,11 @@
   // whenever they are sent.  Once an object is not available via the cache, it
   // can no longer be sent; this ensures that objects are not buffered forever.
   //
-  // This method returns nullopt if the object is not currently available, but
-  // might become available in the future.  If the object is gone forever,
-  // kEndOfGroup/kObjectDoesNotExist has to be returned instead;
-  // otherwise, the corresponding QUIC streams will be stuck waiting for objects
-  // that will never arrive.
+  // This method returns nullopt if the object is not currently available.
+  // If |subgroup| is nullopt, the object is a datagram.
   virtual std::optional<PublishedObject> GetCachedObject(
-      uint64_t group, uint64_t subgroup, uint64_t min_object) const = 0;
+      uint64_t group, std::optional<uint64_t> subgroup,
+      uint64_t min_object) const = 0;
 
   // Registers a listener with the track.  The listener will be notified of all
   // newly arriving objects. The pointer to the listener must be valid until
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.cc b/quiche/quic/moqt/moqt_relay_track_publisher.cc
index 1b44c8d..6d76d65 100644
--- a/quiche/quic/moqt/moqt_relay_track_publisher.cc
+++ b/quiche/quic/moqt/moqt_relay_track_publisher.cc
@@ -20,6 +20,7 @@
 #include "quiche/quic/moqt/moqt_object.h"
 #include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_session_interface.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/quiche_callbacks.h"
@@ -124,57 +125,72 @@
       return;
     }
   }
-  auto subgroup_it = group.subgroups.try_emplace(metadata.subgroup);
-  auto& subgroup = subgroup_it.first->second;
-  if (!subgroup.empty()) {  // Check if the new object is valid
-    CachedObject& last_object = subgroup.rbegin()->second;
-    if (last_object.metadata.publisher_priority !=
-        metadata.publisher_priority) {
-      QUICHE_DLOG(INFO) << "Publisher priority changing in a subgroup";
-      OnMalformedTrack(full_track_name);
-      return;
+  CachedObject* duplicate_object = nullptr;
+  if (!metadata.subgroup.has_value()) {  // It's a datagram.
+    std::shared_ptr<quiche::QuicheMemSlice> slice;
+    if (!object.empty()) {
+      slice = std::make_shared<quiche::QuicheMemSlice>(
+          quiche::QuicheMemSlice::Copy(object));
     }
-    if (last_object.fin_after_this) {
-      QUICHE_DLOG(INFO) << "Skipping object because it is after the end of the "
-                        << "subgroup";
-      OnMalformedTrack(full_track_name);
-      return;
+    auto [it, inserted] = group.datagrams.try_emplace(
+        metadata.location.object, CachedObject{metadata, slice, false});
+    if (!inserted) {
+      duplicate_object = &it->second;
     }
-    // If last_object has stream-ending status, it should have been caught by
-    // the fin_after_this check above.
-    QUICHE_DCHECK(
-        last_object.metadata.status != MoqtObjectStatus::kEndOfGroup &&
-        last_object.metadata.status != MoqtObjectStatus::kEndOfTrack);
-    if (last_object.metadata.location.object > metadata.location.object) {
-      QUICHE_DLOG(INFO) << "Skipping object because it decreases the "
-                        << "object ID in the subgroup.";
-      return;
+  } else {
+    auto subgroup_it = group.subgroups.try_emplace(*metadata.subgroup);
+    auto& subgroup = subgroup_it.first->second;
+    if (!subgroup.empty()) {  // Check if the new object is valid
+      CachedObject& last_object = subgroup.rbegin()->second;
+      if (last_object.metadata.publisher_priority !=
+          metadata.publisher_priority) {
+        QUICHE_DLOG(INFO) << "Publisher priority changing in a subgroup";
+        OnMalformedTrack(full_track_name);
+        return;
+      }
+      if (last_object.fin_after_this) {
+        QUICHE_DLOG(INFO) << "Skipping object because it is after the end of "
+                          << "the subgroup";
+        OnMalformedTrack(full_track_name);
+        return;
+      }
+      // If last_object has stream-ending status, it should have been caught by
+      // the fin_after_this check above.
+      QUICHE_DCHECK(
+          last_object.metadata.status != MoqtObjectStatus::kEndOfGroup &&
+          last_object.metadata.status != MoqtObjectStatus::kEndOfTrack);
+      if (last_object.metadata.location.object > metadata.location.object) {
+        QUICHE_DLOG(INFO) << "Skipping object because it decreases the "
+                          << "object ID in the subgroup.";
+        return;
+      }
+    }
+    if (metadata.status == MoqtObjectStatus::kEndOfGroup ||
+        metadata.status == MoqtObjectStatus::kEndOfTrack) {
+      // Anticipate stream FIN.
+      last_object_in_stream = true;
+    }
+    std::shared_ptr<quiche::QuicheMemSlice> slice;
+    if (!object.empty()) {
+      slice = std::make_shared<quiche::QuicheMemSlice>(
+          quiche::QuicheMemSlice::Copy(object));
+    }
+    auto [it, inserted] = subgroup.try_emplace(
+        metadata.location.object,
+        CachedObject{metadata, slice, last_object_in_stream});
+    if (!inserted) {
+      duplicate_object = &it->second;
     }
   }
-  if (metadata.status == MoqtObjectStatus::kEndOfGroup ||
-      metadata.status == MoqtObjectStatus::kEndOfTrack) {
-    // Anticipate stream FIN.
-    last_object_in_stream = true;
-  }
-  std::shared_ptr<quiche::QuicheMemSlice> slice;
-  if (!object.empty()) {
-    slice = std::make_shared<quiche::QuicheMemSlice>(
-        quiche::QuicheMemSlice::Copy(object));
-  }
-  auto [it, inserted] = subgroup.try_emplace(
-      metadata.location.object,
-      CachedObject{metadata, slice, last_object_in_stream});
-  if (!inserted) {
-    // It's a duplicate object.
-    CachedObject& old_object = it->second;
-    if (metadata.IsMalformed(old_object.metadata)) {
+  if (duplicate_object != nullptr) {
+    if (metadata.IsMalformed(duplicate_object->metadata)) {
       // Something besides the arrival time and extension headers changed.
       OnMalformedTrack(full_track_name);
       return;
     }
     // TODO(b/467718801): Fix this when the class supports partial object
     // delivery. When objects are complete, we can simply compare payloads.
-    if (old_object.payload->AsStringView() != object) {
+    if (duplicate_object->payload->AsStringView() != object) {
       OnMalformedTrack(full_track_name);
     }
     // No need to update state.
@@ -199,10 +215,9 @@
   }
   for (MoqtObjectListener* listener : listeners_) {
     listener->OnNewObjectAvailable(metadata.location, metadata.subgroup,
-                                   metadata.publisher_priority,
-                                   metadata.forwarding_preference);
+                                   metadata.publisher_priority);
     if (last_object_in_stream) {
-      listener->OnNewFinAvailable(metadata.location, metadata.subgroup);
+      listener->OnNewFinAvailable(metadata.location, *(metadata.subgroup));
     }
   }
 }
@@ -270,14 +285,23 @@
 }
 
 std::optional<PublishedObject> MoqtRelayTrackPublisher::GetCachedObject(
-    uint64_t group_id, uint64_t subgroup_id, uint64_t min_object_id) const {
+    uint64_t group_id, std::optional<uint64_t> subgroup_id,
+    uint64_t min_object_id) const {
   auto group_it = queue_.find(group_id);
   if (group_it == queue_.end()) {
     // Group does not exist.
     return std::nullopt;
   }
   const Group& group = group_it->second;
-  auto subgroup_it = group.subgroups.find(subgroup_id);
+  if (!subgroup_id.has_value()) {
+    auto object_it = group.datagrams.lower_bound(min_object_id);
+    if (object_it == group.datagrams.end()) {
+      // No object after the last one received.
+      return std::nullopt;
+    }
+    return CachedObjectToPublishedObject(object_it->second);
+  }
+  auto subgroup_it = group.subgroups.find(*subgroup_id);
   if (subgroup_it == group.subgroups.end()) {
     // Subgroup does not exist.
     return std::nullopt;
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher.h b/quiche/quic/moqt/moqt_relay_track_publisher.h
index 57738c0..e10e5ab 100644
--- a/quiche/quic/moqt/moqt_relay_track_publisher.h
+++ b/quiche/quic/moqt/moqt_relay_track_publisher.h
@@ -28,6 +28,7 @@
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_session_interface.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/common/quiche_callbacks.h"
 #include "quiche/common/quiche_weak_ptr.h"
 
@@ -85,7 +86,7 @@
   // MoqtTrackPublisher implementation.
   const FullTrackName& GetTrackName() const override { return track_; }
   std::optional<PublishedObject> GetCachedObject(
-      uint64_t group_id, uint64_t subgroup_id,
+      uint64_t group_id, std::optional<uint64_t> subgroup_id,
       uint64_t min_object) const override;
   void AddObjectListener(MoqtObjectListener* listener) override;
   void RemoveObjectListener(MoqtObjectListener* listener) override;
@@ -127,6 +128,7 @@
     uint64_t next_object = 0;
     bool complete = false;  // If true, kEndOfGroup has been received.
     absl::btree_map<uint64_t, Subgroup> subgroups;  // Ordered by subgroup id.
+    absl::btree_map<uint64_t, CachedObject> datagrams;
   };
 
   bool is_closing_ = false;
diff --git a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
index fc1cb5a..8861e7a 100644
--- a/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
+++ b/quiche/quic/moqt/moqt_relay_track_publisher_test.cc
@@ -18,6 +18,7 @@
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_session_interface.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/quic/moqt/test_tools/mock_moqt_session.h"
 #include "quiche/common/platform/api/quiche_test.h"
 #include "quiche/web_transport/web_transport.h"
@@ -26,6 +27,8 @@
 
 namespace {
 
+using ::testing::Optional;
+
 const FullTrackName kTrackName = {"test", "track"};
 
 class MockMoqtObjectListener : public MoqtObjectListener {
@@ -34,9 +37,8 @@
   MOCK_METHOD(void, OnSubscribeRejected, (MoqtRequestErrorInfo reason),
               (override));
   MOCK_METHOD(void, OnNewObjectAvailable,
-              (Location sequence, uint64_t subgroup,
-               MoqtPriority publisher_priority,
-               MoqtForwardingPreference forwarding_preference),
+              (Location sequence, std::optional<uint64_t> subgroup,
+               MoqtPriority publisher_priority),
               (override));
   MOCK_METHOD(void, OnNewFinAvailable,
               (Location final_object_in_subgroup, uint64_t subgroup_id),
@@ -72,8 +74,7 @@
                      MoqtObjectStatus status, absl::string_view payload,
                      bool fin_after_this = false) {
     EXPECT_CALL(listener_,
-                OnNewObjectAvailable(location, subgroup, 128,
-                                     MoqtForwardingPreference::kSubgroup));
+                OnNewObjectAvailable(location, Optional(subgroup), 128));
     if (fin_after_this || status == MoqtObjectStatus::kEndOfTrack ||
         status == MoqtObjectStatus::kEndOfGroup) {
       EXPECT_CALL(listener_, OnNewFinAvailable(location, subgroup));
@@ -162,8 +163,7 @@
       EXPECT_CALL(listener_, OnGroupAbandoned(group - 3));
     }
     EXPECT_CALL(listener_,
-                OnNewObjectAvailable(Location(group, 0), 0, 128,
-                                     MoqtForwardingPreference::kSubgroup));
+                OnNewObjectAvailable(Location(group, 0), Optional(0), 128));
     publisher_.OnObjectFragment(
         kTrackName,
         PublishedObjectMetadata{Location(group, 0), 0, "",
@@ -355,14 +355,12 @@
   EXPECT_CALL(*session_, Subscribe).WillOnce(testing::Return(true));
   publisher_.AddObjectListener(&listener_);
   Location location = kLargestLocation.Next();
-  EXPECT_CALL(listener_,
-              OnNewObjectAvailable(location, /*subgroup=*/0,
-                                   /*publisher_priority=*/128,
-                                   MoqtForwardingPreference::kSubgroup));
+  EXPECT_CALL(listener_, OnNewObjectAvailable(location, Optional(0),
+                                              /*publisher_priority=*/128));
   publisher_.OnObjectFragment(
       kTrackName,
       PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
-                              128, MoqtForwardingPreference::kSubgroup},
+                              128},
       "object", /*end_of_message=*/true);
   // Exact duplicate is ignored. It doesn't matter that the arrival time
   // changed.
@@ -372,8 +370,7 @@
   publisher_.OnObjectFragment(
       kTrackName,
       PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
-                              128, MoqtForwardingPreference::kSubgroup,
-                              quic::QuicTime::Infinite()},
+                              128, quic::QuicTime::Infinite()},
       "object", /*end_of_message=*/true);
 }
 
@@ -381,22 +378,20 @@
   EXPECT_CALL(*session_, Subscribe).WillOnce(testing::Return(true));
   publisher_.AddObjectListener(&listener_);
   Location location = kLargestLocation.Next();
-  EXPECT_CALL(listener_,
-              OnNewObjectAvailable(location, /*subgroup=*/0,
-                                   /*publisher_priority=*/128,
-                                   MoqtForwardingPreference::kSubgroup));
+  EXPECT_CALL(listener_, OnNewObjectAvailable(location, Optional(0),
+                                              /*publisher_priority=*/128));
   publisher_.OnObjectFragment(
       kTrackName,
       PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
-                              128, MoqtForwardingPreference::kSubgroup},
+                              128},
       "object", /*end_of_message=*/true);
   // Priority change; malformed track.
   EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0);
   EXPECT_CALL(listener_, OnTrackPublisherGone);
   publisher_.OnObjectFragment(
       kTrackName,
-      PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal, 64,
-                              MoqtForwardingPreference::kSubgroup},
+      PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
+                              64},
       "object", /*end_of_message=*/true);
   EXPECT_TRUE(track_deleted_);
 }
@@ -405,14 +400,12 @@
   EXPECT_CALL(*session_, Subscribe).WillOnce(testing::Return(true));
   publisher_.AddObjectListener(&listener_);
   Location location = kLargestLocation.Next();
-  EXPECT_CALL(listener_,
-              OnNewObjectAvailable(location, /*subgroup=*/0,
-                                   /*publisher_priority=*/128,
-                                   MoqtForwardingPreference::kSubgroup));
+  EXPECT_CALL(listener_, OnNewObjectAvailable(location, Optional(0),
+                                              /*publisher_priority=*/128));
   publisher_.OnObjectFragment(
       kTrackName,
       PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
-                              128, MoqtForwardingPreference::kSubgroup},
+                              128},
       "payload", /*end_of_message=*/true);
   // Payload change; malformed track.
   EXPECT_CALL(listener_, OnNewObjectAvailable).Times(0);
@@ -420,7 +413,7 @@
   publisher_.OnObjectFragment(
       kTrackName,
       PublishedObjectMetadata{location, 0, "foo", MoqtObjectStatus::kNormal,
-                              128, MoqtForwardingPreference::kSubgroup},
+                              128},
       "foobar", /*end_of_message=*/true);
   EXPECT_TRUE(track_deleted_);
 }
@@ -462,19 +455,16 @@
   SubscribeAndOk();
   Location location = kLargestLocation.Next();
   EXPECT_CALL(listener_,
-              OnNewObjectAvailable(location, /*subgroup=*/location.object,
-                                   /*publisher_priority=*/128,
-                                   MoqtForwardingPreference::kDatagram));
+              OnNewObjectAvailable(location, testing::Eq(std::nullopt),
+                                   /*publisher_priority=*/128));
   publisher_.OnObjectFragment(
       kTrackName,
-      PublishedObjectMetadata{location, location.object, "",
-                              MoqtObjectStatus::kNormal, 128,
-                              MoqtForwardingPreference::kDatagram},
+      PublishedObjectMetadata{location, std::nullopt, "",
+                              MoqtObjectStatus::kNormal, 128},
       "object", /*end_of_message=*/true);
   std::optional<PublishedObject> object =
-      publisher_.GetCachedObject(location.group, location.object, 0);
-  EXPECT_TRUE(object.has_value() && object->metadata.forwarding_preference ==
-                                        MoqtForwardingPreference::kDatagram);
+      publisher_.GetCachedObject(location.group, std::nullopt, 0);
+  EXPECT_TRUE(object.has_value() && !object->metadata.subgroup.has_value());
 }
 
 }  // namespace
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 09354b2..aeb4e1f 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -45,6 +45,7 @@
 #include "quiche/quic/moqt/moqt_session_interface.h"
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
 #include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/quic/platform/api/quic_logging.h"
 #include "quiche/common/platform/api/quiche_bug_tracker.h"
 #include "quiche/common/platform/api/quiche_logging.h"
@@ -218,10 +219,9 @@
     // TODO(martinduke): Handle extension headers.
     PublishedObjectMetadata metadata;
     metadata.location = Location(message.group_id, message.object_id);
-    metadata.subgroup = message.object_id;
+    metadata.subgroup = std::nullopt;
     metadata.status = message.object_status;
     metadata.publisher_priority = message.publisher_priority;
-    metadata.forwarding_preference = MoqtForwardingPreference::kDatagram;
     metadata.arrival_time = callbacks_.clock->Now();
     visitor->OnObjectFragment(track->full_track_name(), metadata, *payload,
                               true);
@@ -660,19 +660,17 @@
     switch (result) {
       case MoqtFetchTask::GetNextObjectResult::kSuccess:
         // Skip ObjectDoesNotExist in FETCH.
-        if (object.metadata.status == MoqtObjectStatus::kObjectDoesNotExist) {
+        if (object.metadata.status != MoqtObjectStatus::kNormal) {
           QUIC_BUG(quic_bug_got_doesnotexist_in_fetch)
-              << "Got ObjectDoesNotExist in FETCH";
+              << "Got Non-normal object in FETCH";
           continue;
         }
         if (fetch->session_->WriteObjectToStream(
                 stream_, fetch->request_id(), object.metadata,
                 std::move(object.payload), MoqtDataStreamType::Fetch(),
                 // last Object ID doesn't matter for FETCH, just use zero.
-                stream_header_written_ ? std::optional<uint64_t>(0)
-                                       : std::nullopt,
-                /*fin=*/false)) {
-          stream_header_written_ = true;
+                last_object_, /*fin=*/false)) {
+          last_object_ = object.metadata;
         }
         break;
       case MoqtFetchTask::GetNextObjectResult::kPending:
@@ -1617,9 +1615,6 @@
                   << " priority " << message.publisher_priority << " length "
                   << payload.size() << " length " << message.payload_length
                   << (end_of_message ? "F" : "");
-  if (!index_.has_value()) {
-    index_ = DataStreamIndex(message.group_id, message.subgroup_id);
-  }
   if (!session_->parameters_.deliver_partial_objects) {
     if (!end_of_message) {  // Buffer partial object.
       if (partial_object_.empty()) {
@@ -1665,6 +1660,14 @@
     return;
   }
   if (!track->is_fetch()) {
+    if (!index_.has_value()) {
+      if (!message.subgroup_id.has_value()) {
+        QUICHE_BUG(quiche_bug_moqt_subgroup_id_missing)
+            << "Missing subgroup ID on SUBSCRIBE stream";
+        return;
+      }
+      index_ = DataStreamIndex(message.group_id, *message.subgroup_id);
+    }
     if (no_more_objects_) {
       // Already got a stream-ending object. While the lower layer won't
       // deliver data after the FIN, there could have been an EndOfGroup or
@@ -1689,7 +1692,6 @@
       metadata.extensions = message.extension_headers;
       metadata.status = message.object_status;
       metadata.publisher_priority = message.publisher_priority;
-      metadata.forwarding_preference = MoqtForwardingPreference::kSubgroup;
       metadata.arrival_time = session_->callbacks_.clock->Now();
       subscribe->visitor()->OnObjectFragment(track->full_track_name(), metadata,
                                              payload, end_of_message);
@@ -1965,8 +1967,8 @@
 }
 
 void MoqtSession::PublishedSubscription::OnNewObjectAvailable(
-    Location location, uint64_t subgroup, MoqtPriority publisher_priority,
-    MoqtForwardingPreference forwarding_preference) {
+    Location location, std::optional<uint64_t> subgroup,
+    MoqtPriority publisher_priority) {
   if (!InWindow(location)) {
     return;
   }
@@ -1987,13 +1989,20 @@
     }
   }
 
+  // TODO(vasilvv): This currently sends UINT64_MAX for datagram subgroups.
+  // Maybe do something more satisfactory?
   session_->trace_recorder_.RecordNewObjectAvaliable(
-      track_alias_, *track_publisher_, location, subgroup, publisher_priority);
+      track_alias_, *track_publisher_, location, subgroup.value_or(UINT64_MAX),
+      publisher_priority);
 
-  DataStreamIndex index(location.group, subgroup);
-  if (reset_subgroups_.contains(index)) {
-    // This subgroup has already been reset, ignore.
-    return;
+  std::optional<webtransport::StreamId> stream_id;
+  if (subgroup.has_value()) {
+    DataStreamIndex index(location.group, *subgroup);
+    if (reset_subgroups_.contains(index)) {
+      // This subgroup has already been reset, ignore.
+      return;
+    }
+    stream_id = stream_map().GetStreamFor(index);
   }
   if (session_->alternate_delivery_timeout_ &&
       !delivery_timeout().IsInfinite() && largest_sent_.has_value() &&
@@ -2001,10 +2010,10 @@
     // Start the delivery timeout timer on all previous groups.
     for (uint64_t group = first_active_group_; group < location.group;
          ++group) {
-      for (webtransport::StreamId stream_id :
+      for (webtransport::StreamId stream_to_update :
            stream_map().GetStreamsForGroup(group)) {
         webtransport::Stream* raw_stream =
-            session_->session_->GetStreamById(stream_id);
+            session_->session_->GetStreamById(stream_to_update);
         if (raw_stream == nullptr) {
           continue;
         }
@@ -2016,21 +2025,18 @@
     }
   }
   QUICHE_DCHECK_GE(location.group, first_active_group_);
-
-  if (forwarding_preference == MoqtForwardingPreference::kDatagram) {
+  if (!subgroup.has_value()) {
     SendDatagram(location);
     return;
   }
 
-  std::optional<webtransport::StreamId> stream_id =
-      stream_map().GetStreamFor(index);
   webtransport::Stream* raw_stream = nullptr;
   if (stream_id.has_value()) {
     raw_stream = session_->session_->GetStreamById(*stream_id);
   } else {
     raw_stream = session_->OpenOrQueueDataStream(
         request_id_,
-        NewStreamParameters(location.group, subgroup, location.object,
+        NewStreamParameters(location.group, *subgroup, location.object,
                             (publisher_priority == default_publisher_priority_)
                                 ? std::nullopt
                                 : std::make_optional(publisher_priority)));
@@ -2341,16 +2347,17 @@
       stream_->ResetWithUserCode(kResetCodeDeliveryTimeout);
       return;
     }
-    if (!session_->WriteObjectToStream(
-            stream_, subscription.track_alias(), object->metadata,
-            std::move(object->payload), stream_type_, last_object_id_,
-            object->fin_after_this)) {
+    if (!session_->WriteObjectToStream(stream_, subscription.track_alias(),
+                                       object->metadata,
+                                       std::move(object->payload), stream_type_,
+                                       last_object_, object->fin_after_this)) {
       // WriteObjectToStream() closes the connection on error, meaning that
       // there is no need to process the stream any further.
       return;
     }
-    last_object_id_ = object->metadata.location.object;
-    next_object_ = *last_object_id_ + 1;
+    last_object_ = object->metadata;
+
+    next_object_ = last_object_->location.object + 1;
     subscription.OnObjectSent(object->metadata.location);
 
     if (object->fin_after_this && !delivery_timeout.IsInfinite() &&
@@ -2381,12 +2388,11 @@
   }
 }
 
-bool MoqtSession::WriteObjectToStream(webtransport::Stream* stream, uint64_t id,
-                                      const PublishedObjectMetadata& metadata,
-                                      quiche::QuicheMemSlice payload,
-                                      MoqtDataStreamType type,
-                                      std::optional<uint64_t> last_id,
-                                      bool fin) {
+bool MoqtSession::WriteObjectToStream(
+    webtransport::Stream* stream, uint64_t id,
+    const PublishedObjectMetadata& metadata, quiche::QuicheMemSlice payload,
+    MoqtDataStreamType type, std::optional<PublishedObjectMetadata> last_object,
+    bool fin) {
   QUICHE_DCHECK(stream->CanWrite());
   MoqtObject header;
   header.track_alias = id;
@@ -2399,7 +2405,7 @@
   header.payload_length = payload.length();
 
   quiche::QuicheBuffer serialized_header =
-      framer_.SerializeObjectHeader(header, type, last_id);
+      framer_.SerializeObjectHeader(header, type, last_object);
   // TODO(vasilvv): add a version of WebTransport write API that accepts
   // memslices so that we can avoid a copy here.
   std::array write_vector = {
@@ -2490,7 +2496,7 @@
 
 void MoqtSession::PublishedSubscription::SendDatagram(Location sequence) {
   std::optional<PublishedObject> object = track_publisher_->GetCachedObject(
-      sequence.group, sequence.object, sequence.object);
+      sequence.group, std::nullopt, sequence.object);
   if (!object.has_value()) {
     QUICHE_BUG(PublishedSubscription_SendDatagram_object_not_in_cache)
         << "Got notification about an object that is not in the cache";
@@ -2503,7 +2509,7 @@
   header.publisher_priority = object->metadata.publisher_priority;
   header.extension_headers = object->metadata.extensions;
   header.object_status = object->metadata.status;
-  header.subgroup_id = header.object_id;
+  header.subgroup_id = std::nullopt;
   header.payload_length = object->payload.length();
   quiche::QuicheBuffer datagram = session_->framer_.SerializeObjectDatagram(
       header, object->payload.AsStringView(),
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index d7289dd..8bb9060 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -30,6 +30,7 @@
 #include "quiche/quic/moqt/moqt_key_value_pair.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_names.h"
+#include "quiche/quic/moqt/moqt_object.h"
 #include "quiche/quic/moqt/moqt_parser.h"
 #include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/moqt_publisher.h"
@@ -38,6 +39,7 @@
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
 #include "quiche/quic/moqt/moqt_trace_recorder.h"
 #include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/quic/moqt/session_namespace_tree.h"
 #include "quiche/common/platform/api/quiche_export.h"
 #include "quiche/common/platform/api/quiche_logging.h"
@@ -384,9 +386,9 @@
     void OnSubscribeAccepted() override;
     void OnSubscribeRejected(MoqtRequestErrorInfo info) override;
     // This is only called for objects that have just arrived.
-    void OnNewObjectAvailable(
-        Location location, uint64_t subgroup, MoqtPriority publisher_priority,
-        MoqtForwardingPreference forwarding_preference) override;
+    void OnNewObjectAvailable(Location location,
+                              std::optional<uint64_t> subgroup,
+                              MoqtPriority publisher_priority) override;
     void OnTrackPublisherGone() override;
     void OnNewFinAvailable(Location location, uint64_t subgroup) override;
     void OnSubgroupAbandoned(uint64_t group, uint64_t subgroup,
@@ -576,7 +578,7 @@
     uint64_t next_object_;
     // Used in subgroup streams to compute the object ID diff. If nullopt, the
     // stream header has not been written yet.
-    std::optional<uint64_t> last_object_id_;
+    std::optional<PublishedObjectMetadata> last_object_;
     // If this data stream is for SUBSCRIBE, reset it if an object has been
     // excessively delayed per Section 7.1.1.2.
     std::unique_ptr<quic::QuicAlarm> delivery_timeout_alarm_;
@@ -615,8 +617,8 @@
 
      private:
       std::weak_ptr<PublishedFetch> fetch_;
+      std::optional<PublishedObjectMetadata> last_object_;
       webtransport::Stream* stream_;
-      bool stream_header_written_ = false;
     };
 
     MoqtFetchTask* fetch_task() { return fetch_.get(); }
@@ -667,8 +669,8 @@
       // No class access below this line!
     }
 
-    void OnNewObjectAvailable(Location, uint64_t /*subgroup*/, MoqtPriority,
-                              MoqtForwardingPreference) override {}
+    void OnNewObjectAvailable(Location, std::optional<uint64_t> /*subgroup*/,
+                              MoqtPriority) override {}
     void OnNewFinAvailable(Location /*location*/,
                            uint64_t /*subgroup*/) override {}
     void OnSubgroupAbandoned(
@@ -749,7 +751,8 @@
                            const PublishedObjectMetadata& metadata,
                            quiche::QuicheMemSlice payload,
                            MoqtDataStreamType type,
-                           std::optional<uint64_t> last_id, bool fin);
+                           std::optional<PublishedObjectMetadata> last_object,
+                           bool fin);
 
   void CancelFetch(uint64_t request_id);
 
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index cb65ccd..22b8d38 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -9,6 +9,7 @@
 #include <cstring>
 #include <memory>
 #include <optional>
+#include <queue>
 #include <string>
 #include <utility>
 #include <variant>
@@ -37,6 +38,7 @@
 #include "quiche/quic/moqt/moqt_session_callbacks.h"
 #include "quiche/quic/moqt/moqt_session_interface.h"
 #include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/quic/moqt/session_namespace_tree.h"
 #include "quiche/quic/moqt/test_tools/moqt_framer_utils.h"
 #include "quiche/quic/moqt/test_tools/moqt_mock_visitor.h"
@@ -59,6 +61,7 @@
 
 using ::quic::test::MemSliceFromString;
 using ::testing::_;
+using ::testing::Optional;
 using ::testing::Return;
 using ::testing::StrictMock;
 
@@ -206,12 +209,16 @@
                      std::unique_ptr<webtransport::StreamVisitor>& visitor,
                      MockSubscribeRemoteTrackVisitor* track_visitor) {
     MoqtFramer framer(true);
+    std::optional<PublishedObjectMetadata> previous_object;
+    if (visitor != nullptr) {
+      previous_object = PublishedObjectMetadata();
+      previous_object->location.object = object.object_id - 1;
+    }
     quiche::QuicheBuffer buffer = framer.SerializeObjectHeader(
         object,
-        MoqtDataStreamType::Subgroup(object.subgroup_id, object.object_id,
+        MoqtDataStreamType::Subgroup(*object.subgroup_id, object.object_id,
                                      false, false),
-        (visitor == nullptr) ? std::nullopt
-                             : std::optional<uint64_t>(object.object_id - 1));
+        previous_object);
     size_t data_read = 0;
     if (visitor == nullptr) {  // It's the first object in the stream
       EXPECT_CALL(session, AcceptIncomingUnidirectionalStream())
@@ -598,8 +605,7 @@
   // forward=false, so incoming objects are ignored.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .Times(0);
-  listener->OnNewObjectAvailable(Location(0, 0), 0, kDefaultPublisherPriority,
-                                 MoqtForwardingPreference::kSubgroup);
+  listener->OnNewObjectAvailable(Location(0, 0), 0, kDefaultPublisherPriority);
 }
 
 TEST_F(MoqtSessionTest, SubscribeAbsoluteStartNoDataYet) {
@@ -613,8 +619,7 @@
   // Window was not set to (0, 0) by SUBSCRIBE acceptance.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .Times(0);
-  listener->OnNewObjectAvailable(Location(0, 0), 0, kDefaultPublisherPriority,
-                                 MoqtForwardingPreference::kSubgroup);
+  listener->OnNewObjectAvailable(Location(0, 0), 0, kDefaultPublisherPriority);
 }
 
 TEST_F(MoqtSessionTest, SubscribeNextGroup) {
@@ -630,13 +635,12 @@
   // Later objects in group 10 ignored.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .Times(0);
-  listener->OnNewObjectAvailable(Location(10, 21), 0, kDefaultPublisherPriority,
-                                 MoqtForwardingPreference::kSubgroup);
+  listener->OnNewObjectAvailable(Location(10, 21), 0,
+                                 kDefaultPublisherPriority);
   // Group 11 is sent.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(false));
-  listener->OnNewObjectAvailable(Location(11, 0), 0, kDefaultPublisherPriority,
-                                 MoqtForwardingPreference::kSubgroup);
+  listener->OnNewObjectAvailable(Location(11, 0), 0, kDefaultPublisherPriority);
 }
 
 TEST_F(MoqtSessionTest, TwoSubscribesForTrack) {
@@ -1159,8 +1163,6 @@
         EXPECT_EQ(metadata.extensions, "foo");
         EXPECT_EQ(metadata.status, MoqtObjectStatus::kNormal);
         EXPECT_EQ(metadata.publisher_priority, 0);
-        EXPECT_EQ(metadata.forwarding_preference,
-                  MoqtForwardingPreference::kSubgroup);
         EXPECT_EQ(payload, received_payload);
         EXPECT_TRUE(end_of_message);
       });
@@ -1325,20 +1327,17 @@
         fin |= options.send_fin();
         return absl::OkStatus();
       });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] {
     return PublishedObject{
         PublishedObjectMetadata{Location(5, 0), 0, "extensions",
                                 MoqtObjectStatus::kNormal, 127,
-                                MoqtForwardingPreference::kSubgroup,
                                 MoqtSessionPeer::Now(&session_)},
         MemSliceFromString("deadbeef"), false};
   });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
-  subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+  subscription->OnNewObjectAvailable(Location(5, 0), 0, 127);
   EXPECT_TRUE(correct_message);
   EXPECT_FALSE(fin);
   EXPECT_EQ(MoqtSessionPeer::LargestSentForSubscription(&session_, 0),
@@ -1382,19 +1381,17 @@
         fin = options.send_fin();
         return absl::OkStatus();
       });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] {
     return PublishedObject{PublishedObjectMetadata{
                                Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
-                               127, MoqtForwardingPreference::kSubgroup,
-                               MoqtSessionPeer::Now(&session_)},
+                               127, MoqtSessionPeer::Now(&session_)},
                            MemSliceFromString("deadbeef"), true};
   });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
   EXPECT_TRUE(correct_message);
   EXPECT_TRUE(fin);
 }
@@ -1436,19 +1433,17 @@
         fin |= options.send_fin();
         return absl::OkStatus();
       });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] {
     return PublishedObject{PublishedObjectMetadata{
                                Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
-                               127, MoqtForwardingPreference::kSubgroup,
-                               MoqtSessionPeer::Now(&session_)},
+                               127, MoqtSessionPeer::Now(&session_)},
                            MemSliceFromString("deadbeef"), true};
   });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
   EXPECT_TRUE(correct_message);
   EXPECT_TRUE(fin);
 
@@ -1504,19 +1499,17 @@
         fin |= options.send_fin();
         return absl::OkStatus();
       });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] {
     return PublishedObject{PublishedObjectMetadata{
                                Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
-                               127, MoqtForwardingPreference::kSubgroup,
-                               MoqtSessionPeer::Now(&session_)},
+                               127, MoqtSessionPeer::Now(&session_)},
                            MemSliceFromString("deadbeef"), true};
   });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
   EXPECT_TRUE(correct_message);
   EXPECT_TRUE(fin);
 
@@ -1574,19 +1567,17 @@
         fin |= options.send_fin();
         return absl::OkStatus();
       });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] {
     return PublishedObject{PublishedObjectMetadata{
                                Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
-                               127, MoqtForwardingPreference::kSubgroup,
-                               MoqtSessionPeer::Now(&session_)},
+                               127, MoqtSessionPeer::Now(&session_)},
                            MemSliceFromString("deadbeef"), true};
   });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
   EXPECT_TRUE(correct_message);
   EXPECT_TRUE(fin);
   EXPECT_CALL(mock_stream_, ResetWithUserCode(kResetCodeDeliveryTimeout));
@@ -1630,19 +1621,17 @@
         fin = options.send_fin();
         return absl::OkStatus();
       });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] {
     return PublishedObject{PublishedObjectMetadata{
                                Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
-                               127, MoqtForwardingPreference::kSubgroup,
-                               MoqtSessionPeer::Now(&session_)},
+                               127, MoqtSessionPeer::Now(&session_)},
                            MemSliceFromString("deadbeef"), false};
   });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
   EXPECT_TRUE(correct_message);
   EXPECT_FALSE(fin);
   fin = false;
@@ -1693,27 +1682,24 @@
         fin = options.send_fin();
         return absl::OkStatus();
       });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] {
     return PublishedObject{PublishedObjectMetadata{
                                Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
-                               127, MoqtForwardingPreference::kSubgroup,
-                               MoqtSessionPeer::Now(&session_)},
+                               127, MoqtSessionPeer::Now(&session_)},
                            MemSliceFromString("deadbeef"), false};
   });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
   EXPECT_FALSE(fin);
   // Try to deliver (5,1), but fail.
   EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return false; });
   EXPECT_CALL(*track, GetCachedObject).Times(0);
   EXPECT_CALL(mock_stream_, Writev).Times(0);
   subscription->OnNewObjectAvailable(Location(5, 1), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
   // Notify that FIN arrived, but do nothing with it because (5, 1) isn't sent.
   EXPECT_CALL(mock_stream_, Writev).Times(0);
   subscription->OnNewFinAvailable(Location(5, 1), 0);
@@ -1723,15 +1709,14 @@
   // object id, extensions, payload length, status.
   const std::string kExpectedMessage2 = {0x00, 0x00, 0x00, 0x03};
   EXPECT_CALL(mock_stream_, CanWrite()).WillRepeatedly([&] { return true; });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([&] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([&] {
     return PublishedObject{
         PublishedObjectMetadata{Location(5, 1), 0, "",
                                 MoqtObjectStatus::kEndOfGroup, 127,
-                                MoqtForwardingPreference::kSubgroup,
                                 MoqtSessionPeer::Now(&session_)},
         MemSliceFromString(""), true};
   });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 2)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 2)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   EXPECT_CALL(mock_stream_, Writev(_, _))
@@ -1784,19 +1769,17 @@
         fin = options.send_fin();
         return absl::OkStatus();
       });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([&] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([&] {
     return PublishedObject{PublishedObjectMetadata{
                                Location(5, 0), 0, "", MoqtObjectStatus::kNormal,
-                               127, MoqtForwardingPreference::kSubgroup,
-                               MoqtSessionPeer::Now(&session_)},
+                               127, MoqtSessionPeer::Now(&session_)},
                            MemSliceFromString("deadbeef"), false};
   });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
 
   // Abandon the subgroup.
   EXPECT_CALL(mock_stream_, ResetWithUserCode(0x1)).Times(1);
@@ -1816,8 +1799,7 @@
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(false));
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
 
   // Unblock the session, and cause the queued stream to be sent.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
@@ -1839,13 +1821,13 @@
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
       .WillRepeatedly(Return(&mock_stream_));
   EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([] {
     return PublishedObject{
         PublishedObjectMetadata{Location(5, 0), 0, "",
                                 MoqtObjectStatus::kNormal, 128},
         MemSliceFromString("deadbeef")};
   });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   session_.OnCanCreateNewOutgoingUnidirectionalStream();
@@ -1862,11 +1844,9 @@
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillRepeatedly(Return(false));
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
   subscription->OnNewObjectAvailable(Location(6, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
   subscription->OnGroupAbandoned(5);
 
   // Unblock the session, and cause the queued stream to be sent. There should
@@ -1891,13 +1871,13 @@
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
       .WillRepeatedly(Return(&mock_stream_));
   EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
-  EXPECT_CALL(*track, GetCachedObject(6, 0, 0)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(6, Optional(0), 0)).WillRepeatedly([] {
     return PublishedObject{
         PublishedObjectMetadata{Location(6, 0), 0, "",
                                 MoqtObjectStatus::kNormal, 128},
         MemSliceFromString("deadbeef")};
   });
-  EXPECT_CALL(*track, GetCachedObject(6, 0, 1)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(6, Optional(0), 1)).WillRepeatedly([] {
     return std::optional<PublishedObject>();
   });
   session_.OnCanCreateNewOutgoingUnidirectionalStream();
@@ -1930,26 +1910,24 @@
       .WillRepeatedly(Return(&mock_stream_));
 
   EXPECT_CALL(mock_stream_, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 0)).WillRepeatedly([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 0)).WillRepeatedly([] {
     return PublishedObject{
         PublishedObjectMetadata{Location(5, 0), 0, "",
                                 MoqtObjectStatus::kNormal, 128},
         MemSliceFromString("deadbeef")};
   });
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).WillOnce([] {
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).WillOnce([] {
     return std::optional<PublishedObject>();
   });
   subscription->OnNewObjectAvailable(Location(5, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
   // Now that the stream exists and is recorded within subscription, make it
   // disappear by returning nullptr.
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
       .WillRepeatedly(Return(nullptr));
-  EXPECT_CALL(*track, GetCachedObject(5, 0, 1)).Times(0);
+  EXPECT_CALL(*track, GetCachedObject(5, Optional(0), 1)).Times(0);
   subscription->OnNewObjectAvailable(Location(5, 1), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
 }
 
 TEST_F(MoqtSessionTest, ReceiveUnsubscribe) {
@@ -1979,7 +1957,7 @@
       0x05, 0x02, 0x05, 0x20, 0x03, 0x65, 0x78, 0x74,
       0x64, 0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66,  // "deadbeef"
   };
-  EXPECT_CALL(mock_session_, SendOrQueueDatagram(_))
+  EXPECT_CALL(mock_session_, SendOrQueueDatagram)
       .WillOnce([&](absl::string_view datagram) {
         if (datagram.size() == sizeof(kExpectedMessage)) {
           correct_message = (0 == memcmp(datagram.data(), kExpectedMessage,
@@ -1988,14 +1966,15 @@
         return webtransport::DatagramStatus(
             webtransport::DatagramStatusCode::kSuccess, "");
       });
-  EXPECT_CALL(*track_publisher, GetCachedObject(5, 0, 0)).WillRepeatedly([] {
-    return PublishedObject{
-        PublishedObjectMetadata{Location{5, 0}, 0, "ext",
-                                MoqtObjectStatus::kNormal, 32},
-        quiche::QuicheMemSlice::Copy("deadbeef")};
-  });
-  listener->OnNewObjectAvailable(Location(5, 0), 0, kDefaultPublisherPriority,
-                                 MoqtForwardingPreference::kDatagram);
+  EXPECT_CALL(*track_publisher,
+              GetCachedObject(5, std::optional<uint64_t>(), 0))
+      .WillRepeatedly([] {
+        return PublishedObject{
+            PublishedObjectMetadata{Location{5, 0}, std::nullopt, "ext",
+                                    MoqtObjectStatus::kNormal, 32},
+            quiche::QuicheMemSlice::Copy("deadbeef")};
+      });
+  listener->OnNewObjectAvailable(Location(5, 0), std::nullopt, 32);
   EXPECT_TRUE(correct_message);
 }
 
@@ -2011,7 +1990,7 @@
       /*publisher_priority=*/0,
       /*extension_headers=*/"",
       /*object_status=*/MoqtObjectStatus::kNormal,
-      /*subgroup_id=*/0,
+      /*subgroup_id=*/std::nullopt,
       /*payload_length=*/8,
   };
   char datagram[] = {0x00, 0x02, 0x00, 0x00, 0x00, 0x64, 0x65,
@@ -2023,10 +2002,9 @@
         EXPECT_EQ(track_name, ftn);
         EXPECT_EQ(metadata.location,
                   Location(object.group_id, object.object_id));
+        EXPECT_EQ(metadata.subgroup, object.subgroup_id);
         EXPECT_EQ(metadata.publisher_priority, object.publisher_priority);
         EXPECT_EQ(metadata.status, object.object_status);
-        EXPECT_EQ(metadata.forwarding_preference,
-                  MoqtForwardingPreference::kDatagram);
         EXPECT_EQ(payload, received_payload);
         EXPECT_TRUE(fin);
       });
@@ -2122,13 +2100,12 @@
                     MoqtDataStreamType::kDefaultPriority);
         return absl::OkStatus();
       });
-  listener->OnNewObjectAvailable(Location(0, 0), 0, kLocalDefaultPriority,
-                                 MoqtForwardingPreference::kSubgroup);
+  listener->OnNewObjectAvailable(Location(0, 0), 0, kLocalDefaultPriority);
   // Send a datagram with the default priority.
   EXPECT_CALL(*track, GetCachedObject)
       .WillOnce(Return(
-          PublishedObject{PublishedObjectMetadata{Location(0, 1), 0, "",
-                                                  MoqtObjectStatus::kNormal,
+          PublishedObject{PublishedObjectMetadata{Location(0, 1), std::nullopt,
+                                                  "", MoqtObjectStatus::kNormal,
                                                   kLocalDefaultPriority},
                           MemSliceFromString("deadbeef")}));
   EXPECT_CALL(mock_session_, SendOrQueueDatagram)
@@ -2138,13 +2115,13 @@
         return webtransport::DatagramStatus{
             webtransport::DatagramStatusCode::kSuccess, ""};
       });
-  listener->OnNewObjectAvailable(Location(0, 1), 0, kLocalDefaultPriority,
-                                 MoqtForwardingPreference::kDatagram);
+  listener->OnNewObjectAvailable(Location(0, 1), std::nullopt,
+                                 kLocalDefaultPriority);
   // Non-default priority
   EXPECT_CALL(*track, GetCachedObject)
       .WillOnce(Return(
-          PublishedObject{PublishedObjectMetadata{Location(0, 2), 0, "",
-                                                  MoqtObjectStatus::kNormal,
+          PublishedObject{PublishedObjectMetadata{Location(0, 2), std::nullopt,
+                                                  "", MoqtObjectStatus::kNormal,
                                                   kLocalDefaultPriority + 1},
                           MemSliceFromString("deadbeef")}));
   EXPECT_CALL(mock_session_, SendOrQueueDatagram)
@@ -2154,8 +2131,8 @@
         return webtransport::DatagramStatus{
             webtransport::DatagramStatusCode::kSuccess, ""};
       });
-  listener->OnNewObjectAvailable(Location(0, 2), 0, kLocalDefaultPriority + 1,
-                                 MoqtForwardingPreference::kDatagram);
+  listener->OnNewObjectAvailable(Location(0, 2), std::nullopt,
+                                 kLocalDefaultPriority + 1);
 }
 
 TEST_F(MoqtSessionTest, StreamObjectOutOfWindow) {
@@ -2204,14 +2181,11 @@
       .WillOnce(Return(false))
       .WillOnce(Return(false));
   subscription->OnNewObjectAvailable(Location(1, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
   subscription->OnNewObjectAvailable(Location(0, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
   subscription->OnNewObjectAvailable(Location(2, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
   // These should be opened in the sequence (0, 0), (1, 0), (2, 0).
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillRepeatedly(Return(true));
@@ -2245,24 +2219,27 @@
   EXPECT_CALL(mock_stream2, visitor()).WillOnce([&]() {
     return stream_visitor[2].get();
   });
-  EXPECT_CALL(*track, GetCachedObject(0, 0, 0))
+  EXPECT_CALL(*track, GetCachedObject(0, Optional(0), 0))
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(0, 0), 0, "",
                                   MoqtObjectStatus::kNormal, 127},
           MemSliceFromString("deadbeef")}));
-  EXPECT_CALL(*track, GetCachedObject(0, 0, 1)).WillOnce(Return(std::nullopt));
-  EXPECT_CALL(*track, GetCachedObject(1, 0, 0))
+  EXPECT_CALL(*track, GetCachedObject(0, Optional(0), 1))
+      .WillOnce(Return(std::nullopt));
+  EXPECT_CALL(*track, GetCachedObject(1, Optional(0), 0))
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(1, 0), 0, "",
                                   MoqtObjectStatus::kNormal, 127},
           MemSliceFromString("deadbeef")}));
-  EXPECT_CALL(*track, GetCachedObject(1, 0, 1)).WillOnce(Return(std::nullopt));
-  EXPECT_CALL(*track, GetCachedObject(2, 0, 0))
+  EXPECT_CALL(*track, GetCachedObject(1, Optional(0), 1))
+      .WillOnce(Return(std::nullopt));
+  EXPECT_CALL(*track, GetCachedObject(2, Optional(0), 0))
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(2, 0), 0, "",
                                   MoqtObjectStatus::kNormal, 127},
           MemSliceFromString("deadbeef")}));
-  EXPECT_CALL(*track, GetCachedObject(2, 0, 1)).WillOnce(Return(std::nullopt));
+  EXPECT_CALL(*track, GetCachedObject(2, Optional(0), 1))
+      .WillOnce(Return(std::nullopt));
   EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true));
   EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true));
   EXPECT_CALL(mock_stream2, CanWrite()).WillRepeatedly(Return(true));
@@ -2299,8 +2276,7 @@
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(false));
   subscription->OnNewObjectAvailable(Location(0, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
 
   // Delete the subscription, then grant stream credit.
   MoqtSessionPeer::DeleteSubscription(&session_, 0);
@@ -2331,17 +2307,13 @@
       .WillOnce(Return(false))
       .WillOnce(Return(false));
   subscription0->OnNewObjectAvailable(Location(0, 0), 0,
-                                      kDefaultPublisherPriority,
-                                      MoqtForwardingPreference::kSubgroup);
+                                      kDefaultPublisherPriority);
   subscription1->OnNewObjectAvailable(Location(0, 0), 0,
-                                      kDefaultPublisherPriority,
-                                      MoqtForwardingPreference::kSubgroup);
+                                      kDefaultPublisherPriority);
   subscription0->OnNewObjectAvailable(Location(1, 0), 0,
-                                      kDefaultPublisherPriority,
-                                      MoqtForwardingPreference::kSubgroup);
+                                      kDefaultPublisherPriority);
   subscription1->OnNewObjectAvailable(Location(1, 0), 0,
-                                      kDefaultPublisherPriority,
-                                      MoqtForwardingPreference::kSubgroup);
+                                      kDefaultPublisherPriority);
 
   // Allow one stream to be opened. It will be group 0, subscription 0.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
@@ -2359,12 +2331,13 @@
   EXPECT_CALL(mock_stream0, visitor()).WillOnce([&]() {
     return stream_visitor0.get();
   });
-  EXPECT_CALL(*track1, GetCachedObject(0, 0, 0))
+  EXPECT_CALL(*track1, GetCachedObject(0, Optional(0), 0))
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(0, 0), 0, "",
                                   MoqtObjectStatus::kNormal, 127},
           MemSliceFromString("foobar")}));
-  EXPECT_CALL(*track1, GetCachedObject(0, 0, 1)).WillOnce(Return(std::nullopt));
+  EXPECT_CALL(*track1, GetCachedObject(0, Optional(0), 1))
+      .WillOnce(Return(std::nullopt));
   EXPECT_CALL(mock_stream0, CanWrite()).WillRepeatedly(Return(true));
   EXPECT_CALL(mock_stream0, Writev)
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
@@ -2395,12 +2368,13 @@
   EXPECT_CALL(mock_stream1, visitor()).WillOnce([&]() {
     return stream_visitor1.get();
   });
-  EXPECT_CALL(*track2, GetCachedObject(0, 0, 0))
+  EXPECT_CALL(*track2, GetCachedObject(0, Optional(0), 0))
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(0, 0), 0, "",
                                   MoqtObjectStatus::kNormal, 127},
           MemSliceFromString("deadbeef")}));
-  EXPECT_CALL(*track2, GetCachedObject(0, 0, 1)).WillOnce(Return(std::nullopt));
+  EXPECT_CALL(*track2, GetCachedObject(0, Optional(0), 1))
+      .WillOnce(Return(std::nullopt));
   EXPECT_CALL(mock_stream1, CanWrite()).WillRepeatedly(Return(true));
   EXPECT_CALL(mock_stream1, Writev(_, _))
       .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data,
@@ -2812,9 +2786,9 @@
       /*payload_length=*/3,
   };
   MoqtFramer framer(true);
+  std::optional<PublishedObjectMetadata> metadata;
   quiche::QuicheBuffer header = framer.SerializeObjectHeader(
-      object, MoqtDataStreamType::Fetch(), std::nullopt);
-
+      object, MoqtDataStreamType::Fetch(), metadata);
   // Open stream, deliver two objects before FETCH_OK. Neither should be read.
   webtransport::test::InMemoryStream data_stream(kIncomingUniStreamId);
   data_stream.SetVisitor(
@@ -3038,11 +3012,13 @@
       /*payload_length=*/3,
   };
   MoqtFramer framer_(true);
+  std::optional<PublishedObjectMetadata> metadata;
   for (int i = 0; i < 4; ++i) {
     object.object_id = i;
     headers.push(framer_.SerializeObjectHeader(
-        object, MoqtDataStreamType::Fetch(),
-        i == 0 ? std::nullopt : std::optional<uint64_t>(i - 1)));
+        object, MoqtDataStreamType::Fetch(), metadata));
+    metadata = PublishedObjectMetadata();
+    metadata->location.object = i;  // only object ID matters.
     payloads.push("foo");
   }
 
@@ -3109,11 +3085,13 @@
       /*payload_length=*/3,
   };
   MoqtFramer framer_(true);
+  std::optional<PublishedObjectMetadata> metadata;
   for (int i = 0; i < 4; ++i) {
     object.object_id = i;
     headers.push(framer_.SerializeObjectHeader(
-        object, MoqtDataStreamType::Fetch(),
-        i == 0 ? std::nullopt : std::optional<uint64_t>(i - 1)));
+        object, MoqtDataStreamType::Fetch(), metadata));
+    metadata = PublishedObjectMetadata();
+    metadata->location.object = i;  // only object ID matters.
     payloads.push("foo");
   }
 
@@ -3200,8 +3178,9 @@
       /*payload_length=*/6,
   };
   MoqtFramer framer_(true);
+  std::optional<PublishedObjectMetadata> metadata;
   quiche::QuicheBuffer header = framer_.SerializeObjectHeader(
-      object, MoqtDataStreamType::Fetch(), std::nullopt);
+      object, MoqtDataStreamType::Fetch(), metadata);
   stream.Receive(header.AsStringView(), false);
   EXPECT_FALSE(task->HasObject());
   EXPECT_FALSE(object_ready);
@@ -3264,7 +3243,6 @@
                                      "",
                                      MoqtObjectStatus::kObjectDoesNotExist,
                                      0,
-                                     MoqtForwardingPreference::kSubgroup,
                                      MoqtSessionPeer::Now(&session_) -
                                          quic::QuicTimeDelta::FromSeconds(1),
                                  },
@@ -3277,8 +3255,7 @@
   ON_CALL(*track_publisher, largest_location)
       .WillByDefault(Return(Location(0, 0)));
   subscription->OnNewObjectAvailable(Location(0, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
   // Subsequent objects for that subgroup are ignored.
   EXPECT_CALL(*track_publisher, GetCachedObject).Times(0);
   EXPECT_CALL(mock_session_, GetStreamById(_)).Times(0);
@@ -3287,8 +3264,7 @@
   ON_CALL(*track_publisher, largest_location)
       .WillByDefault(Return(Location(0, 1)));
   subscription->OnNewObjectAvailable(Location(0, 1), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
   // Check that reset_subgroups_ is pruned.
   EXPECT_TRUE(MoqtSessionPeer::SubgroupHasBeenReset(subscription,
                                                     DataStreamIndex(0, 0)));
@@ -3326,7 +3302,6 @@
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(0, 0), 0, "",
                                   MoqtObjectStatus::kObjectDoesNotExist, 0,
-                                  MoqtForwardingPreference::kSubgroup,
                                   MoqtSessionPeer::Now(&session_)},
           quiche::QuicheMemSlice(), true}))
       .WillOnce(Return(std::nullopt));
@@ -3335,8 +3310,7 @@
   ON_CALL(*track_publisher, largest_location)
       .WillByDefault(Return(Location(0, 0)));
   subscription->OnNewObjectAvailable(Location(0, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
   auto* delivery_alarm =
       absl::down_cast<quic::test::MockAlarmFactory::TestAlarm*>(
           MoqtSessionPeer::GetAlarm(stream_visitor.get()));
@@ -3378,7 +3352,6 @@
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(0, 0), 0, "",
                                   MoqtObjectStatus::kObjectDoesNotExist, 0,
-                                  MoqtForwardingPreference::kSubgroup,
                                   MoqtSessionPeer::Now(&session_)},
           quiche::QuicheMemSlice(), false}))
       .WillOnce(Return(std::nullopt));
@@ -3386,8 +3359,7 @@
   ON_CALL(*track_publisher, largest_location())
       .WillByDefault(Return(Location(0, 0)));
   subscription->OnNewObjectAvailable(Location(0, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
 
   EXPECT_CALL(data_mock, Writev(_, _)).WillOnce(Return(absl::OkStatus()));
   subscription->OnNewFinAvailable(Location(0, 0), 0);
@@ -3433,7 +3405,6 @@
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(0, 0), 0, "",
                                   MoqtObjectStatus::kObjectDoesNotExist, 0,
-                                  MoqtForwardingPreference::kSubgroup,
                                   MoqtSessionPeer::Now(&session_)},
           quiche::QuicheMemSlice(), false}))
       .WillOnce(Return(std::nullopt));
@@ -3441,8 +3412,7 @@
   ON_CALL(*track_publisher, largest_location)
       .WillByDefault(Return(Location(0, 0)));
   subscription->OnNewObjectAvailable(Location(0, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
 
   webtransport::test::MockStream data_mock2;
   EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
@@ -3464,7 +3434,6 @@
       .WillOnce(Return(PublishedObject{
           PublishedObjectMetadata{Location(1, 0), 0, "",
                                   MoqtObjectStatus::kObjectDoesNotExist, 0,
-                                  MoqtForwardingPreference::kSubgroup,
                                   MoqtSessionPeer::Now(&session_)},
           quiche::QuicheMemSlice(), false}))
       .WillOnce(Return(std::nullopt));
@@ -3472,8 +3441,7 @@
   ON_CALL(*track_publisher, largest_location)
       .WillByDefault(Return(Location(1, 0)));
   subscription->OnNewObjectAvailable(Location(1, 0), 0,
-                                     kDefaultPublisherPriority,
-                                     MoqtForwardingPreference::kSubgroup);
+                                     kDefaultPublisherPriority);
 
   // Group 1 should start the timer on the Group 0 stream.
   auto* delivery_alarm =
@@ -4197,20 +4165,19 @@
                                        /*start_object=*/0);
 
   // Send a datagram in window.
-  EXPECT_CALL(*mock_publisher, GetCachedObject(8, 0, 0)).WillOnce([&] {
-    return PublishedObject{
-        PublishedObjectMetadata{Location(8, 0), 0, "extensions",
-                                MoqtObjectStatus::kNormal, 128,
-                                MoqtForwardingPreference::kDatagram,
-                                MoqtSessionPeer::Now(&session_)},
-        quiche::QuicheMemSlice::Copy("deadbeef"), false};
-  });
+  EXPECT_CALL(*mock_publisher, GetCachedObject(8, std::optional<uint64_t>(), 0))
+      .WillOnce([&] {
+        return PublishedObject{
+            PublishedObjectMetadata{Location(8, 0), std::nullopt, "extensions",
+                                    MoqtObjectStatus::kNormal, 128,
+                                    MoqtSessionPeer::Now(&session_)},
+            quiche::QuicheMemSlice::Copy("deadbeef"), false};
+      });
   EXPECT_CALL(mock_session_, SendOrQueueDatagram)
       .WillOnce(Return(webtransport::DatagramStatus(
           webtransport::DatagramStatusCode::kSuccess, "")));
 
-  listener->OnNewObjectAvailable(Location(8, 0), 0, 0x80,
-                                 MoqtForwardingPreference::kDatagram);
+  listener->OnNewObjectAvailable(Location(8, 0), std::nullopt, 0x80);
 
   std::unique_ptr<MoqtControlParserVisitor> control_stream =
       MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
@@ -4223,8 +4190,7 @@
   control_stream->OnRequestUpdateMessage(MoqtRequestUpdate{3, 1, parameters});
   EXPECT_CALL(*mock_publisher, GetCachedObject).Times(0);
   EXPECT_CALL(mock_session_, SendOrQueueDatagram).Times(0);
-  listener->OnNewObjectAvailable(Location(8, 1), 0, 0x80,
-                                 MoqtForwardingPreference::kDatagram);
+  listener->OnNewObjectAvailable(Location(8, 1), 0, 0x80);
 }
 
 }  // namespace test
diff --git a/quiche/quic/moqt/moqt_types.h b/quiche/quic/moqt/moqt_types.h
new file mode 100644
index 0000000..4698a3c
--- /dev/null
+++ b/quiche/quic/moqt/moqt_types.h
@@ -0,0 +1,65 @@
+// Copyright 2026 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef QUICHE_QUIC_MOQT_MOQT_TYPES_H_
+#define QUICHE_QUIC_MOQT_MOQT_TYPES_H_
+
+#include <cstdint>
+
+#include "absl/strings/str_format.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/quiche_data_writer.h"
+
+namespace moqt {
+
+inline constexpr uint64_t kMaxGroupId = quiche::kVarInt62MaxValue;
+inline constexpr uint64_t kMaxObjectId = quiche::kVarInt62MaxValue;
+// Location as defined in
+// https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html#location-structure
+struct Location {
+  uint64_t group = 0;
+  uint64_t object = 0;
+
+  Location() = default;
+  Location(uint64_t group, uint64_t object) : group(group), object(object) {}
+
+  // Location order as described in
+  // https://moq-wg.github.io/moq-transport/draft-ietf-moq-transport.html#location-structure
+  auto operator<=>(const Location&) const = default;
+
+  Location Next() const {
+    if (object == kMaxObjectId) {
+      if (group == kMaxObjectId) {
+        return Location(0, 0);
+      }
+      return Location(group + 1, 0);
+    }
+    return Location(group, object + 1);
+  }
+
+  template <typename H>
+  friend H AbslHashValue(H h, const Location& m);
+
+  template <typename Sink>
+  friend void AbslStringify(Sink& sink, const Location& sequence) {
+    absl::Format(&sink, "(%d; %d)", sequence.group, sequence.object);
+  }
+};
+
+template <typename H>
+H AbslHashValue(H h, const Location& m) {
+  return H::combine(std::move(h), m.group, m.object);
+}
+
+enum class QUICHE_EXPORT MoqtObjectStatus : uint64_t {
+  kNormal = 0x0,
+  kObjectDoesNotExist = 0x1,
+  kEndOfGroup = 0x3,
+  kEndOfTrack = 0x4,
+  kInvalidObjectStatus = 0x5,
+};
+
+}  // namespace moqt
+
+#endif  // QUICHE_QUIC_MOQT_MOQT_TYPES_H_
diff --git a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
index 810a107..de9654a 100644
--- a/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/test_tools/moqt_mock_visitor.h
@@ -28,6 +28,7 @@
 #include "quiche/quic/moqt/moqt_session.h"
 #include "quiche/quic/moqt/moqt_session_callbacks.h"
 #include "quiche/quic/moqt/moqt_session_interface.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/common/platform/api/quiche_test.h"
 #include "quiche/common/quiche_mem_slice.h"
 #include "quiche/common/quiche_weak_ptr.h"
@@ -76,7 +77,7 @@
   const FullTrackName& GetTrackName() const override { return track_name_; }
 
   MOCK_METHOD(std::optional<PublishedObject>, GetCachedObject,
-              (uint64_t, uint64_t, uint64_t), (const, override));
+              (uint64_t, std::optional<uint64_t>, uint64_t), (const, override));
   MOCK_METHOD(void, AddObjectListener, (MoqtObjectListener * listener),
               (override));
   MOCK_METHOD(void, RemoveObjectListener, (MoqtObjectListener * listener),
@@ -104,7 +105,8 @@
       : track_name_(std::move(name)) {}
   const FullTrackName& GetTrackName() const override { return track_name_; }
   std::optional<PublishedObject> GetCachedObject(
-      uint64_t group, uint64_t subgroup, uint64_t object) const override {
+      uint64_t group, std::optional<uint64_t> subgroup,
+      uint64_t object) const override {
     Location location(group, object);
     auto it = objects_.find(location);
     if (it == objects_.end()) {
@@ -158,8 +160,7 @@
       largest_location_ = location;
     }
     for (MoqtObjectListener* listener : listeners_) {
-      listener->OnNewObjectAvailable(location, subgroup, 128,
-                                     MoqtForwardingPreference::kSubgroup);
+      listener->OnNewObjectAvailable(location, subgroup, 128);
     }
   }
   void RemoveAllSubscriptions() {
@@ -304,8 +305,7 @@
   MOCK_METHOD(void, OnSubscribeAccepted, (), (override));
   MOCK_METHOD(void, OnSubscribeRejected, (MoqtRequestErrorInfo), (override));
   MOCK_METHOD(void, OnNewObjectAvailable,
-              (Location, uint64_t, MoqtPriority, MoqtForwardingPreference),
-              (override));
+              (Location, std::optional<uint64_t>, MoqtPriority), (override));
   MOCK_METHOD(void, OnNewFinAvailable, (Location, uint64_t), (override));
   MOCK_METHOD(void, OnSubgroupAbandoned,
               (uint64_t, uint64_t, webtransport::StreamErrorCode), (override));
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index 6b0e117..6059293 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -24,6 +24,7 @@
 #include "quiche/quic/moqt/moqt_publisher.h"
 #include "quiche/quic/moqt/moqt_session.h"
 #include "quiche/quic/moqt/moqt_track.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/web_transport/test_tools/mock_web_transport.h"
 #include "quiche/web_transport/web_transport.h"
 
@@ -32,7 +33,8 @@
 class MoqtDataParserPeer {
  public:
   static void SetType(MoqtDataParser* parser, MoqtDataStreamType type) {
-    parser->type_.emplace(std::move(type));
+    parser->type_ = type;
+    parser->next_input_ = MoqtDataParser::NextInput::kTrackAlias;
   }
 };
 
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index ccf040e..813e969 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -24,6 +24,7 @@
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/moqt_names.h"
 #include "quiche/quic/moqt/moqt_priority.h"
+#include "quiche/quic/moqt/moqt_types.h"
 #include "quiche/quic/platform/api/quic_logging.h"
 #include "quiche/quic/platform/api/quic_test.h"
 #include "quiche/common/platform/api/quiche_export.h"
@@ -51,6 +52,20 @@
   return types;
 }
 
+inline std::vector<MoqtFetchSerialization> AllMoqtFetchSerializations() {
+  std::vector<MoqtFetchSerialization> serializations;
+  for (uint64_t i = 0; i < 128; ++i) {
+    std::optional<MoqtFetchSerialization> value =
+        MoqtFetchSerialization::FromValue(i);
+    if (value.has_value()) {
+      serializations.push_back(*value);
+    } else {
+      break;
+    }
+  }
+  return serializations;
+}
+
 inline std::vector<MoqtDataStreamType> AllMoqtDataStreamTypes() {
   std::vector<MoqtDataStreamType> types;
   types.push_back(MoqtDataStreamType::Fetch());
@@ -136,7 +151,9 @@
   }
 
   // Objects might need a different status if at the end of the stream.
-  virtual void MakeObjectEndOfStream() {}
+  virtual void MakeObjectEndOfStream() {
+    QUIC_LOG(INFO) << "MakeObjectEndOfStream not implemented";
+  }
 
  protected:
   void SetWireImage(uint8_t* wire_image, size_t wire_image_size) {
@@ -284,7 +301,7 @@
     object_.extension_headers =
         datagram_type.has_extension() ? std::string(kDefaultExtensionBlob) : "";
     object_.object_id = datagram_type.has_object_id() ? 6 : 0;
-    object_.subgroup_id = object_.object_id;
+    object_.subgroup_id = std::nullopt;
     quic::QuicDataWriter writer(sizeof(raw_packet_),
                                 reinterpret_cast<char*>(raw_packet_));
     EXPECT_TRUE(writer.WriteVarInt62(datagram_type.value()));
@@ -429,7 +446,7 @@
 // Used only for tests that process multiple objects on one stream.
 class QUICHE_NO_EXPORT StreamMiddlerSubgroupMessage : public ObjectMessage {
  public:
-  StreamMiddlerSubgroupMessage(MoqtDataStreamType type)
+  StreamMiddlerSubgroupMessage(const MoqtDataStreamType type)
       : ObjectMessage(), type_(type) {
     SetWireImage(reinterpret_cast<uint8_t*>(raw_packet_), sizeof(raw_packet_));
     if (type.SubgroupIsZero()) {
@@ -473,7 +490,7 @@
   }
 
   void ExpandVarints() override {
-    ExpandVarintsImpl("vvvvv-v-------v---", false);
+    ExpandVarintsImpl("vvvvvv-v-------v---", false);
   }
 
   bool SetPayloadLength(uint8_t payload_length) {
@@ -488,10 +505,10 @@
   }
 
  private:
-  uint8_t raw_packet_[18] = {
+  uint8_t raw_packet_[19] = {
       0x05,              // type field
-      0x04,              // subscribe ID
-                         // object middler:
+      0x04,              // request ID
+      0x3f,              // object serialization flag
       0x05, 0x08, 0x06,  // sequence
       0x07, 0x07,        // publisher priority, 7B extensions
       0x00, 0x0c, 0x01, 0x03, 0x66, 0x6f, 0x6f,  // extensions
@@ -502,22 +519,82 @@
 // Used only for tests that process multiple objects on one stream.
 class QUICHE_NO_EXPORT StreamMiddlerFetchMessage : public ObjectMessage {
  public:
-  StreamMiddlerFetchMessage() : ObjectMessage() {
-    SetWireImage(raw_packet_, sizeof(raw_packet_));
-    object_.subgroup_id = 8;
-    object_.object_id = 9;
+  StreamMiddlerFetchMessage(MoqtFetchSerialization serialization)
+      : ObjectMessage(), serialization_(serialization) {
+    size_t length = 0;
+    if (serialization.is_datagram()) {  // Two byte varint.
+      raw_packet_[length++] = 0x40;
+    }
+    raw_packet_[length++] = static_cast<uint8_t>(serialization.value());
+    if (serialization.has_group_id()) {
+      raw_packet_[length++] = 0x06;  // group ID
+      object_.group_id = 6;
+    }
+    if (serialization.zero_subgroup_id()) {
+      object_.subgroup_id = 0;
+    } else if (serialization.has_subgroup_id()) {
+      raw_packet_[length++] = 0x0a;
+      object_.subgroup_id = 10;
+    } else if (serialization.prior_subgroup_id_plus_one()) {
+      if (!object_.subgroup_id.has_value()) {
+        QUICHE_BUG(quiche_bug_moqt_prior_subgroup_id_without_previous_subgroup)
+            << "prior_subgroup_id_plus_one without previous subgroup ID";
+        return;
+      }
+      ++(*object_.subgroup_id);
+    } else if (serialization.is_datagram()) {
+      object_.subgroup_id = std::nullopt;
+    }  // If prior_subgroup_id, subgroup_id is already set properly.
+    if (serialization.has_object_id()) {
+      raw_packet_[length++] = 0x0a;
+      object_.object_id = 10;
+    } else {
+      ++object_.object_id;
+    }
+    if (serialization.has_priority()) {
+      raw_packet_[length++] = 0x09;
+      object_.publisher_priority = MoqtPriority(0x09);
+    }
+    if (serialization.has_extensions()) {
+      memcpy(&raw_packet_[length], kRawExtensions.data(),
+             kRawExtensions.length());
+      length += kRawExtensions.length();
+    } else {
+      object_.extension_headers = "";
+    }
+    memcpy(&raw_packet_[length], kRawPayload.data(), kRawPayload.length());
+    length += kRawPayload.length();
+
+    SetWireImage(raw_packet_, length);
   }
 
   void ExpandVarints() override {
-    ExpandVarintsImpl("vvv-v-------v---", false);
+    std::string varints = "v";
+    if (serialization_.has_group_id()) {
+      varints += "v";
+    }
+    if (serialization_.has_subgroup_id()) {
+      varints += "v";
+    }
+    if (serialization_.has_object_id()) {
+      varints += "v";
+    }
+    if (serialization_.has_priority()) {
+      varints += "-";
+    }
+    if (serialization_.has_extensions()) {
+      varints += "v-------";
+    }
+    varints += "v---";
+    ExpandVarintsImpl(varints, false);
   }
 
  private:
-  uint8_t raw_packet_[16] = {
-      0x05, 0x08, 0x09, 0x07,                          // Object metadata
-      0x07, 0x00, 0x0c, 0x01, 0x03, 0x66, 0x6f, 0x6f,  // extensions
-      0x03, 0x62, 0x61, 0x72,                          // Payload = "bar"
-  };
+  MoqtFetchSerialization serialization_;
+  uint8_t raw_packet_[17];
+  static constexpr absl::string_view kRawExtensions{
+      "\x07\x00\x0c\x01\x03\x66\x6f\x6f", 8};  // see kDefaultExtensionBlob
+  static constexpr absl::string_view kRawPayload = "\x03\x62\x61\x72";
 };
 
 class QUICHE_NO_EXPORT ClientSetupMessage : public TestMessageBase {
@@ -1834,6 +1911,11 @@
   return std::make_unique<StreamHeaderSubgroupMessage>(type);
 }
 
+static inline std::unique_ptr<TestMessageBase> CreateTestFetch(
+    MoqtFetchSerialization type) {
+  return std::make_unique<StreamMiddlerFetchMessage>(type);
+}
+
 static inline std::unique_ptr<TestMessageBase> CreateTestDatagram(
     MoqtDatagramType type) {
   return std::make_unique<ObjectDatagramMessage>(type);