Implement MoQT Peeps and Object message changes for draft-06. This is the minimum for interoperability; this code always sends subgroup_id = 0 and ignores the incoming subgroup_id.

Also fixed bugs uncovered by fixing a typo in MoqtIntegrationTest.

PiperOrigin-RevId: 680684190
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index 6974ca6..e7042e7 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -280,43 +280,37 @@
 
 quiche::QuicheBuffer MoqtFramer::SerializeObjectHeader(
     const MoqtObject& message, bool is_first_in_stream) {
-  if (!message.payload_length.has_value() &&
-      !(message.forwarding_preference == MoqtForwardingPreference::kObject ||
-        message.forwarding_preference == MoqtForwardingPreference::kDatagram)) {
-    QUIC_BUG(quic_bug_serialize_object_input_01)
-        << "Track or Group forwarding preference requires knowing the object "
-           "length in advance";
+  if (!ValidateObjectMetadata(message)) {
+    QUIC_BUG(quic_bug_serialize_object_header_01)
+        << "Object metadata is invalid";
     return quiche::QuicheBuffer();
   }
-  if (message.object_status != MoqtObjectStatus::kNormal &&
-      message.payload_length.has_value() && *message.payload_length > 0) {
-    QUIC_BUG(quic_bug_serialize_object_input_03)
-        << "Object status must be kNormal if payload is non-empty";
+  if (message.forwarding_preference == MoqtForwardingPreference::kDatagram) {
+    QUIC_BUG(quic_bug_serialize_object_header_02)
+        << "Datagrams use SerializeObjectDatagram()";
     return quiche::QuicheBuffer();
   }
   if (!is_first_in_stream) {
     switch (message.forwarding_preference) {
       case MoqtForwardingPreference::kTrack:
-        return (*message.payload_length == 0)
+        return (message.payload_length == 0)
                    ? Serialize(WireVarInt62(message.group_id),
                                WireVarInt62(message.object_id),
-                               WireVarInt62(*message.payload_length),
+                               WireVarInt62(message.payload_length),
                                WireVarInt62(message.object_status))
                    : Serialize(WireVarInt62(message.group_id),
                                WireVarInt62(message.object_id),
-                               WireVarInt62(*message.payload_length));
-      case MoqtForwardingPreference::kGroup:
-        return (*message.payload_length == 0)
+                               WireVarInt62(message.payload_length));
+      case MoqtForwardingPreference::kSubgroup:
+        return (message.payload_length == 0)
                    ? Serialize(WireVarInt62(message.object_id),
-                               WireVarInt62(*message.payload_length),
+                               WireVarInt62(message.payload_length),
                                WireVarInt62(static_cast<uint64_t>(
                                    message.object_status)))
                    : Serialize(WireVarInt62(message.object_id),
-                               WireVarInt62(*message.payload_length));
+                               WireVarInt62(message.payload_length));
       default:
-        QUIC_BUG(quic_bug_serialize_object_input_02)
-            << "Object or Datagram forwarding_preference must be first in "
-               "stream";
+        QUICHE_NOTREACHED();
         return quiche::QuicheBuffer();
     }
   }
@@ -324,14 +318,14 @@
       GetMessageTypeForForwardingPreference(message.forwarding_preference);
   switch (message.forwarding_preference) {
     case MoqtForwardingPreference::kTrack:
-      return (*message.payload_length == 0)
+      return (message.payload_length == 0)
                  ? Serialize(WireVarInt62(message_type),
                              WireVarInt62(message.subscribe_id),
                              WireVarInt62(message.track_alias),
                              WireUint8(message.publisher_priority),
                              WireVarInt62(message.group_id),
                              WireVarInt62(message.object_id),
-                             WireVarInt62(*message.payload_length),
+                             WireVarInt62(message.payload_length),
                              WireVarInt62(message.object_status))
                  : Serialize(WireVarInt62(message_type),
                              WireVarInt62(message.subscribe_id),
@@ -339,49 +333,64 @@
                              WireUint8(message.publisher_priority),
                              WireVarInt62(message.group_id),
                              WireVarInt62(message.object_id),
-                             WireVarInt62(*message.payload_length));
-    case MoqtForwardingPreference::kGroup:
-      return (*message.payload_length == 0)
+                             WireVarInt62(message.payload_length));
+    case MoqtForwardingPreference::kSubgroup:
+      return (message.payload_length == 0)
                  ? Serialize(WireVarInt62(message_type),
                              WireVarInt62(message.subscribe_id),
                              WireVarInt62(message.track_alias),
                              WireVarInt62(message.group_id),
+                             WireVarInt62(*message.subgroup_id),
                              WireUint8(message.publisher_priority),
                              WireVarInt62(message.object_id),
-                             WireVarInt62(*message.payload_length),
+                             WireVarInt62(message.payload_length),
                              WireVarInt62(message.object_status))
                  : Serialize(WireVarInt62(message_type),
                              WireVarInt62(message.subscribe_id),
                              WireVarInt62(message.track_alias),
                              WireVarInt62(message.group_id),
+                             WireVarInt62(*message.subgroup_id),
                              WireUint8(message.publisher_priority),
                              WireVarInt62(message.object_id),
-                             WireVarInt62(*message.payload_length));
-    case MoqtForwardingPreference::kObject:
+                             WireVarInt62(message.payload_length));
     case MoqtForwardingPreference::kDatagram:
-      return Serialize(
-          WireVarInt62(message_type), WireVarInt62(message.subscribe_id),
-          WireVarInt62(message.track_alias), WireVarInt62(message.group_id),
-          WireVarInt62(message.object_id),
-          WireUint8(message.publisher_priority),
-          WireVarInt62(message.object_status));
+      QUICHE_NOTREACHED();
+      return quiche::QuicheBuffer();
   }
 }
 
 quiche::QuicheBuffer MoqtFramer::SerializeObjectDatagram(
     const MoqtObject& message, absl::string_view payload) {
-  if (message.object_status != MoqtObjectStatus::kNormal && !payload.empty()) {
+  if (!ValidateObjectMetadata(message)) {
     QUIC_BUG(quic_bug_serialize_object_datagram_01)
-        << "Object status must be kNormal if payload is non-empty";
+        << "Object metadata is invalid";
     return quiche::QuicheBuffer();
   }
+  if (message.forwarding_preference != MoqtForwardingPreference::kDatagram) {
+    QUIC_BUG(quic_bug_serialize_object_datagram_02)
+        << "Only datagrams use SerializeObjectDatagram()";
+    return quiche::QuicheBuffer();
+  }
+  if (message.payload_length != payload.length()) {
+    QUIC_BUG(quic_bug_serialize_object_datagram_03)
+        << "Payload length does not match payload";
+    return quiche::QuicheBuffer();
+  }
+  if (message.payload_length == 0) {
+    return Serialize(
+        WireVarInt62(MoqtDataStreamType::kObjectDatagram),
+        WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias),
+        WireVarInt62(message.group_id), WireVarInt62(message.object_id),
+        WireUint8(message.publisher_priority),
+        WireVarInt62(message.payload_length),
+        WireVarInt62(message.object_status));
+  }
   return Serialize(
       WireVarInt62(MoqtDataStreamType::kObjectDatagram),
       WireVarInt62(message.subscribe_id), WireVarInt62(message.track_alias),
       WireVarInt62(message.group_id), WireVarInt62(message.object_id),
       WireUint8(message.publisher_priority),
-      WireVarInt62(static_cast<uint64_t>(message.object_status)),
-      WireBytes(payload));
+      WireVarInt62(message.payload_length), WireBytes(payload));
 }
 
 quiche::QuicheBuffer MoqtFramer::SerializeClientSetup(
@@ -629,4 +638,17 @@
                        message.delta_from_deadline.ToMicroseconds())));
 }
 
+// static
+bool MoqtFramer::ValidateObjectMetadata(const MoqtObject& object) {
+  if (object.object_status != MoqtObjectStatus::kNormal &&
+      object.payload_length > 0) {
+    return false;
+  }
+  if ((object.forwarding_preference == MoqtForwardingPreference::kSubgroup) !=
+      object.subgroup_id.has_value()) {
+    return false;
+  }
+  return true;
+}
+
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h
index 40b4dbf..b18a77c 100644
--- a/quiche/quic/moqt/moqt_framer.h
+++ b/quiche/quic/moqt/moqt_framer.h
@@ -60,6 +60,9 @@
   quiche::QuicheBuffer SerializeObjectAck(const MoqtObjectAck& message);
 
  private:
+  // Returns true if the metadata is internally consistent.
+  static bool ValidateObjectMetadata(const MoqtObject& object);
+
   quiche::QuicheBufferAllocator* allocator_;
   bool using_webtrans_;
 };
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index 510dd17..be6ae32 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -4,7 +4,6 @@
 
 #include "quiche/quic/moqt/moqt_framer.h"
 
-#include <cerrno>
 #include <cstddef>
 #include <cstdint>
 #include <memory>
@@ -19,7 +18,6 @@
 #include "quiche/quic/platform/api/quic_expect_bug.h"
 #include "quiche/quic/platform/api/quic_test.h"
 #include "quiche/common/quiche_buffer_allocator.h"
-#include "quiche/common/quiche_text_utils.h"
 #include "quiche/common/simple_buffer_allocator.h"
 #include "quiche/common/test_tools/quiche_test_utils.h"
 
@@ -81,7 +79,9 @@
   MoqtObject adjusted_message = message;
   adjusted_message.payload_length = payload.size();
   quiche::QuicheBuffer header =
-      framer.SerializeObjectHeader(adjusted_message, is_first_in_stream);
+      (message.forwarding_preference == MoqtForwardingPreference::kDatagram)
+          ? framer.SerializeObjectDatagram(adjusted_message, payload)
+          : framer.SerializeObjectHeader(adjusted_message, is_first_in_stream);
   if (header.empty()) {
     return quiche::QuicheBuffer();
   }
@@ -217,13 +217,13 @@
 };
 
 TEST_F(MoqtFramerSimpleTest, GroupMiddler) {
-  auto header = std::make_unique<StreamHeaderGroupMessage>();
+  auto header = std::make_unique<StreamHeaderSubgroupMessage>();
   auto buffer1 = SerializeObject(
       framer_, std::get<MoqtObject>(header->structured_data()), "foo", true);
   EXPECT_EQ(buffer1.size(), header->total_message_size());
   EXPECT_EQ(buffer1.AsStringView(), header->PacketSample());
 
-  auto middler = std::make_unique<StreamMiddlerGroupMessage>();
+  auto middler = std::make_unique<StreamMiddlerSubgroupMessage>();
   auto buffer2 = SerializeObject(
       framer_, std::get<MoqtObject>(middler->structured_data()), "bar", false);
   EXPECT_EQ(buffer2.size(), middler->total_message_size());
@@ -246,28 +246,80 @@
 
 TEST_F(MoqtFramerSimpleTest, BadObjectInput) {
   MoqtObject object = {
+      // This is a valid object.
       /*subscribe_id=*/3,
       /*track_alias=*/4,
       /*group_id=*/5,
       /*object_id=*/6,
       /*publisher_priority=*/7,
       /*object_status=*/MoqtObjectStatus::kNormal,
-      /*forwarding_preference=*/MoqtForwardingPreference::kObject,
-      /*payload_length=*/std::nullopt,
+      /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+      /*subgroup_id=*/8,
+      /*payload_length=*/3,
   };
   quiche::QuicheBuffer buffer;
-  object.forwarding_preference = MoqtForwardingPreference::kDatagram;
-  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false),
-                  "must be first");
+
+  // SerializeObjectDatagram() only accepts kDatagram.
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectDatagram(object, "foo"),
+                  "Only datagrams use SerializeObjectDatagram()");
   EXPECT_TRUE(buffer.empty());
-  object.forwarding_preference = MoqtForwardingPreference::kGroup;
+
+  // kSubgroup must have a subgroup_id.
+  object.subgroup_id = std::nullopt;
   EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false),
-                  "requires knowing the object length");
+                  "Object metadata is invalid");
   EXPECT_TRUE(buffer.empty());
-  object.payload_length = 5;
+  object.subgroup_id = 8;
+
+  // kTrack must not have a subgroup_id.
+  object.forwarding_preference = MoqtForwardingPreference::kTrack;
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false),
+                  "Object metadata is invalid");
+  EXPECT_TRUE(buffer.empty());
+  object.forwarding_preference = MoqtForwardingPreference::kSubgroup;
+
+  // Non-normal status must have no payload.
   object.object_status = MoqtObjectStatus::kEndOfGroup;
   EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false),
-                  "Object status must be kNormal if payload is non-empty");
+                  "Object metadata is invalid");
+  EXPECT_TRUE(buffer.empty());
+  // object.object_status = MoqtObjectStatus::kNormal;
+}
+
+TEST_F(MoqtFramerSimpleTest, BadDatagramInput) {
+  MoqtObject object = {
+      // This is a valid datagram.
+      /*subscribe_id=*/3,
+      /*track_alias=*/4,
+      /*group_id=*/5,
+      /*object_id=*/6,
+      /*publisher_priority=*/7,
+      /*object_status=*/MoqtObjectStatus::kNormal,
+      /*forwarding_preference=*/MoqtForwardingPreference::kDatagram,
+      /*subgroup_id=*/std::nullopt,
+      /*payload_length=*/3,
+  };
+  quiche::QuicheBuffer buffer;
+
+  // No datagrams to SerializeObjectHeader().
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectHeader(object, false),
+                  "Datagrams use SerializeObjectDatagram()")
+  EXPECT_TRUE(buffer.empty());
+
+  object.object_status = MoqtObjectStatus::kEndOfGroup;
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectDatagram(object, "foo"),
+                  "Object metadata is invalid");
+  EXPECT_TRUE(buffer.empty());
+  object.object_status = MoqtObjectStatus::kNormal;
+
+  object.subgroup_id = 8;
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectDatagram(object, "foo"),
+                  "Object metadata is invalid");
+  EXPECT_TRUE(buffer.empty());
+  object.subgroup_id = std::nullopt;
+
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObjectDatagram(object, "foobar"),
+                  "Payload length does not match payload");
   EXPECT_TRUE(buffer.empty());
 }
 
@@ -280,8 +332,9 @@
       /*object_id=*/6,
       /*publisher_priority=*/7,
       /*object_status=*/MoqtObjectStatus::kNormal,
-      /*forwarding_preference=*/MoqtForwardingPreference::kObject,
-      /*payload_length=*/std::nullopt,
+      /*forwarding_preference=*/MoqtForwardingPreference::kDatagram,
+      /*subgroup_id=*/std::nullopt,
+      /*payload_length=*/3,
   };
   std::string payload = "foo";
   quiche::QuicheBuffer buffer;
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 11f6be4..d398331 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -190,7 +190,7 @@
       });
 
   auto queue = std::make_shared<MoqtOutgoingQueue>(
-      FullTrackName{"test", "data"}, MoqtForwardingPreference::kGroup);
+      FullTrackName{"test", "data"}, MoqtForwardingPreference::kSubgroup);
   MoqtKnownTrackPublisher known_track_publisher;
   known_track_publisher.Add(queue);
   client_->session()->set_publisher(&known_track_publisher);
@@ -215,7 +215,7 @@
         EXPECT_EQ(group_sequence, 0u);
         EXPECT_EQ(object_sequence, 0u);
         EXPECT_EQ(status, MoqtObjectStatus::kNormal);
-        EXPECT_EQ(forwarding_preference, MoqtForwardingPreference::kGroup);
+        EXPECT_EQ(forwarding_preference, MoqtForwardingPreference::kSubgroup);
         EXPECT_EQ(object, "object data");
         EXPECT_TRUE(end_of_message);
         received_object = true;
@@ -232,15 +232,14 @@
   server_->session()->set_publisher(&publisher);
 
   for (MoqtForwardingPreference forwarding_preference :
-       {MoqtForwardingPreference::kTrack, MoqtForwardingPreference::kGroup,
-        MoqtForwardingPreference::kObject,
+       {MoqtForwardingPreference::kTrack, MoqtForwardingPreference::kSubgroup,
         MoqtForwardingPreference::kDatagram}) {
     SCOPED_TRACE(MoqtForwardingPreferenceToString(forwarding_preference));
     MockRemoteTrackVisitor client_visitor;
     std::string name =
         absl::StrCat("pref_", static_cast<int>(forwarding_preference));
     auto queue = std::make_shared<MoqtOutgoingQueue>(
-        FullTrackName{"test", name}, MoqtForwardingPreference::kObject);
+        FullTrackName{"test", name}, MoqtForwardingPreference::kSubgroup);
     publisher.Add(queue);
     queue->AddObject(MemSliceFromString("object 1"), /*key=*/true);
     queue->AddObject(MemSliceFromString("object 2"), /*key=*/false);
@@ -294,15 +293,14 @@
   server_->session()->set_publisher(&publisher);
 
   for (MoqtForwardingPreference forwarding_preference :
-       {MoqtForwardingPreference::kTrack, MoqtForwardingPreference::kGroup,
-        MoqtForwardingPreference::kObject,
+       {MoqtForwardingPreference::kTrack, MoqtForwardingPreference::kSubgroup,
         MoqtForwardingPreference::kDatagram}) {
     SCOPED_TRACE(MoqtForwardingPreferenceToString(forwarding_preference));
     MockRemoteTrackVisitor client_visitor;
     std::string name =
         absl::StrCat("pref_", static_cast<int>(forwarding_preference));
     auto queue = std::make_shared<MoqtOutgoingQueue>(
-        FullTrackName{"test", name}, MoqtForwardingPreference::kObject);
+        FullTrackName{"test", name}, forwarding_preference);
     publisher.Add(queue);
     for (int i = 0; i < 100; ++i) {
       queue->AddObject(MemSliceFromString("object"), /*key=*/true);
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.cc b/quiche/quic/moqt/moqt_live_relay_queue.cc
index 5fdcf65..6cee208 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue.cc
+++ b/quiche/quic/moqt/moqt_live_relay_queue.cc
@@ -23,6 +23,9 @@
 
 namespace moqt {
 
+// TODO(martinduke): Accept subgroup ID.
+// TODO(martinduke): Accept publisher priority.
+// TODO(martinduke): Unless Track Forwarding preference goes away, support it.
 bool MoqtLiveRelayQueue::AddObject(uint64_t group_id, uint64_t object_id,
                                    MoqtObjectStatus status,
                                    absl::string_view object) {
diff --git a/quiche/quic/moqt/moqt_live_relay_queue.h b/quiche/quic/moqt/moqt_live_relay_queue.h
index 8c3fbe3..ae2a58e 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue.h
+++ b/quiche/quic/moqt/moqt_live_relay_queue.h
@@ -38,7 +38,8 @@
   explicit MoqtLiveRelayQueue(FullTrackName track,
                               MoqtForwardingPreference forwarding_preference)
       : track_(std::move(track)),
-        forwarding_preference_(forwarding_preference) {}
+        forwarding_preference_(forwarding_preference),
+        next_sequence_(0, 0) {}
 
   MoqtLiveRelayQueue(const MoqtLiveRelayQueue&) = delete;
   MoqtLiveRelayQueue(MoqtLiveRelayQueue&&) = default;
diff --git a/quiche/quic/moqt/moqt_live_relay_queue_test.cc b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
index 6d4e765..3734e48 100644
--- a/quiche/quic/moqt/moqt_live_relay_queue_test.cc
+++ b/quiche/quic/moqt/moqt_live_relay_queue_test.cc
@@ -24,7 +24,7 @@
  public:
   TestMoqtLiveRelayQueue()
       : MoqtLiveRelayQueue(FullTrackName{"test", "track"},
-                           MoqtForwardingPreference::kGroup) {
+                           MoqtForwardingPreference::kSubgroup) {
     AddObjectListener(this);
   }
 
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index 5c2e503..95d8413 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -4,6 +4,7 @@
 
 #include "quiche/quic/moqt/moqt_messages.h"
 
+#include <cstdint>
 #include <string>
 #include <vector>
 
@@ -19,7 +20,8 @@
 namespace moqt {
 
 MoqtObjectStatus IntegerToObjectStatus(uint64_t integer) {
-  if (integer >= 0x5) {
+  if (integer >=
+      static_cast<uint64_t>(MoqtObjectStatus::kInvalidObjectStatus)) {
     return MoqtObjectStatus::kInvalidObjectStatus;
   }
   return static_cast<MoqtObjectStatus>(integer);
@@ -113,14 +115,12 @@
 
 std::string MoqtDataStreamTypeToString(MoqtDataStreamType type) {
   switch (type) {
-    case MoqtDataStreamType::kObjectStream:
-      return "OBJECT_STREAM";
     case MoqtDataStreamType::kObjectDatagram:
       return "OBJECT_PREFER_DATAGRAM";
     case MoqtDataStreamType::kStreamHeaderTrack:
       return "STREAM_HEADER_TRACK";
-    case MoqtDataStreamType::kStreamHeaderGroup:
-      return "STREAM_HEADER_GROUP";
+    case MoqtDataStreamType::kStreamHeaderSubgroup:
+      return "STREAM_HEADER_SUBGROUP";
     case MoqtDataStreamType::kPadding:
       return "PADDING";
   }
@@ -130,14 +130,12 @@
 std::string MoqtForwardingPreferenceToString(
     MoqtForwardingPreference preference) {
   switch (preference) {
-    case MoqtForwardingPreference::kObject:
-      return "OBJECT";
     case MoqtForwardingPreference::kDatagram:
       return "DATAGRAM";
     case MoqtForwardingPreference::kTrack:
       return "TRACK";
-    case MoqtForwardingPreference::kGroup:
-      return "GROUP";
+    case MoqtForwardingPreference::kSubgroup:
+      return "SUBGROUP";
   }
   QUIC_BUG(quic_bug_bad_moqt_message_type_01)
       << "Unknown preference " << std::to_string(static_cast<int>(preference));
@@ -146,37 +144,33 @@
 
 MoqtForwardingPreference GetForwardingPreference(MoqtDataStreamType type) {
   switch (type) {
-    case MoqtDataStreamType::kObjectStream:
-      return MoqtForwardingPreference::kObject;
     case MoqtDataStreamType::kObjectDatagram:
       return MoqtForwardingPreference::kDatagram;
     case MoqtDataStreamType::kStreamHeaderTrack:
       return MoqtForwardingPreference::kTrack;
-    case MoqtDataStreamType::kStreamHeaderGroup:
-      return MoqtForwardingPreference::kGroup;
+    case MoqtDataStreamType::kStreamHeaderSubgroup:
+      return MoqtForwardingPreference::kSubgroup;
     default:
       break;
   }
   QUIC_BUG(quic_bug_bad_moqt_message_type_02)
       << "Message type does not indicate forwarding preference";
-  return MoqtForwardingPreference::kObject;
+  return MoqtForwardingPreference::kSubgroup;
 };
 
 MoqtDataStreamType GetMessageTypeForForwardingPreference(
     MoqtForwardingPreference preference) {
   switch (preference) {
-    case MoqtForwardingPreference::kObject:
-      return MoqtDataStreamType::kObjectStream;
     case MoqtForwardingPreference::kDatagram:
       return MoqtDataStreamType::kObjectDatagram;
     case MoqtForwardingPreference::kTrack:
       return MoqtDataStreamType::kStreamHeaderTrack;
-    case MoqtForwardingPreference::kGroup:
-      return MoqtDataStreamType::kStreamHeaderGroup;
+    case MoqtForwardingPreference::kSubgroup:
+      return MoqtDataStreamType::kStreamHeaderSubgroup;
   }
   QUIC_BUG(quic_bug_bad_moqt_message_type_03)
       << "Forwarding preference does not indicate message type";
-  return MoqtDataStreamType::kObjectStream;
+  return MoqtDataStreamType::kStreamHeaderSubgroup;
 }
 
 std::string FullTrackName::ToString() const {
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 4a08832..57c26ab 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -65,10 +65,9 @@
 inline constexpr size_t kMaxMessageHeaderSize = 2048;
 
 enum class QUICHE_EXPORT MoqtDataStreamType : uint64_t {
-  kObjectStream = 0x00,
   kObjectDatagram = 0x01,
-  kStreamHeaderTrack = 0x50,
-  kStreamHeaderGroup = 0x51,
+  kStreamHeaderTrack = 0x02,
+  kStreamHeaderSubgroup = 0x04,
 
   // Currently QUICHE-specific.  All data on a kPadding stream is ignored.
   kPadding = 0x26d3,
@@ -218,11 +217,20 @@
 
 // These are absolute sequence numbers.
 struct FullSequence {
-  uint64_t group = 0;
-  uint64_t object = 0;
+  uint64_t group;
+  uint64_t subgroup;
+  uint64_t object;
+  FullSequence() : FullSequence(0, 0) {}
+  // There is a lot of code from before subgroups. Assume there's one subgroup
+  // with ID 0 per group.
+  FullSequence(uint64_t group, uint64_t object)
+      : FullSequence(group, 0, object) {}
+  FullSequence(uint64_t group, uint64_t subgroup, uint64_t object)
+      : group(group), subgroup(subgroup), object(object) {}
   bool operator==(const FullSequence& other) const {
     return group == other.group && object == other.object;
   }
+  // These are temporal ordering comparisons, so subgroup ID doesn't matter.
   bool operator<(const FullSequence& other) const {
     return group < other.group ||
            (group == other.group && object < other.object);
@@ -237,7 +245,9 @@
     object = other.object;
     return *this;
   }
-  FullSequence next() const { return FullSequence{group, object + 1}; }
+  FullSequence next() const {
+    return FullSequence{group, subgroup, object + 1};
+  }
   template <typename H>
   friend H AbslHashValue(H h, const FullSequence& m);
 
@@ -247,6 +257,11 @@
   }
 };
 
+struct SubgroupPriority {
+  uint8_t publisher_priority = 0xf0;
+  uint64_t subgroup_id = 0;
+};
+
 template <typename H>
 H AbslHashValue(H h, const FullSequence& m) {
   return H::combine(std::move(h), m.group, m.object);
@@ -268,11 +283,10 @@
 };
 
 // These codes do not appear on the wire.
-enum class QUICHE_EXPORT MoqtForwardingPreference : uint8_t {
-  kTrack = 0x0,
-  kGroup = 0x1,
-  kObject = 0x2,
-  kDatagram = 0x3,
+enum class QUICHE_EXPORT MoqtForwardingPreference {
+  kTrack,
+  kSubgroup,
+  kDatagram,
 };
 
 enum class QUICHE_EXPORT MoqtObjectStatus : uint64_t {
@@ -281,14 +295,14 @@
   kGroupDoesNotExist = 0x2,
   kEndOfGroup = 0x3,
   kEndOfTrack = 0x4,
-  kInvalidObjectStatus = 0x5,
+  kEndOfSubgroup = 0x5,
+  kInvalidObjectStatus = 0x6,
 };
 
 MoqtObjectStatus IntegerToObjectStatus(uint64_t integer);
 
 // The data contained in every Object message, although the message type
-// implies some of the values. |payload_length| has no value if the length
-// is unknown (because it runs to the end of the stream.)
+// implies some of the values.
 struct QUICHE_EXPORT MoqtObject {
   uint64_t subscribe_id;
   uint64_t track_alias;
@@ -297,7 +311,8 @@
   MoqtPriority publisher_priority;
   MoqtObjectStatus object_status;
   MoqtForwardingPreference forwarding_preference;
-  std::optional<uint64_t> payload_length;
+  std::optional<uint64_t> subgroup_id;
+  uint64_t payload_length;
 };
 
 enum class QUICHE_EXPORT MoqtFilterType : uint64_t {
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
index d7a9785..f7cf5e6 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -28,7 +28,7 @@
  public:
   TestMoqtOutgoingQueue()
       : MoqtOutgoingQueue(FullTrackName{"test", "track"},
-                          MoqtForwardingPreference::kGroup) {
+                          MoqtForwardingPreference::kSubgroup) {
     AddObjectListener(this);
   }
 
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 541c550..51658d4 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -52,7 +52,7 @@
 
 bool IsAllowedStreamType(uint64_t value) {
   constexpr std::array kAllowedStreamTypes = {
-      MoqtDataStreamType::kObjectStream, MoqtDataStreamType::kStreamHeaderGroup,
+      MoqtDataStreamType::kStreamHeaderSubgroup,
       MoqtDataStreamType::kStreamHeaderTrack, MoqtDataStreamType::kPadding};
   for (MoqtDataStreamType type : kAllowedStreamTypes) {
     if (static_cast<uint64_t>(type) == value) {
@@ -72,18 +72,24 @@
       !reader.ReadVarInt62(&object.group_id)) {
     return 0;
   }
-  if (type != MoqtDataStreamType::kStreamHeaderTrack &&
-      type != MoqtDataStreamType::kStreamHeaderGroup &&
+  if (type == MoqtDataStreamType::kStreamHeaderSubgroup) {
+    uint64_t subgroup_id;
+    if (!reader.ReadVarInt62(&subgroup_id)) {
+      return 0;
+    }
+    object.subgroup_id = subgroup_id;
+  }
+  if (type == MoqtDataStreamType::kObjectDatagram &&
       !reader.ReadVarInt62(&object.object_id)) {
     return 0;
   }
   if (!reader.ReadUInt8(&object.publisher_priority)) {
     return 0;
   }
-  uint64_t status = 0;
-  if ((type == MoqtDataStreamType::kObjectStream ||
-       type == MoqtDataStreamType::kObjectDatagram) &&
-      !reader.ReadVarInt62(&status)) {
+  uint64_t status = static_cast<uint64_t>(MoqtObjectStatus::kNormal);
+  if (type == MoqtDataStreamType::kObjectDatagram &&
+      (!reader.ReadVarInt62(&object.payload_length) ||
+       (object.payload_length == 0 && !reader.ReadVarInt62(&status)))) {
     return 0;
   }
   object.object_status = IntegerToObjectStatus(status);
@@ -100,15 +106,13 @@
       }
       [[fallthrough]];
 
-    case MoqtDataStreamType::kStreamHeaderGroup: {
-      uint64_t length;
+    case MoqtDataStreamType::kStreamHeaderSubgroup: {
       if (!reader.ReadVarInt62(&object.object_id) ||
-          !reader.ReadVarInt62(&length)) {
+          !reader.ReadVarInt62(&object.payload_length)) {
         return 0;
       }
-      object.payload_length = length;
-      uint64_t status = 0;
-      if (length == 0 && !reader.ReadVarInt62(&status)) {
+      uint64_t status = static_cast<uint64_t>(MoqtObjectStatus::kNormal);
+      if (object.payload_length == 0 && !reader.ReadVarInt62(&status)) {
         return 0;
       }
       object.object_status = IntegerToObjectStatus(status);
@@ -902,11 +906,6 @@
     return;
   }
 
-  // Annoying path (going away soon): handle kObjectStream receiving a FIN.
-  if (data.empty() && fin && type_ == MoqtDataStreamType::kObjectStream) {
-    visitor_.OnObjectMessage(*metadata_, "", true);
-  }
-
   // Sad path: there is already data buffered.  Attempt to transfer a small
   // chunk from `data` into the buffer, in hope that it will make the contents
   // of the buffer parsable without any leftover data.  This is a reasonable
@@ -915,8 +914,7 @@
   while (!buffered_message_.empty() && !data.empty()) {
     absl::string_view chunk = data.substr(0, chunk_size_);
     absl::StrAppend(&buffered_message_, chunk);
-    absl::string_view unprocessed =
-        ProcessDataInner(buffered_message_, fin && data.size() == chunk.size());
+    absl::string_view unprocessed = ProcessDataInner(buffered_message_);
     if (unprocessed.size() >= chunk.size()) {
       // chunk didn't allow any processing at all.
       data.remove_prefix(chunk.size());
@@ -928,7 +926,7 @@
 
   // Happy path: there is no buffered data.
   if (buffered_message_.empty() && !data.empty()) {
-    buffered_message_.assign(ProcessDataInner(data, fin));
+    buffered_message_.assign(ProcessDataInner(data));
   }
 
   if (fin) {
@@ -941,8 +939,7 @@
   }
 }
 
-absl::string_view MoqtDataParser::ProcessDataInner(absl::string_view data,
-                                                   bool fin) {
+absl::string_view MoqtDataParser::ProcessDataInner(absl::string_view data) {
   quic::QuicDataReader reader(data);
   while (!reader.IsDoneReading()) {
     absl::string_view remainder = reader.PeekRemainingPayload();
@@ -966,11 +963,6 @@
         if (bytes_read == 0) {
           return remainder;
         }
-        if (type_ == MoqtDataStreamType::kObjectStream &&
-            header.object_status == MoqtObjectStatus::kInvalidObjectStatus) {
-          ParseError("Invalid object status");
-          return "";
-        }
         metadata_ = header;
         continue;
       }
@@ -985,22 +977,14 @@
           ParseError("Invalid object status provided");
           return "";
         }
-        payload_length_remaining_ = *metadata_->payload_length;
+        payload_length_remaining_ = metadata_->payload_length;
+        if (payload_length_remaining_ == 0) {
+          visitor_.OnObjectMessage(*metadata_, "", true);
+        }
         continue;
       }
 
       case kData: {
-        if (payload_length_remaining_ == 0) {
-          // Special case: kObject, which does not have explicit length.
-          if (metadata_->object_status != MoqtObjectStatus::kNormal) {
-            ParseError("Object with non-normal status has payload");
-            return "";
-          }
-          visitor_.OnObjectMessage(*metadata_, reader.PeekRemainingPayload(),
-                                   fin);
-          return "";
-        }
-
         absl::string_view payload =
             reader.ReadAtMost(payload_length_remaining_);
         visitor_.OnObjectMessage(*metadata_, payload,
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index 918573a..876b27a 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -206,8 +206,7 @@
     if (!metadata_.has_value()) {
       return kHeader;
     }
-    if (payload_length_remaining_ > 0 ||
-        *type_ == MoqtDataStreamType::kObjectStream) {
+    if (payload_length_remaining_ > 0) {
       return kData;
     }
     return kSubheader;
@@ -215,8 +214,7 @@
 
   // Processes all that can be entirely processed, and returns the view for the
   // data that needs to be buffered.
-  // TODO: remove the `fin` argument once kObjectStream is gone.
-  absl::string_view ProcessDataInner(absl::string_view data, bool fin);
+  absl::string_view ProcessDataInner(absl::string_view data);
 
   void ParseError(absl::string_view reason);
 
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index 633f83f..6c34083 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -41,9 +41,9 @@
     MoqtMessageType::kServerSetup,    MoqtMessageType::kGoAway,
     MoqtMessageType::kMaxSubscribeId, MoqtMessageType::kObjectAck,
 };
-constexpr std::array kDataStreamTypes{MoqtDataStreamType::kObjectStream,
-                                      MoqtDataStreamType::kStreamHeaderTrack,
-                                      MoqtDataStreamType::kStreamHeaderGroup};
+constexpr std::array kDataStreamTypes{
+    MoqtDataStreamType::kStreamHeaderTrack,
+    MoqtDataStreamType::kStreamHeaderSubgroup};
 
 using GeneralizedMessageType =
     absl::variant<MoqtMessageType, MoqtDataStreamType>;
@@ -325,10 +325,6 @@
 }
 
 TEST_P(MoqtParserTest, EarlyFin) {
-  if (message_type_ ==
-      GeneralizedMessageType(MoqtDataStreamType::kObjectStream)) {
-    return;
-  }
   std::unique_ptr<TestMessageBase> message = MakeMessage();
   size_t first_data_size = message->total_message_size() - 1;
   ProcessData(message->PacketSample().substr(0, first_data_size), true);
@@ -339,10 +335,6 @@
 }
 
 TEST_P(MoqtParserTest, SeparateEarlyFin) {
-  if (message_type_ ==
-      GeneralizedMessageType(MoqtDataStreamType::kObjectStream)) {
-    return;
-  }
   std::unique_ptr<TestMessageBase> message = MakeMessage();
   size_t first_data_size = message->total_message_size() - 1;
   ProcessData(message->PacketSample().substr(0, first_data_size), false);
@@ -366,29 +358,12 @@
   static constexpr bool kRawQuic = false;
 };
 
-TEST_F(MoqtMessageSpecificTest, ObjectStreamSeparateFin) {
-  // OBJECT can return on an unknown-length message even without receiving a
-  // FIN.
-  MoqtDataParser parser(&visitor_);
-  auto message = std::make_unique<ObjectStreamMessage>();
-  parser.ProcessData(message->PacketSample(), false);
-  EXPECT_EQ(visitor_.messages_received_, 0);
-  EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
-  EXPECT_EQ(visitor_.object_payload(), "foo");
-  EXPECT_FALSE(visitor_.end_of_message_);
-
-  parser.ProcessData(absl::string_view(), true);  // send the FIN
-  EXPECT_EQ(visitor_.messages_received_, 1);
-  EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
-  EXPECT_TRUE(visitor_.end_of_message_);
-  EXPECT_FALSE(visitor_.parsing_error_.has_value());
-}
-
 // Send the header + some payload, pure payload, then pure payload to end the
 // message.
 TEST_F(MoqtMessageSpecificTest, ThreePartObject) {
   MoqtDataParser parser(&visitor_);
-  auto message = std::make_unique<ObjectStreamMessage>();
+  auto message = std::make_unique<StreamHeaderSubgroupMessage>();
+  EXPECT_TRUE(message->SetPayloadLength(14));
   parser.ProcessData(message->PacketSample(), false);
   EXPECT_EQ(visitor_.messages_received_, 0);
   EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
@@ -414,23 +389,24 @@
 // Send the part of header, rest of header + payload, plus payload.
 TEST_F(MoqtMessageSpecificTest, ThreePartObjectFirstIncomplete) {
   MoqtDataParser parser(&visitor_);
-  auto message = std::make_unique<ObjectStreamMessage>();
+  auto message = std::make_unique<StreamHeaderSubgroupMessage>();
+  EXPECT_TRUE(message->SetPayloadLength(50));
 
   // first part
   parser.ProcessData(message->PacketSample().substr(0, 4), false);
   EXPECT_EQ(visitor_.messages_received_, 0);
 
   // second part. Add padding to it.
-  message->set_wire_image_size(100);
+  message->set_wire_image_size(55);
   parser.ProcessData(
       message->PacketSample().substr(4, message->total_message_size() - 4),
       false);
   EXPECT_EQ(visitor_.messages_received_, 0);
   EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
   EXPECT_FALSE(visitor_.end_of_message_);
-  // The value "93" is the overall wire image size of 100 minus the non-payload
+  // The value "47" is the overall wire image size of 55 minus the non-payload
   // part of the message.
-  EXPECT_EQ(visitor_.object_payload().length(), 93);
+  EXPECT_EQ(visitor_.object_payload().length(), 47);
 
   // third part includes FIN
   parser.ProcessData("bar", true);
@@ -441,10 +417,10 @@
   EXPECT_FALSE(visitor_.parsing_error_.has_value());
 }
 
-TEST_F(MoqtMessageSpecificTest, StreamHeaderGroupFollowOn) {
+TEST_F(MoqtMessageSpecificTest, StreamHeaderSubgroupFollowOn) {
   MoqtDataParser parser(&visitor_);
   // first part
-  auto message1 = std::make_unique<StreamHeaderGroupMessage>();
+  auto message1 = std::make_unique<StreamHeaderSubgroupMessage>();
   parser.ProcessData(message1->PacketSample(), false);
   EXPECT_EQ(visitor_.messages_received_, 1);
   EXPECT_TRUE(message1->EqualFieldValues(*visitor_.last_message_));
@@ -453,7 +429,7 @@
   EXPECT_FALSE(visitor_.parsing_error_.has_value());
   // second part
   visitor_.object_payloads_.clear();
-  auto message2 = std::make_unique<StreamMiddlerGroupMessage>();
+  auto message2 = std::make_unique<StreamMiddlerSubgroupMessage>();
   parser.ProcessData(message2->PacketSample(), false);
   EXPECT_EQ(visitor_.messages_received_, 2);
   EXPECT_TRUE(message2->EqualFieldValues(*visitor_.last_message_));
@@ -832,7 +808,7 @@
 
 TEST_F(MoqtMessageSpecificTest, FinMidPayload) {
   MoqtDataParser parser(&visitor_);
-  auto message = std::make_unique<StreamHeaderGroupMessage>();
+  auto message = std::make_unique<StreamHeaderSubgroupMessage>();
   parser.ProcessData(
       message->PacketSample().substr(0, message->total_message_size() - 1),
       true);
@@ -863,28 +839,18 @@
   EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
 }
 
-TEST_F(MoqtMessageSpecificTest, NonNormalObjectHasPayload) {
-  MoqtDataParser parser(&visitor_);
-  char object_stream[] = {
-      0x00, 0x03, 0x04, 0x05, 0x06, 0x07, 0x02,  // varints
-      0x66, 0x6f, 0x6f,                          // payload = "foo"
-  };
-  parser.ProcessData(absl::string_view(object_stream, sizeof(object_stream)),
-                     false);
-  EXPECT_EQ(visitor_.parsing_error_,
-            "Object with non-normal status has payload");
-  EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
-}
-
 TEST_F(MoqtMessageSpecificTest, InvalidObjectStatus) {
   MoqtDataParser parser(&visitor_);
-  char object_stream[] = {
-      0x00, 0x03, 0x04, 0x05, 0x06, 0x07, 0x06,  // varints
-      0x66, 0x6f, 0x6f,                          // payload = "foo"
+  char stream_header_subgroup[] = {
+      0x04,                    // type field
+      0x03, 0x04, 0x05, 0x08,  // varints
+      0x07,                    // publisher priority
+      0x06, 0x00, 0x0f,        // object middler; status = 0x0f
   };
-  parser.ProcessData(absl::string_view(object_stream, sizeof(object_stream)),
-                     false);
-  EXPECT_EQ(visitor_.parsing_error_, "Invalid object status");
+  parser.ProcessData(
+      absl::string_view(stream_header_subgroup, sizeof(stream_header_subgroup)),
+      false);
+  EXPECT_EQ(visitor_.parsing_error_, "Invalid object status provided");
   EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
 }
 
@@ -1232,7 +1198,7 @@
 }
 
 TEST_F(MoqtMessageSpecificTest, WrongMessageInDatagram) {
-  ObjectStreamMessage message;
+  StreamHeaderSubgroupMessage message;
   MoqtObject object;
   absl::string_view payload = ParseDatagram(message.PacketSample(), object);
   EXPECT_TRUE(payload.empty());
diff --git a/quiche/quic/moqt/moqt_priority.cc b/quiche/quic/moqt/moqt_priority.cc
index 73e0171..22a8f7a 100644
--- a/quiche/quic/moqt/moqt_priority.cc
+++ b/quiche/quic/moqt/moqt_priority.cc
@@ -51,17 +51,17 @@
 webtransport::SendOrder SendOrderForStream(MoqtPriority subscriber_priority,
                                            MoqtPriority publisher_priority,
                                            uint64_t group_id,
-                                           uint64_t object_id,
+                                           uint64_t subgroup_id,
                                            MoqtDeliveryOrder delivery_order) {
   const int64_t track_bits = (Flip<8>(subscriber_priority) << 54) |
                              (Flip<8>(publisher_priority) << 46);
   group_id = OnlyLowestNBits<26>(group_id);
-  object_id = OnlyLowestNBits<20>(object_id);
+  subgroup_id = OnlyLowestNBits<20>(subgroup_id);
   if (delivery_order == MoqtDeliveryOrder::kAscending) {
     group_id = Flip<26>(group_id);
   }
-  object_id = Flip<20>(object_id);  // Object ID is always ascending.
-  return track_bits | (group_id << 20) | object_id;
+  subgroup_id = Flip<20>(subgroup_id);
+  return track_bits | (group_id << 20) | subgroup_id;
 }
 
 webtransport::SendOrder UpdateSendOrderForSubscriberPriority(
diff --git a/quiche/quic/moqt/moqt_priority.h b/quiche/quic/moqt/moqt_priority.h
index 0dfd490..15f8a55 100644
--- a/quiche/quic/moqt/moqt_priority.h
+++ b/quiche/quic/moqt/moqt_priority.h
@@ -30,7 +30,7 @@
     uint64_t group_id, MoqtDeliveryOrder delivery_order);
 QUICHE_EXPORT webtransport::SendOrder SendOrderForStream(
     MoqtPriority subscriber_priority, MoqtPriority publisher_priority,
-    uint64_t group_id, uint64_t object_id, MoqtDeliveryOrder delivery_order);
+    uint64_t group_id, uint64_t subgroup_id, MoqtDeliveryOrder delivery_order);
 
 // Returns |send_order| updated with the new |subscriber_priority|.
 QUICHE_EXPORT webtransport::SendOrder UpdateSendOrderForSubscriberPriority(
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 723162c..656b8d0 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -191,10 +191,6 @@
 void MoqtSession::OnDatagramReceived(absl::string_view datagram) {
   MoqtObject message;
   absl::string_view payload = ParseDatagram(datagram, message);
-  if (payload.empty()) {
-    Error(MoqtError::kProtocolViolation, "Malformed datagram");
-    return;
-  }
   QUICHE_DLOG(INFO) << ENDPOINT
                     << "Received OBJECT message in datagram for subscribe_id "
                     << message.subscribe_id << " for track alias "
@@ -892,23 +888,23 @@
 void MoqtSession::IncomingDataStream::OnObjectMessage(const MoqtObject& message,
                                                       absl::string_view payload,
                                                       bool end_of_message) {
-  QUICHE_DVLOG(1)
-      << ENDPOINT << "Received OBJECT message on stream "
-      << stream_->GetStreamId() << " for subscribe_id " << message.subscribe_id
-      << " for track alias " << message.track_alias << " with sequence "
-      << message.group_id << ":" << message.object_id << " priority "
-      << message.publisher_priority << " forwarding_preference "
-      << MoqtForwardingPreferenceToString(message.forwarding_preference)
-      << " length " << payload.size() << " explicit length "
-      << (message.payload_length.has_value() ? (int)*message.payload_length
-                                             : -1)
-      << (end_of_message ? "F" : "");
+  QUICHE_DVLOG(1) << ENDPOINT << "Received OBJECT message on stream "
+                  << stream_->GetStreamId() << " for subscribe_id "
+                  << message.subscribe_id << " for track alias "
+                  << message.track_alias << " with sequence "
+                  << message.group_id << ":" << message.object_id
+                  << " priority " << message.publisher_priority
+                  << " forwarding_preference "
+                  << MoqtForwardingPreferenceToString(
+                         message.forwarding_preference)
+                  << " length " << payload.size() << " length "
+                  << message.payload_length << (end_of_message ? "F" : "");
   if (!session_->parameters_.deliver_partial_objects) {
     if (!end_of_message) {  // Buffer partial object.
-      if (partial_object_.empty() && message.payload_length.has_value()) {
+      if (partial_object_.empty()) {
         // Avoid redundant allocations by reserving the appropriate amount of
         // memory if known.
-        partial_object_.reserve(*message.payload_length);
+        partial_object_.reserve(message.payload_length);
       }
       absl::StrAppend(&partial_object_, payload);
       return;
@@ -1027,14 +1023,6 @@
   if (stream_id.has_value()) {
     raw_stream = session_->session_->GetStreamById(*stream_id);
   } else {
-    if (!window_.IsStreamProvokingObject(sequence, forwarding_preference)) {
-      QUIC_DLOG(INFO) << ENDPOINT << "Received object " << sequence
-                      << ", but there is no stream that it can be mapped to";
-      // It is possible that the we are getting notified of objects out of
-      // order, but we still have to send objects in a manner consistent with
-      // the forwarding preference used.
-      return;
-    }
     raw_stream = session_->OpenOrQueueDataStream(subscription_id_, sequence);
   }
   if (raw_stream == nullptr) {
@@ -1089,13 +1077,9 @@
       return SendOrderForStream(subscriber_priority_, publisher_priority,
                                 /*group_id=*/0, delivery_order);
       break;
-    case MoqtForwardingPreference::kGroup:
+    case MoqtForwardingPreference::kSubgroup:
       return SendOrderForStream(subscriber_priority_, publisher_priority,
-                                sequence.group, delivery_order);
-      break;
-    case MoqtForwardingPreference::kObject:
-      return SendOrderForStream(subscriber_priority_, publisher_priority,
-                                sequence.group, sequence.object,
+                                sequence.group, sequence.subgroup,
                                 delivery_order);
       break;
     case MoqtForwardingPreference::kDatagram:
@@ -1270,6 +1254,11 @@
   header.publisher_priority = publisher.GetPublisherPriority();
   header.object_status = object.status;
   header.forwarding_preference = forwarding_preference;
+  // TODO(martinduke): send values other than 0.
+  header.subgroup_id =
+      (forwarding_preference == MoqtForwardingPreference::kSubgroup)
+          ? 0
+          : std::optional<uint64_t>();
   header.payload_length = object.payload.length();
 
   quiche::QuicheBuffer serialized_header =
@@ -1288,19 +1277,15 @@
             !subscription.InWindow(next_object_);
       break;
 
-    case MoqtForwardingPreference::kGroup:
+    case MoqtForwardingPreference::kSubgroup:
       ++next_object_.object;
       fin = object.status == MoqtObjectStatus::kEndOfTrack ||
             object.status == MoqtObjectStatus::kEndOfGroup ||
+            object.status == MoqtObjectStatus::kEndOfSubgroup ||
             object.status == MoqtObjectStatus::kGroupDoesNotExist ||
             !subscription.InWindow(next_object_);
       break;
 
-    case MoqtForwardingPreference::kObject:
-      QUICHE_DCHECK(!stream_header_written_);
-      fin = true;
-      break;
-
     case MoqtForwardingPreference::kDatagram:
       QUICHE_NOTREACHED();
       break;
@@ -1347,6 +1332,8 @@
   header.publisher_priority = track_publisher_->GetPublisherPriority();
   header.object_status = object->status;
   header.forwarding_preference = MoqtForwardingPreference::kDatagram;
+  header.subgroup_id = std::nullopt;
+  header.payload_length = object->payload.length();
   quiche::QuicheBuffer datagram = session_->framer_.SerializeObjectDatagram(
       header, object->payload.AsStringView());
   session_->session_->SendOrQueueDatagram(datagram.AsStringView());
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 994e85a..106c172 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -717,7 +717,8 @@
       /*object_sequence=*/0,
       /*publisher_priority=*/0,
       /*object_status=*/MoqtObjectStatus::kNormal,
-      /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+      /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+      /*subgroup_id=*/0,
       /*payload_length=*/8,
   };
   webtransport::test::MockStream mock_stream;
@@ -742,7 +743,8 @@
       /*object_sequence=*/0,
       /*publisher_priority=*/0,
       /*object_status=*/MoqtObjectStatus::kNormal,
-      /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+      /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+      /*subgroup_id=*/0,
       /*payload_length=*/16,
   };
   webtransport::test::MockStream mock_stream;
@@ -772,7 +774,8 @@
       /*object_sequence=*/0,
       /*publisher_priority=*/0,
       /*object_status=*/MoqtObjectStatus::kNormal,
-      /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+      /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+      /*subgroup_id=*/0,
       /*payload_length=*/16,
   };
   webtransport::test::MockStream mock_stream;
@@ -809,7 +812,8 @@
       /*object_sequence=*/0,
       /*publisher_priority=*/0,
       /*object_status=*/MoqtObjectStatus::kNormal,
-      /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+      /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+      /*subgroup_id=*/0,
       /*payload_length=*/8,
   };
   webtransport::test::MockStream mock_stream;
@@ -867,7 +871,8 @@
       /*object_sequence=*/0,
       /*publisher_priority=*/0,
       /*object_status=*/MoqtObjectStatus::kNormal,
-      /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+      /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+      /*subgroup_id=*/0,
       /*payload_length=*/8,
   };
   webtransport::test::MockStream mock_stream;
@@ -928,7 +933,8 @@
       /*object_sequence=*/0,
       /*publisher_priority=*/0,
       /*object_status=*/MoqtObjectStatus::kNormal,
-      /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+      /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+      /*subgroup_id=*/0,
       /*payload_length=*/8,
   };
   webtransport::test::MockStream mock_stream;
@@ -948,7 +954,7 @@
   EXPECT_CALL(mock_stream, GetStreamId())
       .WillRepeatedly(Return(kIncomingUniStreamId));
   object_stream->OnObjectMessage(object, payload, true);
-  object.forwarding_preference = MoqtForwardingPreference::kObject;
+  object.forwarding_preference = MoqtForwardingPreference::kTrack;
   ++object.object_id;
   EXPECT_CALL(mock_session_,
               CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
@@ -980,7 +986,8 @@
       /*object_sequence=*/0,
       /*publisher_priority=*/0,
       /*object_status=*/MoqtObjectStatus::kNormal,
-      /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+      /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+      /*subgroup_id=*/0,
       /*payload_length=*/8,
   };
   webtransport::test::MockStream mock_stream;
@@ -1004,7 +1011,7 @@
   MoqtSessionPeer::CreateRemoteTrack(&session_, ftn, &visitor, 2);
   // The track already exists, and has a different forwarding preference.
   MoqtSessionPeer::remote_track(&session_, 2)
-      .CheckForwardingPreference(MoqtForwardingPreference::kObject);
+      .CheckForwardingPreference(MoqtForwardingPreference::kTrack);
 
   // SUBSCRIBE_OK arrives
   MoqtSubscribeOk ok = {
@@ -1025,7 +1032,7 @@
 
 TEST_F(MoqtSessionTest, CreateIncomingDataStreamAndSend) {
   FullTrackName ftn("foo", "bar");
-  auto track = SetupPublisher(ftn, MoqtForwardingPreference::kObject,
+  auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
                               FullSequence(4, 2));
   MoqtObjectListener* subscription =
       MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
@@ -1052,7 +1059,7 @@
 
   // Verify first six message fields are sent correctly
   bool correct_message = false;
-  const std::string kExpectedMessage = {0x00, 0x00, 0x02, 0x05, 0x00, 0x00};
+  const std::string kExpectedMessage = {0x04, 0x00, 0x02, 0x05, 0x00, 0x00};
   EXPECT_CALL(mock_stream, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
@@ -1075,8 +1082,8 @@
 
 TEST_F(MoqtSessionTest, UnidirectionalStreamCannotBeOpened) {
   FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(4, 2));
+  auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
+                              FullSequence(4, 2));
   MoqtObjectListener* subscription =
       MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
 
@@ -1118,8 +1125,8 @@
 
 TEST_F(MoqtSessionTest, OutgoingStreamDisappears) {
   FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(4, 2));
+  auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
+                              FullSequence(4, 2));
   MoqtObjectListener* subscription =
       MoqtSessionPeer::AddSubscription(&session_, track, 0, 2, 5, 0);
 
@@ -1277,7 +1284,7 @@
   // Publish in window.
   bool correct_message = false;
   uint8_t kExpectedMessage[] = {
-      0x01, 0x00, 0x02, 0x05, 0x00, 0x00, 0x00, 0x64,
+      0x01, 0x00, 0x02, 0x05, 0x00, 0x00, 0x08, 0x64,
       0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66,
   };
   EXPECT_CALL(mock_session_, SendOrQueueDatagram(_))
@@ -1311,9 +1318,10 @@
       /*publisher_priority=*/0,
       /*object_status=*/MoqtObjectStatus::kNormal,
       /*forwarding_preference=*/MoqtForwardingPreference::kDatagram,
+      /*subgroup_id=*/std::nullopt,
       /*payload_length=*/8,
   };
-  char datagram[] = {0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x64,
+  char datagram[] = {0x01, 0x01, 0x02, 0x00, 0x00, 0x00, 0x08, 0x64,
                      0x65, 0x61, 0x64, 0x62, 0x65, 0x65, 0x66};
   EXPECT_CALL(visitor_,
               OnObjectFragment(ftn, object.group_id, object.object_id,
@@ -1335,7 +1343,8 @@
       /*object_sequence=*/0,
       /*publisher_priority=*/0,
       /*object_status=*/MoqtObjectStatus::kNormal,
-      /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
+      /*forwarding_preference=*/MoqtForwardingPreference::kSubgroup,
+      /*subgroup_id=*/0,
       /*payload_length=*/8,
   };
   webtransport::test::MockStream mock_stream;
@@ -1410,8 +1419,8 @@
 
 TEST_F(MoqtSessionTest, QueuedStreamsOpenedInOrder) {
   FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(0, 0));
+  auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
+                              FullSequence(0, 0));
   EXPECT_CALL(*track, GetTrackStatus())
       .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun));
   MoqtObjectListener* subscription =
@@ -1482,22 +1491,22 @@
   EXPECT_CALL(mock_stream0, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
-        // The Group ID is the 5th byte of the stream.
+        // The Group ID is the 4th byte of the stream.
         EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 0);
         return absl::OkStatus();
       });
   EXPECT_CALL(mock_stream1, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
-        // The Group ID is the 5th byte of the stream.
-        EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 1);
+        // The Group ID is the 4th byte of the stream.
+        EXPECT_EQ(static_cast<const uint8_t>(data[0][3]), 1);
         return absl::OkStatus();
       });
   EXPECT_CALL(mock_stream2, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
-        // The Group ID is the 5th byte of the stream.
-        EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 2);
+        // The Group ID is the 4th byte of the stream.
+        EXPECT_EQ(static_cast<const uint8_t>(data[0][3]), 2);
         return absl::OkStatus();
       });
   session_.OnCanCreateNewOutgoingUnidirectionalStream();
@@ -1505,8 +1514,8 @@
 
 TEST_F(MoqtSessionTest, StreamQueuedForSubscriptionThatDoesntExist) {
   FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(0, 0));
+  auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
+                              FullSequence(0, 0));
   EXPECT_CALL(*track, GetTrackStatus())
       .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun));
   MoqtObjectListener* subscription =
@@ -1527,8 +1536,8 @@
 
 TEST_F(MoqtSessionTest, QueuedStreamPriorityChanged) {
   FullTrackName ftn("foo", "bar");
-  auto track =
-      SetupPublisher(ftn, MoqtForwardingPreference::kGroup, FullSequence(0, 0));
+  auto track = SetupPublisher(ftn, MoqtForwardingPreference::kSubgroup,
+                              FullSequence(0, 0));
   EXPECT_CALL(*track, GetTrackStatus())
       .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun));
   // Create two identical subscriptions with different priorities.
@@ -1579,9 +1588,9 @@
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         // Check subscribe ID is 0.
-        EXPECT_EQ(static_cast<const uint8_t>(data[0][2]), 0);
+        EXPECT_EQ(static_cast<const uint8_t>(data[0][1]), 0);
         // Check Group ID is 0
-        EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 0);
+        EXPECT_EQ(static_cast<const uint8_t>(data[0][3]), 0);
         return absl::OkStatus();
       });
   session_.OnCanCreateNewOutgoingUnidirectionalStream();
@@ -1615,9 +1624,9 @@
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         // Check subscribe ID is 0.
-        EXPECT_EQ(static_cast<const uint8_t>(data[0][2]), 1);
+        EXPECT_EQ(static_cast<const uint8_t>(data[0][1]), 1);
         // Check Group ID is 0
-        EXPECT_EQ(static_cast<const uint8_t>(data[0][4]), 0);
+        EXPECT_EQ(static_cast<const uint8_t>(data[0][3]), 0);
         return absl::OkStatus();
       });
   session_.OnCanCreateNewOutgoingUnidirectionalStream();
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.cc b/quiche/quic/moqt/moqt_subscribe_windows.cc
index 0c4a637..4637bd1 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows.cc
@@ -66,38 +66,18 @@
   return true;
 }
 
-bool SubscribeWindow::IsStreamProvokingObject(
-    FullSequence sequence, MoqtForwardingPreference preference) const {
-  if (sequence == start_) {
-    return true;
-  }
-  switch (preference) {
-    case MoqtForwardingPreference::kTrack:
-      return false;
-    case MoqtForwardingPreference::kGroup:
-      // Note: this assumes that the group starts with object 0.
-      return sequence.object == 0;
-    case MoqtForwardingPreference::kObject:
-      return true;
-    case MoqtForwardingPreference::kDatagram:
-      QUICHE_DCHECK(false);
-      return true;
-  }
-}
-
 ReducedSequenceIndex::ReducedSequenceIndex(
     FullSequence sequence, MoqtForwardingPreference preference) {
   switch (preference) {
     case MoqtForwardingPreference::kTrack:
       sequence_ = FullSequence(0, 0);
       break;
-    case MoqtForwardingPreference::kGroup:
+    case MoqtForwardingPreference::kSubgroup:
       sequence_ = FullSequence(sequence.group, 0);
       break;
-    case MoqtForwardingPreference::kObject:
     case MoqtForwardingPreference::kDatagram:
       sequence_ = sequence;
-      break;
+      return;
   }
 }
 
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h
index 5e0307d..b850f3e 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.h
+++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -43,11 +43,6 @@
   // MoQT, subscription windows are only allowed to shrink, not to expand).
   bool UpdateStartEnd(FullSequence start, std::optional<FullSequence> end);
 
-  // Returns true if for a given forwarding preference, the specified sequence
-  // number might be the first object on a stream.
-  bool IsStreamProvokingObject(FullSequence sequence,
-                               MoqtForwardingPreference preference) const;
-
  private:
   FullSequence start_;
   std::optional<FullSequence> end_;
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
index 49213ff..ba8687e 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows_test.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
@@ -44,8 +44,8 @@
   EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 0)), std::nullopt);
 }
 
-TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdGroup) {
-  SendStreamMap stream_map(MoqtForwardingPreference::kGroup);
+TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdSubgroup) {
+  SendStreamMap stream_map(MoqtForwardingPreference::kSubgroup);
   stream_map.AddStream(FullSequence{4, 0}, 2);
   EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 0)), std::nullopt);
   stream_map.AddStream(FullSequence{5, 2}, 6);
@@ -57,21 +57,6 @@
   EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 2)), std::nullopt);
 }
 
-TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdObject) {
-  SendStreamMap stream_map(MoqtForwardingPreference::kObject);
-  stream_map.AddStream(FullSequence{4, 0}, 2);
-  stream_map.AddStream(FullSequence{4, 1}, 6);
-  stream_map.AddStream(FullSequence{4, 2}, 10);
-  EXPECT_QUIC_BUG(stream_map.AddStream(FullSequence{4, 2}, 14),
-                  "Stream already added");
-  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 0)), 2);
-  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 2)), 10);
-  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 4)), std::nullopt);
-  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(5, 0)), std::nullopt);
-  stream_map.RemoveStream(FullSequence(4, 2), 10);
-  EXPECT_EQ(stream_map.GetStreamForSequence(FullSequence(4, 2)), std::nullopt);
-}
-
 TEST_F(SubscribeWindowTest, AddQueryRemoveStreamIdDatagram) {
   SendStreamMap stream_map(MoqtForwardingPreference::kDatagram);
   EXPECT_QUIC_BUG(stream_map.AddStream(FullSequence{4, 0}, 2),
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 4b0ff92..cd62dd8 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -27,9 +27,9 @@
 
 TEST_F(RemoteTrackTest, UpdateForwardingPreference) {
   EXPECT_TRUE(
-      track_.CheckForwardingPreference(MoqtForwardingPreference::kObject));
+      track_.CheckForwardingPreference(MoqtForwardingPreference::kSubgroup));
   EXPECT_TRUE(
-      track_.CheckForwardingPreference(MoqtForwardingPreference::kObject));
+      track_.CheckForwardingPreference(MoqtForwardingPreference::kSubgroup));
   EXPECT_FALSE(
       track_.CheckForwardingPreference(MoqtForwardingPreference::kDatagram));
 }
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 7e4fafe..b274194 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -139,6 +139,10 @@
       QUIC_LOG(INFO) << "OBJECT Object Send Order mismatch";
       return false;
     }
+    if (cast.subgroup_id != object_.subgroup_id) {
+      QUIC_LOG(INFO) << "OBJECT Subgroup ID mismatch";
+      return false;
+    }
     if (cast.payload_length != object_.payload_length) {
       QUIC_LOG(INFO) << "OBJECT Payload Length mismatch";
       return false;
@@ -159,23 +163,8 @@
       /*publisher_priority=*/7,
       /*object_status=*/MoqtObjectStatus::kNormal,
       /*forwarding_preference=*/MoqtForwardingPreference::kTrack,
-      /*payload_length=*/std::nullopt,
-  };
-};
-
-class QUICHE_NO_EXPORT ObjectStreamMessage : public ObjectMessage {
- public:
-  ObjectStreamMessage() : ObjectMessage() {
-    SetWireImage(raw_packet_, sizeof(raw_packet_));
-    object_.forwarding_preference = MoqtForwardingPreference::kObject;
-  }
-
-  void ExpandVarints() override { ExpandVarintsImpl("vvvvv-v---"); }
-
- private:
-  uint8_t raw_packet_[10] = {
-      0x00, 0x03, 0x04, 0x05, 0x06, 0x07, 0x00,  // varints
-      0x66, 0x6f, 0x6f,                          // payload = "foo"
+      /*subgroup_id=*/std::nullopt,
+      /*payload_length=*/3,
   };
 };
 
@@ -190,8 +179,9 @@
 
  private:
   uint8_t raw_packet_[10] = {
-      0x01, 0x03, 0x04, 0x05, 0x06, 0x07, 0x00,  // varints
-      0x66, 0x6f, 0x6f,                          // payload = "foo"
+      0x01, 0x03, 0x04, 0x05, 0x06,  // varints
+      0x07,                          // publisher priority
+      0x03, 0x66, 0x6f, 0x6f,        // payload = "foo"
   };
 };
 
@@ -205,16 +195,17 @@
     object_.payload_length = 3;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("--vv-vvv"); }
+  void ExpandVarints() override { ExpandVarintsImpl("vvv-vvv"); }
 
  private:
   // Some tests check that a FIN sent at the halfway point of a message results
   // in an error. Without the unnecessary expanded varint 0x0405, the halfway
   // point falls at the end of the Stream Header, which is legal. Expand the
   // varint so that the FIN would be illegal.
-  uint8_t raw_packet_[11] = {
-      0x40, 0x50,              // two byte type field
-      0x03, 0x04, 0x07,        // varints
+  uint8_t raw_packet_[10] = {
+      0x02,                    // type field
+      0x03, 0x04,              // varints
+      0x07,                    // publisher priority
       0x05, 0x06,              // object middler
       0x03, 0x66, 0x6f, 0x6f,  // payload = "foo"
   };
@@ -226,7 +217,6 @@
   StreamMiddlerTrackMessage() : ObjectMessage() {
     SetWireImage(raw_packet_, sizeof(raw_packet_));
     object_.forwarding_preference = MoqtForwardingPreference::kTrack;
-    object_.payload_length = 3;
     object_.group_id = 9;
     object_.object_id = 10;
   }
@@ -240,35 +230,47 @@
   };
 };
 
-class QUICHE_NO_EXPORT StreamHeaderGroupMessage : public ObjectMessage {
+class QUICHE_NO_EXPORT StreamHeaderSubgroupMessage : public ObjectMessage {
  public:
-  StreamHeaderGroupMessage() : ObjectMessage() {
+  StreamHeaderSubgroupMessage() : ObjectMessage() {
     SetWireImage(raw_packet_, sizeof(raw_packet_));
-    object_.forwarding_preference = MoqtForwardingPreference::kGroup;
-    object_.payload_length = 3;
+    object_.forwarding_preference = MoqtForwardingPreference::kSubgroup;
+    object_.subgroup_id = 8;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("--vvv-vv"); }
+  void ExpandVarints() override { ExpandVarintsImpl("vvvvv-vv"); }
+
+  bool SetPayloadLength(uint8_t payload_length) {
+    if (payload_length > 63) {
+      // This only supports one-byte varints.
+      return false;
+    }
+    object_.payload_length = payload_length;
+    raw_packet_[7] = payload_length;
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+    return true;
+  }
 
  private:
   uint8_t raw_packet_[11] = {
-      0x40, 0x51,                    // two-byte type field
-      0x03, 0x04, 0x05, 0x07,        // varints
+      0x04,                          // type field
+      0x03, 0x04, 0x05, 0x08,        // varints
+      0x07,                          // publisher priority
       0x06, 0x03, 0x66, 0x6f, 0x6f,  // object middler; payload = "foo"
   };
 };
 
 // Used only for tests that process multiple objects on one stream.
-class QUICHE_NO_EXPORT StreamMiddlerGroupMessage : public ObjectMessage {
+class QUICHE_NO_EXPORT StreamMiddlerSubgroupMessage : public ObjectMessage {
  public:
-  StreamMiddlerGroupMessage() : ObjectMessage() {
+  StreamMiddlerSubgroupMessage() : ObjectMessage() {
     SetWireImage(raw_packet_, sizeof(raw_packet_));
-    object_.forwarding_preference = MoqtForwardingPreference::kGroup;
-    object_.payload_length = 3;
+    object_.forwarding_preference = MoqtForwardingPreference::kSubgroup;
+    object_.subgroup_id = 8;
     object_.object_id = 9;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("vvv"); }
+  void ExpandVarints() override { ExpandVarintsImpl("vv"); }
 
  private:
   uint8_t raw_packet_[5] = {
@@ -1187,14 +1189,12 @@
 static inline std::unique_ptr<TestMessageBase> CreateTestDataStream(
     MoqtDataStreamType type) {
   switch (type) {
-    case MoqtDataStreamType::kObjectStream:
-      return std::make_unique<ObjectStreamMessage>();
     case MoqtDataStreamType::kObjectDatagram:
       return std::make_unique<ObjectDatagramMessage>();
     case MoqtDataStreamType::kStreamHeaderTrack:
       return std::make_unique<StreamHeaderTrackMessage>();
-    case MoqtDataStreamType::kStreamHeaderGroup:
-      return std::make_unique<StreamHeaderGroupMessage>();
+    case MoqtDataStreamType::kStreamHeaderSubgroup:
+      return std::make_unique<StreamHeaderSubgroupMessage>();
     case MoqtDataStreamType::kPadding:
       return nullptr;
   }
diff --git a/quiche/quic/moqt/tools/chat_client.cc b/quiche/quic/moqt/tools/chat_client.cc
index b706469..68208b1 100644
--- a/quiche/quic/moqt/tools/chat_client.cc
+++ b/quiche/quic/moqt/tools/chat_client.cc
@@ -165,7 +165,7 @@
     FullTrackName my_track_name =
         chat_strings_->GetFullTrackNameFromUsername(username_);
     queue_ = std::make_shared<MoqtOutgoingQueue>(
-        my_track_name, MoqtForwardingPreference::kObject);
+        my_track_name, MoqtForwardingPreference::kSubgroup);
     publisher_.Add(queue_);
     session_->set_publisher(&publisher_);
     MoqtOutgoingAnnounceCallback announce_callback =
diff --git a/quiche/quic/moqt/tools/chat_server.cc b/quiche/quic/moqt/tools/chat_server.cc
index 29d0842..028786e 100644
--- a/quiche/quic/moqt/tools/chat_server.cc
+++ b/quiche/quic/moqt/tools/chat_server.cc
@@ -60,6 +60,9 @@
 }
 
 ChatServer::ChatServerSessionHandler::~ChatServerSessionHandler() {
+  if (!server_->is_running_) {
+    return;
+  }
   if (username_.has_value()) {
     server_->DeleteUser(*username_);
   }
@@ -125,7 +128,7 @@
     : server_(std::move(proof_source), std::move(incoming_session_callback_)),
       strings_(chat_id),
       catalog_(std::make_shared<MoqtOutgoingQueue>(
-          strings_.GetCatalogName(), MoqtForwardingPreference::kGroup)),
+          strings_.GetCatalogName(), MoqtForwardingPreference::kSubgroup)),
       remote_track_visitor_(this) {
   catalog_->AddObject(quiche::QuicheMemSlice(quiche::QuicheBuffer::Copy(
                           quiche::SimpleBufferAllocator::Get(),
@@ -145,6 +148,7 @@
 ChatServer::~ChatServer() {
   // Kill all sessions so that the callback doesn't fire when the server is
   // destroyed.
+  is_running_ = false;
   server_.quic_server().Shutdown();
 }
 
@@ -156,7 +160,7 @@
   // Add a local track.
   user_queues_[username] = std::make_shared<MoqtLiveRelayQueue>(
       strings_.GetFullTrackNameFromUsername(username),
-      MoqtForwardingPreference::kObject);
+      MoqtForwardingPreference::kSubgroup);
   publisher_.Add(user_queues_[username]);
 }
 
diff --git a/quiche/quic/moqt/tools/chat_server.h b/quiche/quic/moqt/tools/chat_server.h
index a399af9..7bb4f13 100644
--- a/quiche/quic/moqt/tools/chat_server.h
+++ b/quiche/quic/moqt/tools/chat_server.h
@@ -103,6 +103,7 @@
   MoqtIncomingSessionCallback incoming_session_callback_ =
       [&](absl::string_view path) { return IncomingSessionHandler(path); };
 
+  bool is_running_ = true;
   MoqtServer server_;
   std::list<ChatServerSessionHandler> sessions_;
   MoqChatStrings strings_;
diff --git a/quiche/quic/moqt/tools/moqt_simulator_bin.cc b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
index e84a37d..d111d16 100644
--- a/quiche/quic/moqt/tools/moqt_simulator_bin.cc
+++ b/quiche/quic/moqt/tools/moqt_simulator_bin.cc
@@ -118,7 +118,7 @@
                   QuicBandwidth bitrate)
       : Actor(simulator, actor_name),
         queue_(std::make_shared<MoqtOutgoingQueue>(
-            track_name, MoqtForwardingPreference::kGroup)),
+            track_name, MoqtForwardingPreference::kSubgroup)),
         keyframe_interval_(keyframe_interval),
         time_between_frames_(QuicTimeDelta::FromMicroseconds(1.0e6 / fps)),
         i_to_p_ratio_(i_to_p_ratio),