diff --git a/build/source_list.bzl b/build/source_list.bzl
index 9431ce4..88ba579 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1497,6 +1497,7 @@
     "quic/moqt/moqt_parser_test.cc",
     "quic/moqt/moqt_session.cc",
     "quic/moqt/moqt_session_test.cc",
+    "quic/moqt/moqt_subscribe_windows.cc",
     "quic/moqt/moqt_subscribe_windows_test.cc",
     "quic/moqt/moqt_track_test.cc",
     "quic/moqt/tools/chat_client_bin.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index 9e3a202..68a4748 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1501,6 +1501,7 @@
     "src/quiche/quic/moqt/moqt_parser_test.cc",
     "src/quiche/quic/moqt/moqt_session.cc",
     "src/quiche/quic/moqt/moqt_session_test.cc",
+    "src/quiche/quic/moqt/moqt_subscribe_windows.cc",
     "src/quiche/quic/moqt/moqt_subscribe_windows_test.cc",
     "src/quiche/quic/moqt/moqt_track_test.cc",
     "src/quiche/quic/moqt/tools/chat_client_bin.cc",
diff --git a/build/source_list.json b/build/source_list.json
index 2971cf8..29f189f 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1500,6 +1500,7 @@
     "quiche/quic/moqt/moqt_parser_test.cc",
     "quiche/quic/moqt/moqt_session.cc",
     "quiche/quic/moqt/moqt_session_test.cc",
+    "quiche/quic/moqt/moqt_subscribe_windows.cc",
     "quiche/quic/moqt/moqt_subscribe_windows_test.cc",
     "quiche/quic/moqt/moqt_track_test.cc",
     "quiche/quic/moqt/tools/chat_client_bin.cc",
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index 8666061..2c46476 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -11,8 +11,8 @@
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/quic_data_writer.h"
 #include "quiche/quic/core/quic_time.h"
-#include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/platform/api/quic_bug_tracker.h"
 #include "quiche/common/quiche_buffer_allocator.h"
 
 namespace moqt {
@@ -109,33 +109,67 @@
 }  // namespace
 
 quiche::QuicheBuffer MoqtFramer::SerializeObject(
-    const MoqtObject& message, const absl::string_view payload) {
+    const MoqtObject& message, const absl::string_view payload,
+    bool is_first_in_stream) {
   if (message.payload_length.has_value() &&
       *message.payload_length < payload.length()) {
-    QUICHE_DLOG(INFO) << "payload_size is too small for payload";
+    QUIC_BUG(quic_bug_serialize_object_input_01)
+        << "payload_size is too small for payload";
     return quiche::QuicheBuffer();
   }
-  uint64_t message_type =
-      static_cast<uint64_t>(message.payload_length.has_value()
-                                ? MoqtMessageType::kObjectWithPayloadLength
-                                : MoqtMessageType::kObjectWithoutPayloadLength);
-  size_t buffer_size =
-      NeededVarIntLen(message_type) + NeededVarIntLen(message.track_id) +
-      NeededVarIntLen(message.group_sequence) +
-      NeededVarIntLen(message.object_sequence) +
-      NeededVarIntLen(message.object_send_order) + payload.length();
-  if (message.payload_length.has_value()) {
-    buffer_size += NeededVarIntLen(*message.payload_length);
+  if (!is_first_in_stream &&
+      (message.forwarding_preference == MoqtForwardingPreference::kObject ||
+       message.forwarding_preference == MoqtForwardingPreference::kDatagram)) {
+    QUIC_BUG(quic_bug_serialize_object_input_02)
+        << "Object or Datagram forwarding_preference must be first in stream";
+    return quiche::QuicheBuffer();
   }
+  // Figure out the total message size based on message type and payload.
+  size_t buffer_size = NeededVarIntLen(message.object_id) + payload.length();
+  uint64_t message_type = static_cast<uint64_t>(
+      GetMessageTypeForForwardingPreference(message.forwarding_preference));
+  if (is_first_in_stream) {
+    buffer_size += NeededVarIntLen(message_type) +
+                   NeededVarIntLen(message.subscribe_id) +
+                   NeededVarIntLen(message.track_alias) +
+                   NeededVarIntLen(message.group_id) +
+                   NeededVarIntLen(message.object_send_order);
+  } else if (message.forwarding_preference ==
+             MoqtForwardingPreference::kTrack) {
+    buffer_size += NeededVarIntLen(message.group_id);
+  }
+  uint64_t reported_payload_length = message.payload_length.has_value()
+                                         ? message.payload_length.value()
+                                         : payload.length();
+  if (message.forwarding_preference == MoqtForwardingPreference::kTrack ||
+      message.forwarding_preference == MoqtForwardingPreference::kGroup) {
+    buffer_size += NeededVarIntLen(reported_payload_length);
+  }
+  // Write to buffer.
   quiche::QuicheBuffer buffer(allocator_, buffer_size);
   quic::QuicDataWriter writer(buffer.size(), buffer.data());
-  writer.WriteVarInt62(message_type);
-  writer.WriteVarInt62(message.track_id);
-  writer.WriteVarInt62(message.group_sequence);
-  writer.WriteVarInt62(message.object_sequence);
-  writer.WriteVarInt62(message.object_send_order);
-  if (message.payload_length.has_value()) {
-    writer.WriteVarInt62(*message.payload_length);
+  if (is_first_in_stream) {
+    writer.WriteVarInt62(message_type);
+    writer.WriteVarInt62(message.subscribe_id);
+    writer.WriteVarInt62(message.track_alias);
+    if (message.forwarding_preference != MoqtForwardingPreference::kTrack) {
+      writer.WriteVarInt62(message.group_id);
+      if (message.forwarding_preference != MoqtForwardingPreference::kGroup) {
+        writer.WriteVarInt62(message.object_id);
+      }
+    }
+    writer.WriteVarInt62(message.object_send_order);
+  }
+  switch (message.forwarding_preference) {
+    case MoqtForwardingPreference::kTrack:
+      writer.WriteVarInt62(message.group_id);
+      [[fallthrough]];
+    case MoqtForwardingPreference::kGroup:
+      writer.WriteVarInt62(message.object_id);
+      writer.WriteVarInt62(reported_payload_length);
+      break;
+    default:
+      break;
   }
   writer.WriteStringPiece(payload);
   return buffer;
@@ -216,8 +250,8 @@
   return buffer;
 }
 
-quiche::QuicheBuffer MoqtFramer::SerializeSubscribeRequest(
-    const MoqtSubscribeRequest& message) {
+quiche::QuicheBuffer MoqtFramer::SerializeSubscribe(
+    const MoqtSubscribe& message) {
   if (!message.start_group.has_value() || !message.start_object.has_value()) {
     QUIC_LOG(INFO) << "start_group or start_object is missing";
     return quiche::QuicheBuffer();
@@ -227,7 +261,9 @@
                    << "non-None";
     return quiche::QuicheBuffer();
   }
-  size_t buffer_size = NeededVarIntLen(MoqtMessageType::kSubscribeRequest) +
+  size_t buffer_size = NeededVarIntLen(MoqtMessageType::kSubscribe) +
+                       NeededVarIntLen(message.subscribe_id) +
+                       NeededVarIntLen(message.track_alias) +
                        LengthPrefixedStringLength(message.track_namespace) +
                        LengthPrefixedStringLength(message.track_name) +
                        LocationLength(message.start_group) +
@@ -244,8 +280,9 @@
   buffer_size += NeededVarIntLen(num_params);
   quiche::QuicheBuffer buffer(allocator_, buffer_size);
   quic::QuicDataWriter writer(buffer.size(), buffer.data());
-  writer.WriteVarInt62(
-      static_cast<uint64_t>(MoqtMessageType::kSubscribeRequest));
+  writer.WriteVarInt62(static_cast<uint64_t>(MoqtMessageType::kSubscribe));
+  writer.WriteVarInt62(static_cast<uint64_t>(message.subscribe_id));
+  writer.WriteVarInt62(static_cast<uint64_t>(message.track_alias));
   writer.WriteStringPieceVarInt62(message.track_namespace);
   writer.WriteStringPieceVarInt62(message.track_name);
   WriteLocation(writer, message.start_group);
@@ -267,16 +304,12 @@
     const MoqtSubscribeOk& message) {
   size_t buffer_size =
       NeededVarIntLen(static_cast<uint64_t>(MoqtMessageType::kSubscribeOk)) +
-      LengthPrefixedStringLength(message.track_namespace) +
-      LengthPrefixedStringLength(message.track_name) +
-      NeededVarIntLen(message.track_id) +
+      NeededVarIntLen(message.subscribe_id) +
       NeededVarIntLen(message.expires.ToMilliseconds());
   quiche::QuicheBuffer buffer(allocator_, buffer_size);
   quic::QuicDataWriter writer(buffer.size(), buffer.data());
   writer.WriteVarInt62(static_cast<uint64_t>(MoqtMessageType::kSubscribeOk));
-  writer.WriteStringPieceVarInt62(message.track_namespace);
-  writer.WriteStringPieceVarInt62(message.track_name);
-  writer.WriteVarInt62(message.track_id);
+  writer.WriteVarInt62(message.subscribe_id);
   writer.WriteVarInt62(message.expires.ToMilliseconds());
   QUICHE_DCHECK(writer.remaining() == 0);
   return buffer;
@@ -286,17 +319,17 @@
     const MoqtSubscribeError& message) {
   size_t buffer_size =
       NeededVarIntLen(static_cast<uint64_t>(MoqtMessageType::kSubscribeError)) +
-      LengthPrefixedStringLength(message.track_namespace) +
-      LengthPrefixedStringLength(message.track_name) +
+      NeededVarIntLen(message.subscribe_id) +
       NeededVarIntLen(static_cast<uint64_t>(message.error_code)) +
-      LengthPrefixedStringLength(message.reason_phrase);
+      LengthPrefixedStringLength(message.reason_phrase) +
+      NeededVarIntLen(message.track_alias);
   quiche::QuicheBuffer buffer(allocator_, buffer_size);
   quic::QuicDataWriter writer(buffer.size(), buffer.data());
   writer.WriteVarInt62(static_cast<uint64_t>(MoqtMessageType::kSubscribeError));
-  writer.WriteStringPieceVarInt62(message.track_namespace);
-  writer.WriteStringPieceVarInt62(message.track_name);
+  writer.WriteVarInt62(message.subscribe_id);
   writer.WriteVarInt62(static_cast<uint64_t>(message.error_code));
   writer.WriteStringPieceVarInt62(message.reason_phrase);
+  writer.WriteVarInt62(message.track_alias);
   QUICHE_DCHECK(writer.remaining() == 0);
   return buffer;
 }
@@ -305,13 +338,11 @@
     const MoqtUnsubscribe& message) {
   size_t buffer_size =
       NeededVarIntLen(static_cast<uint64_t>(MoqtMessageType::kUnsubscribe)) +
-      LengthPrefixedStringLength(message.track_namespace) +
-      LengthPrefixedStringLength(message.track_name);
+      NeededVarIntLen(message.subscribe_id);
   quiche::QuicheBuffer buffer(allocator_, buffer_size);
   quic::QuicDataWriter writer(buffer.size(), buffer.data());
   writer.WriteVarInt62(static_cast<uint64_t>(MoqtMessageType::kUnsubscribe));
-  writer.WriteStringPieceVarInt62(message.track_namespace);
-  writer.WriteStringPieceVarInt62(message.track_name);
+  writer.WriteVarInt62(message.subscribe_id);
   QUICHE_DCHECK(writer.remaining() == 0);
   return buffer;
 }
@@ -320,15 +351,13 @@
     const MoqtSubscribeFin& message) {
   size_t buffer_size =
       NeededVarIntLen(static_cast<uint64_t>(MoqtMessageType::kSubscribeFin)) +
-      LengthPrefixedStringLength(message.track_namespace) +
-      LengthPrefixedStringLength(message.track_name) +
+      NeededVarIntLen(message.subscribe_id) +
       NeededVarIntLen(message.final_group) +
       NeededVarIntLen(message.final_object);
   quiche::QuicheBuffer buffer(allocator_, buffer_size);
   quic::QuicDataWriter writer(buffer.size(), buffer.data());
   writer.WriteVarInt62(static_cast<uint64_t>(MoqtMessageType::kSubscribeFin));
-  writer.WriteStringPieceVarInt62(message.track_namespace);
-  writer.WriteStringPieceVarInt62(message.track_name);
+  writer.WriteVarInt62(message.subscribe_id);
   writer.WriteVarInt62(message.final_group);
   writer.WriteVarInt62(message.final_object);
   QUICHE_DCHECK(writer.remaining() == 0);
@@ -339,8 +368,7 @@
     const MoqtSubscribeRst& message) {
   size_t buffer_size =
       NeededVarIntLen(static_cast<uint64_t>(MoqtMessageType::kSubscribeRst)) +
-      LengthPrefixedStringLength(message.track_namespace) +
-      LengthPrefixedStringLength(message.track_name) +
+      NeededVarIntLen(message.subscribe_id) +
       NeededVarIntLen(message.error_code) +
       LengthPrefixedStringLength(message.reason_phrase) +
       NeededVarIntLen(message.final_group) +
@@ -348,8 +376,7 @@
   quiche::QuicheBuffer buffer(allocator_, buffer_size);
   quic::QuicDataWriter writer(buffer.size(), buffer.data());
   writer.WriteVarInt62(static_cast<uint64_t>(MoqtMessageType::kSubscribeRst));
-  writer.WriteStringPieceVarInt62(message.track_namespace);
-  writer.WriteStringPieceVarInt62(message.track_name);
+  writer.WriteVarInt62(message.subscribe_id);
   writer.WriteVarInt62(message.error_code);
   writer.WriteStringPieceVarInt62(message.reason_phrase);
   writer.WriteVarInt62(message.final_group);
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h
index a5f741b..080e76e 100644
--- a/quiche/quic/moqt/moqt_framer.h
+++ b/quiche/quic/moqt/moqt_framer.h
@@ -34,17 +34,21 @@
 
   // SerializeObject also takes a payload. |payload_size| might simply be the
   // size of |payload|, or it could be larger if there is more data coming, or
-  // it could be nullopt if the final length is unknown. If |payload_size| is
-  // smaller than |payload|, returns an empty buffer.
+  // it could be nullopt if the final length is unknown.
+  // If |message.payload_size| is nullopt but the forwarding preference requires
+  // a payload length, will assume that payload.length() is the correct value.
+  // If |message.payload_size| is smaller than |payload|, or
+  // |message.forwarding preference| is not consistent with
+  // |is_first_in_stream|, returns an empty buffer and triggers QUIC_BUG.
   quiche::QuicheBuffer SerializeObject(const MoqtObject& message,
-                                       absl::string_view payload);
+                                       absl::string_view payload,
+                                       bool is_first_in_stream);
   // Build a buffer for additional payload data.
   quiche::QuicheBuffer SerializeObjectPayload(absl::string_view payload);
   quiche::QuicheBuffer SerializeClientSetup(const MoqtClientSetup& message);
   quiche::QuicheBuffer SerializeServerSetup(const MoqtServerSetup& message);
   // Returns an empty buffer if there is an illegal combination of locations.
-  quiche::QuicheBuffer SerializeSubscribeRequest(
-      const MoqtSubscribeRequest& message);
+  quiche::QuicheBuffer SerializeSubscribe(const MoqtSubscribe& message);
   quiche::QuicheBuffer SerializeSubscribeOk(const MoqtSubscribeOk& message);
   quiche::QuicheBuffer SerializeSubscribeError(
       const MoqtSubscribeError& message);
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index f261e1f..ff28d92 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -8,9 +8,9 @@
 #include <string>
 #include <vector>
 
-#include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/test_tools/moqt_test_message.h"
+#include "quiche/quic/platform/api/quic_expect_bug.h"
 #include "quiche/quic/platform/api/quic_test.h"
 #include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/simple_buffer_allocator.h"
@@ -27,11 +27,9 @@
 std::vector<MoqtFramerTestParams> GetMoqtFramerTestParams() {
   std::vector<MoqtFramerTestParams> params;
   std::vector<MoqtMessageType> message_types = {
-      MoqtMessageType::kObjectWithPayloadLength,
-      MoqtMessageType::kObjectWithoutPayloadLength,
-      MoqtMessageType::kClientSetup,
-      MoqtMessageType::kServerSetup,
-      MoqtMessageType::kSubscribeRequest,
+      MoqtMessageType::kObjectStream,
+      MoqtMessageType::kObjectPreferDatagram,
+      MoqtMessageType::kSubscribe,
       MoqtMessageType::kSubscribeOk,
       MoqtMessageType::kSubscribeError,
       MoqtMessageType::kUnsubscribe,
@@ -42,6 +40,10 @@
       MoqtMessageType::kAnnounceError,
       MoqtMessageType::kUnannounce,
       MoqtMessageType::kGoAway,
+      MoqtMessageType::kClientSetup,
+      MoqtMessageType::kServerSetup,
+      MoqtMessageType::kStreamHeaderTrack,
+      MoqtMessageType::kStreamHeaderGroup,
   };
   std::vector<bool> uses_web_transport_bool = {
       false,
@@ -78,61 +80,22 @@
         framer_(buffer_allocator_, GetParam().uses_web_transport) {}
 
   std::unique_ptr<TestMessageBase> MakeMessage(MoqtMessageType message_type) {
-    switch (message_type) {
-      case MoqtMessageType::kObjectWithPayloadLength:
-        return std::make_unique<ObjectMessageWithLength>();
-      case MoqtMessageType::kObjectWithoutPayloadLength:
-        return std::make_unique<ObjectMessageWithoutLength>();
-      case MoqtMessageType::kClientSetup:
-        return std::make_unique<ClientSetupMessage>(webtrans_);
-      case MoqtMessageType::kServerSetup:
-        return std::make_unique<ServerSetupMessage>();
-      case MoqtMessageType::kSubscribeRequest:
-        return std::make_unique<SubscribeRequestMessage>();
-      case MoqtMessageType::kSubscribeOk:
-        return std::make_unique<SubscribeOkMessage>();
-      case MoqtMessageType::kSubscribeError:
-        return std::make_unique<SubscribeErrorMessage>();
-      case MoqtMessageType::kUnsubscribe:
-        return std::make_unique<UnsubscribeMessage>();
-      case MoqtMessageType::kSubscribeFin:
-        return std::make_unique<SubscribeFinMessage>();
-      case MoqtMessageType::kSubscribeRst:
-        return std::make_unique<SubscribeRstMessage>();
-      case MoqtMessageType::kAnnounce:
-        return std::make_unique<AnnounceMessage>();
-      case moqt::MoqtMessageType::kAnnounceOk:
-        return std::make_unique<AnnounceOkMessage>();
-      case moqt::MoqtMessageType::kAnnounceError:
-        return std::make_unique<AnnounceErrorMessage>();
-      case moqt::MoqtMessageType::kUnannounce:
-        return std::make_unique<UnannounceMessage>();
-      case moqt::MoqtMessageType::kGoAway:
-        return std::make_unique<GoAwayMessage>();
-      default:
-        return nullptr;
-    }
+    return CreateTestMessage(message_type, webtrans_);
   }
 
   quiche::QuicheBuffer SerializeMessage(
       TestMessageBase::MessageStructuredData& structured_data) {
     switch (message_type_) {
-      case MoqtMessageType::kObjectWithPayloadLength:
-      case MoqtMessageType::kObjectWithoutPayloadLength: {
+      case MoqtMessageType::kObjectStream:
+      case MoqtMessageType::kObjectPreferDatagram:
+      case MoqtMessageType::kStreamHeaderTrack:
+      case MoqtMessageType::kStreamHeaderGroup: {
         auto data = std::get<MoqtObject>(structured_data);
-        return framer_.SerializeObject(data, "foo");
+        return framer_.SerializeObject(data, "foo", true);
       }
-      case MoqtMessageType::kClientSetup: {
-        auto data = std::get<MoqtClientSetup>(structured_data);
-        return framer_.SerializeClientSetup(data);
-      }
-      case MoqtMessageType::kServerSetup: {
-        auto data = std::get<MoqtServerSetup>(structured_data);
-        return framer_.SerializeServerSetup(data);
-      }
-      case MoqtMessageType::kSubscribeRequest: {
-        auto data = std::get<MoqtSubscribeRequest>(structured_data);
-        return framer_.SerializeSubscribeRequest(data);
+      case MoqtMessageType::kSubscribe: {
+        auto data = std::get<MoqtSubscribe>(structured_data);
+        return framer_.SerializeSubscribe(data);
       }
       case MoqtMessageType::kSubscribeOk: {
         auto data = std::get<MoqtSubscribeOk>(structured_data);
@@ -174,6 +137,14 @@
         auto data = std::get<MoqtGoAway>(structured_data);
         return framer_.SerializeGoAway(data);
       }
+      case MoqtMessageType::kClientSetup: {
+        auto data = std::get<MoqtClientSetup>(structured_data);
+        return framer_.SerializeClientSetup(data);
+      }
+      case MoqtMessageType::kServerSetup: {
+        auto data = std::get<MoqtServerSetup>(structured_data);
+        return framer_.SerializeServerSetup(data);
+      }
     }
   }
 
@@ -195,4 +166,68 @@
   EXPECT_EQ(buffer.AsStringView(), message->PacketSample());
 }
 
+class MoqtFramerSimpleTest : public quic::test::QuicTest {
+ public:
+  MoqtFramerSimpleTest()
+      : buffer_allocator_(quiche::SimpleBufferAllocator::Get()),
+        framer_(buffer_allocator_, /*web_transport=*/true) {}
+
+  quiche::SimpleBufferAllocator* buffer_allocator_;
+  MoqtFramer framer_;
+};
+
+TEST_F(MoqtFramerSimpleTest, GroupMiddler) {
+  auto header = std::make_unique<StreamHeaderGroupMessage>();
+  auto buffer1 = framer_.SerializeObject(
+      std::get<MoqtObject>(header->structured_data()), "foo", true);
+  EXPECT_EQ(buffer1.size(), header->total_message_size());
+  EXPECT_EQ(buffer1.AsStringView(), header->PacketSample());
+
+  auto middler = std::make_unique<StreamMiddlerGroupMessage>();
+  auto buffer2 = framer_.SerializeObject(
+      std::get<MoqtObject>(middler->structured_data()), "bar", false);
+  EXPECT_EQ(buffer2.size(), middler->total_message_size());
+  EXPECT_EQ(buffer2.AsStringView(), middler->PacketSample());
+}
+
+TEST_F(MoqtFramerSimpleTest, TrackMiddler) {
+  auto header = std::make_unique<StreamHeaderTrackMessage>();
+  auto buffer1 = framer_.SerializeObject(
+      std::get<MoqtObject>(header->structured_data()), "foo", true);
+  EXPECT_EQ(buffer1.size(), header->total_message_size());
+  EXPECT_EQ(buffer1.AsStringView(), header->PacketSample());
+
+  auto middler = std::make_unique<StreamMiddlerTrackMessage>();
+  auto buffer2 = framer_.SerializeObject(
+      std::get<MoqtObject>(middler->structured_data()), "bar", false);
+  EXPECT_EQ(buffer2.size(), middler->total_message_size());
+  EXPECT_EQ(buffer2.AsStringView(), middler->PacketSample());
+}
+
+TEST_F(MoqtFramerSimpleTest, BadObjectInput) {
+  MoqtObject object = {
+      /*subscribe_id=*/3,
+      /*track_alias=*/4,
+      /*group_id=*/5,
+      /*object_id=*/6,
+      /*object_send_order=*/7,
+      /*forwarding_preference=*/MoqtForwardingPreference::kObject,
+      /*payload_length=*/1,
+  };
+  quiche::QuicheBuffer buffer;
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObject(object, "foo", true),
+                  "payload_size is too small for payload");
+  EXPECT_TRUE(buffer.empty());
+  object.payload_length = 3;
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObject(object, "foo", false),
+                  "Object or Datagram forwarding_preference must be first "
+                  "in stream");
+  EXPECT_TRUE(buffer.empty());
+  object.forwarding_preference = MoqtForwardingPreference::kDatagram;
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObject(object, "foo", false),
+                  "Object or Datagram forwarding_preference must be first "
+                  "in stream");
+  EXPECT_TRUE(buffer.empty());
+}
+
 }  // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index c1bc980..a690d73 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -117,9 +117,9 @@
  public:
   void CreateDefaultEndpoints() {
     client_ = std::make_unique<ClientEndpoint>(
-        &test_harness_.simulator(), "Client", "Server", MoqtVersion::kDraft01);
+        &test_harness_.simulator(), "Client", "Server", MoqtVersion::kDraft02);
     server_ = std::make_unique<ServerEndpoint>(
-        &test_harness_.simulator(), "Server", "Client", MoqtVersion::kDraft01);
+        &test_harness_.simulator(), "Server", "Client", MoqtVersion::kDraft02);
     test_harness_.set_client(client_.get());
     test_harness_.set_server(server_.get());
   }
@@ -170,7 +170,7 @@
       &test_harness_.simulator(), "Client", "Server",
       MoqtVersion::kUnrecognizedVersionForTests);
   server_ = std::make_unique<ServerEndpoint>(
-      &test_harness_.simulator(), "Server", "Client", MoqtVersion::kDraft01);
+      &test_harness_.simulator(), "Server", "Client", MoqtVersion::kDraft02);
   test_harness_.set_client(client_.get());
   test_harness_.set_server(server_.get());
   WireUpEndpoints();
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index 9c9021c..0a27832 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -6,19 +6,21 @@
 
 #include <string>
 
+#include "quiche/quic/platform/api/quic_bug_tracker.h"
+
 namespace moqt {
 
 std::string MoqtMessageTypeToString(const MoqtMessageType message_type) {
   switch (message_type) {
-    case MoqtMessageType::kObjectWithPayloadLength:
-      return "OBJECT_WITH_LENGTH";
-    case MoqtMessageType::kObjectWithoutPayloadLength:
-      return "OBJECT_WITHOUT_LENGTH";
+    case MoqtMessageType::kObjectStream:
+      return "OBJECT_STREAM";
+    case MoqtMessageType::kObjectPreferDatagram:
+      return "OBJECT_PREFER_DATAGRAM";
     case MoqtMessageType::kClientSetup:
       return "CLIENT_SETUP";
     case MoqtMessageType::kServerSetup:
       return "SERVER_SETUP";
-    case MoqtMessageType::kSubscribeRequest:
+    case MoqtMessageType::kSubscribe:
       return "SUBSCRIBE_REQUEST";
     case MoqtMessageType::kSubscribeOk:
       return "SUBSCRIBE_OK";
@@ -40,7 +42,64 @@
       return "UNANNOUNCE";
     case MoqtMessageType::kGoAway:
       return "GOAWAY";
+    case MoqtMessageType::kStreamHeaderTrack:
+      return "STREAM_HEADER_TRACK";
+    case MoqtMessageType::kStreamHeaderGroup:
+      return "STREAM_HEADER_GROUP";
   }
+  return "Unknown message " + std::to_string(static_cast<int>(message_type));
+}
+
+std::string MoqtForwardingPreferenceToString(
+    MoqtForwardingPreference preference) {
+  switch (preference) {
+    case MoqtForwardingPreference::kObject:
+      return "OBJECT";
+    case MoqtForwardingPreference::kDatagram:
+      return "DATAGRAM";
+    case MoqtForwardingPreference::kTrack:
+      return "TRACK";
+    case MoqtForwardingPreference::kGroup:
+      return "GROUP";
+  }
+  QUIC_BUG(quic_bug_bad_moqt_message_type_01)
+      << "Unknown preference " << std::to_string(static_cast<int>(preference));
+  return "Unknown preference " + std::to_string(static_cast<int>(preference));
+}
+
+MoqtForwardingPreference GetForwardingPreference(MoqtMessageType type) {
+  switch (type) {
+    case MoqtMessageType::kObjectStream:
+      return MoqtForwardingPreference::kObject;
+    case MoqtMessageType::kObjectPreferDatagram:
+      return MoqtForwardingPreference::kDatagram;
+    case MoqtMessageType::kStreamHeaderTrack:
+      return MoqtForwardingPreference::kTrack;
+    case MoqtMessageType::kStreamHeaderGroup:
+      return MoqtForwardingPreference::kGroup;
+    default:
+      break;
+  }
+  QUIC_BUG(quic_bug_bad_moqt_message_type_02)
+      << "Message type does not indicate forwarding preference";
+  return MoqtForwardingPreference::kObject;
+};
+
+MoqtMessageType GetMessageTypeForForwardingPreference(
+    MoqtForwardingPreference preference) {
+  switch (preference) {
+    case MoqtForwardingPreference::kObject:
+      return MoqtMessageType::kObjectStream;
+    case MoqtForwardingPreference::kDatagram:
+      return MoqtMessageType::kObjectPreferDatagram;
+    case MoqtForwardingPreference::kTrack:
+      return MoqtMessageType::kStreamHeaderTrack;
+    case MoqtForwardingPreference::kGroup:
+      return MoqtMessageType::kStreamHeaderGroup;
+  }
+  QUIC_BUG(quic_bug_bad_moqt_message_type_03)
+      << "Forwarding preference does not indicate message type";
+  return MoqtMessageType::kObjectStream;
 }
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 8fd0c55..a7c9e5d 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-// Structured data for message types in draft-ietf-moq-transport-01.
+// Structured data for message types in draft-ietf-moq-transport-02.
 
 #ifndef QUICHE_QUIC_MOQT_MOQT_MESSAGES_H_
 #define QUICHE_QUIC_MOQT_MOQT_MESSAGES_H_
@@ -20,6 +20,12 @@
 #include "quiche/quic/core/quic_versions.h"
 #include "quiche/common/platform/api/quiche_export.h"
 
+// the draft-02 spec makes AUTH_INFO unparseable in SUBSCRIBE messages. This
+// flag assumes that the num_parameters field exists so that it is parseable.
+// If false, there is no num_parameters field and there must not be an
+// AUTH_INFO field.
+#define MOQT_AUTH_INFO
+
 namespace moqt {
 
 inline constexpr quic::ParsedQuicVersionVector GetMoqtSupportedQuicVersions() {
@@ -27,7 +33,7 @@
 }
 
 enum class MoqtVersion : uint64_t {
-  kDraft01 = 0xff000001,
+  kDraft02 = 0xff000002,
   kUnrecognizedVersionForTests = 0xfe0000ff,
 };
 
@@ -46,9 +52,9 @@
 inline constexpr size_t kMaxMessageHeaderSize = 2048;
 
 enum class QUICHE_EXPORT MoqtMessageType : uint64_t {
-  kObjectWithPayloadLength = 0x00,
-  kObjectWithoutPayloadLength = 0x02,
-  kSubscribeRequest = 0x03,
+  kObjectStream = 0x00,
+  kObjectPreferDatagram = 0x01,
+  kSubscribe = 0x03,
   kSubscribeOk = 0x04,
   kSubscribeError = 0x05,
   kAnnounce = 0x06,
@@ -61,6 +67,8 @@
   kGoAway = 0x10,
   kClientSetup = 0x40,
   kServerSetup = 0x41,
+  kStreamHeaderTrack = 0x50,
+  kStreamHeaderGroup = 0x51,
 };
 
 enum class QUICHE_EXPORT MoqtError : uint64_t {
@@ -85,17 +93,14 @@
 };
 
 enum class QUICHE_EXPORT MoqtTrackRequestParameter : uint64_t {
-  // These two should have been deleted in draft-01.
-  // kGroupSequence = 0x0,
-  // kObjectSequence = 0x1,
   kAuthorizationInfo = 0x2,
 };
 
 struct FullTrackName {
   std::string track_namespace;
   std::string track_name;
-  FullTrackName(std::string ns, std::string name)
-      : track_namespace(std::move(ns)), track_name(std::move(name)) {}
+  FullTrackName(absl::string_view ns, absl::string_view name)
+      : track_namespace(ns), track_name(name) {}
   bool operator==(const FullTrackName& other) const {
     return track_namespace == other.track_namespace &&
            track_name == other.track_name;
@@ -125,6 +130,17 @@
     return group < other.group ||
            (group == other.group && object < other.object);
   }
+  bool operator<=(const FullSequence& other) const {
+    return (group < other.group ||
+            (group == other.group && object <= other.object));
+  }
+  FullSequence& operator=(FullSequence other) {
+    group = other.group;
+    object = other.object;
+    return *this;
+  }
+  template <typename H>
+  friend H AbslHashValue(H h, const FullSequence& m);
 };
 
 template <typename H>
@@ -143,13 +159,25 @@
   std::optional<MoqtRole> role;
 };
 
+// These codes do not appear on the wire.
+enum class QUICHE_EXPORT MoqtForwardingPreference : uint8_t {
+  kTrack = 0x0,
+  kGroup = 0x1,
+  kObject = 0x2,
+  kDatagram = 0x3,
+};
+
+// The data contained in every Object message, although the message type
+// implies some of the values. |payload_length| has no value if the length
+// is unknown (because it runs to the end of the stream.)
 struct QUICHE_EXPORT MoqtObject {
-  uint64_t track_id;
-  uint64_t group_sequence;
-  uint64_t object_sequence;
+  uint64_t subscribe_id;
+  uint64_t track_alias;
+  uint64_t group_id;
+  uint64_t object_id;
   uint64_t object_send_order;
+  MoqtForwardingPreference forwarding_preference;
   std::optional<uint64_t> payload_length;
-  // Message also includes the object payload.
 };
 
 enum class QUICHE_EXPORT MoqtSubscribeLocationMode : uint64_t {
@@ -180,7 +208,9 @@
   }
 };
 
-struct QUICHE_EXPORT MoqtSubscribeRequest {
+struct QUICHE_EXPORT MoqtSubscribe {
+  uint64_t subscribe_id;
+  uint64_t track_alias;
   absl::string_view track_namespace;
   absl::string_view track_name;
   // If the mode is kNone, the these are std::nullopt.
@@ -188,13 +218,13 @@
   std::optional<MoqtSubscribeLocation> start_object;
   std::optional<MoqtSubscribeLocation> end_group;
   std::optional<MoqtSubscribeLocation> end_object;
+#ifdef MOQT_AUTH_INFO
   std::optional<absl::string_view> authorization_info;
+#endif
 };
 
 struct QUICHE_EXPORT MoqtSubscribeOk {
-  absl::string_view track_namespace;
-  absl::string_view track_name;
-  uint64_t track_id;
+  uint64_t subscribe_id;
   // The message uses ms, but expires is in us.
   quic::QuicTimeDelta expires = quic::QuicTimeDelta::FromMilliseconds(0);
 };
@@ -206,27 +236,24 @@
 };
 
 struct QUICHE_EXPORT MoqtSubscribeError {
-  absl::string_view track_namespace;
-  absl::string_view track_name;
+  uint64_t subscribe_id;
   SubscribeErrorCode error_code;
   absl::string_view reason_phrase;
+  uint64_t track_alias;
 };
 
 struct QUICHE_EXPORT MoqtUnsubscribe {
-  absl::string_view track_namespace;
-  absl::string_view track_name;
+  uint64_t subscribe_id;
 };
 
 struct QUICHE_EXPORT MoqtSubscribeFin {
-  absl::string_view track_namespace;
-  absl::string_view track_name;
+  uint64_t subscribe_id;
   uint64_t final_group;
   uint64_t final_object;
 };
 
 struct QUICHE_EXPORT MoqtSubscribeRst {
-  absl::string_view track_namespace;
-  absl::string_view track_name;
+  uint64_t subscribe_id;
   uint64_t error_code;
   absl::string_view reason_phrase;
   uint64_t final_group;
@@ -258,6 +285,14 @@
 
 std::string MoqtMessageTypeToString(MoqtMessageType message_type);
 
+std::string MoqtForwardingPreferenceToString(
+    MoqtForwardingPreference preference);
+
+MoqtForwardingPreference GetForwardingPreference(MoqtMessageType type);
+
+MoqtMessageType GetMessageTypeForForwardingPreference(
+    MoqtForwardingPreference preference);
+
 }  // namespace moqt
 
 #endif  // QUICHE_QUIC_MOQT_MOQT_MESSAGES_H_
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 2b9e15f..b8754f2 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -7,7 +7,6 @@
 #include <cstddef>
 #include <cstdint>
 #include <cstring>
-#include <memory>
 #include <optional>
 #include <string>
 
@@ -41,9 +40,8 @@
   // Check for early fin
   if (fin) {
     no_more_data_ = true;
-    if (object_metadata_.has_value() &&
-        object_metadata_->payload_length.has_value() &&
-        *object_metadata_->payload_length > data.length()) {
+    if (ObjectPayloadInProgress() &&
+        payload_length_remaining_ > data.length()) {
       ParseError("End of stream before complete OBJECT PAYLOAD");
       return;
     }
@@ -57,7 +55,7 @@
   // There are three cases: the parser has already delivered an OBJECT header
   // and is now delivering payload; part of a message is in the buffer; or
   // no message is in progress.
-  if (object_metadata_.has_value()) {
+  if (ObjectPayloadInProgress()) {
     // This is additional payload for an OBJECT.
     QUICHE_DCHECK(buffered_message_.empty());
     if (!object_metadata_->payload_length.has_value()) {
@@ -79,7 +77,7 @@
     visitor_.OnObjectMessage(*object_metadata_,
                              data.substr(0, payload_length_remaining_), true);
     reader->Seek(payload_length_remaining_);
-    object_metadata_.reset();
+    payload_length_remaining_ = 0;  // Expect a new object.
   } else if (!buffered_message_.empty()) {
     absl::StrAppend(&buffered_message_, data);
     reader.emplace(buffered_message_);
@@ -113,29 +111,34 @@
   if (original_buffer_size > 0) {
     buffered_message_.erase(0, total_processed);
   }
-  if (fin && object_metadata_.has_value()) {
-    ParseError("Received FIN mid-payload");
-  }
 }
 
 size_t MoqtParser::ProcessMessage(absl::string_view data, bool fin) {
   uint64_t value;
   quic::QuicDataReader reader(data);
+  if (ObjectStreamInitialized() && !ObjectPayloadInProgress()) {
+    // This is a follow-on object in a stream.
+    return ProcessObject(reader,
+                         GetMessageTypeForForwardingPreference(
+                             object_metadata_->forwarding_preference),
+                         fin);
+  }
   if (!reader.ReadVarInt62(&value)) {
     return 0;
   }
   auto type = static_cast<MoqtMessageType>(value);
   switch (type) {
-    case MoqtMessageType::kObjectWithPayloadLength:
-      return ProcessObject(reader, true, fin);
-    case MoqtMessageType::kObjectWithoutPayloadLength:
-      return ProcessObject(reader, false, fin);
+    case MoqtMessageType::kObjectStream:
+    case MoqtMessageType::kObjectPreferDatagram:
+    case MoqtMessageType::kStreamHeaderTrack:
+    case MoqtMessageType::kStreamHeaderGroup:
+      return ProcessObject(reader, type, fin);
     case MoqtMessageType::kClientSetup:
       return ProcessClientSetup(reader);
     case MoqtMessageType::kServerSetup:
       return ProcessServerSetup(reader);
-    case MoqtMessageType::kSubscribeRequest:
-      return ProcessSubscribeRequest(reader);
+    case MoqtMessageType::kSubscribe:
+      return ProcessSubscribe(reader);
     case MoqtMessageType::kSubscribeOk:
       return ProcessSubscribeOk(reader);
     case MoqtMessageType::kSubscribeError:
@@ -162,39 +165,84 @@
   }
 }
 
-size_t MoqtParser::ProcessObject(quic::QuicDataReader& reader, bool has_length,
-                                 bool fin) {
-  QUICHE_DCHECK(!object_metadata_.has_value());
-  object_metadata_ = MoqtObject();
-  uint64_t length;
-  if (!reader.ReadVarInt62(&object_metadata_->track_id) ||
-      !reader.ReadVarInt62(&object_metadata_->group_sequence) ||
-      !reader.ReadVarInt62(&object_metadata_->object_sequence) ||
-      !reader.ReadVarInt62(&object_metadata_->object_send_order) ||
-      (has_length && !reader.ReadVarInt62(&length))) {
-    object_metadata_.reset();
-    return 0;
+size_t MoqtParser::ProcessObject(quic::QuicDataReader& reader,
+                                 MoqtMessageType type, bool fin) {
+  size_t processed_data;
+  QUICHE_DCHECK(!ObjectPayloadInProgress());
+  if (!ObjectStreamInitialized()) {
+    // nothing has been processed on the stream.
+    object_metadata_ = MoqtObject();
+    if (!reader.ReadVarInt62(&object_metadata_->subscribe_id) ||
+        !reader.ReadVarInt62(&object_metadata_->track_alias)) {
+      object_metadata_.reset();
+      return 0;
+    }
+    if (type != MoqtMessageType::kStreamHeaderTrack &&
+        !reader.ReadVarInt62(&object_metadata_->group_id)) {
+      object_metadata_.reset();
+      return 0;
+    }
+    if (type != MoqtMessageType::kStreamHeaderTrack &&
+        type != MoqtMessageType::kStreamHeaderGroup &&
+        !reader.ReadVarInt62(&object_metadata_->object_id)) {
+      object_metadata_.reset();
+      return 0;
+    }
+    if (!reader.ReadVarInt62(&object_metadata_->object_send_order)) {
+      object_metadata_.reset();
+      return 0;
+    }
+    object_metadata_->forwarding_preference = GetForwardingPreference(type);
   }
-  if (has_length) {
-    object_metadata_->payload_length = length;
+  // At this point, enough data has been processed to store in object_metadata_,
+  // even if there's nothing else in the buffer.
+  processed_data = reader.PreviouslyReadPayload().length();
+  QUICHE_DCHECK(payload_length_remaining_ == 0);
+  switch (type) {
+    case MoqtMessageType::kStreamHeaderTrack:
+      if (!reader.ReadVarInt62(&object_metadata_->group_id)) {
+        return processed_data;
+      }
+      [[fallthrough]];
+    case MoqtMessageType::kStreamHeaderGroup: {
+      uint64_t length;
+      if (!reader.ReadVarInt62(&object_metadata_->object_id) ||
+          !reader.ReadVarInt62(&length)) {
+        return processed_data;
+      }
+      object_metadata_->payload_length = length;
+      break;
+    }
+    default:
+      break;
   }
-  bool received_complete_message =
-      (fin && !has_length) ||
-      (has_length &&
-       *object_metadata_->payload_length <= reader.BytesRemaining());
-  size_t payload_to_draw = (!has_length || *object_metadata_->payload_length >=
-                                               reader.BytesRemaining())
-                               ? reader.BytesRemaining()
-                               : *object_metadata_->payload_length;
+  bool has_length = object_metadata_->payload_length.has_value();
+  bool received_complete_message = false;
+  size_t payload_to_draw = reader.BytesRemaining();
+  if (fin && has_length &&
+      *object_metadata_->payload_length > reader.BytesRemaining()) {
+    ParseError("Received FIN mid-payload");
+    return processed_data;
+  }
+  received_complete_message =
+      fin || (has_length &&
+              *object_metadata_->payload_length <= reader.BytesRemaining());
+  if (received_complete_message && has_length &&
+      *object_metadata_->payload_length < reader.BytesRemaining()) {
+    payload_to_draw = *object_metadata_->payload_length;
+  }
+  // The error case where there's a fin before the explicit length is complete
+  // is handled in ProcessData() in two separate places. Even though the
+  // message is "done" if fin regardless of has_length, it's bad to report to
+  // the application that the object is done if it hasn't reached the promised
+  // length.
   visitor_.OnObjectMessage(
       *object_metadata_,
       reader.PeekRemainingPayload().substr(0, payload_to_draw),
       received_complete_message);
-  if (received_complete_message) {
-    object_metadata_.reset();
-  }
   reader.Seek(payload_to_draw);
-  payload_length_remaining_ = length - payload_to_draw;
+  payload_length_remaining_ =
+      has_length ? *object_metadata_->payload_length - payload_to_draw : 0;
   return reader.PreviouslyReadPayload().length();
 }
 
@@ -303,41 +351,38 @@
   return reader.PreviouslyReadPayload().length();
 }
 
-size_t MoqtParser::ProcessSubscribeRequest(quic::QuicDataReader& reader) {
-  MoqtSubscribeRequest subscribe_request;
-  if (!reader.ReadStringPieceVarInt62(&subscribe_request.track_namespace)) {
-    return 0;
-  }
-  if (!reader.ReadStringPieceVarInt62(&subscribe_request.track_name)) {
-    return 0;
-  }
-  if (!ReadLocation(reader, subscribe_request.start_group)) {
+size_t MoqtParser::ProcessSubscribe(quic::QuicDataReader& reader) {
+  MoqtSubscribe subscribe_request;
+  if (!reader.ReadVarInt62(&subscribe_request.subscribe_id) ||
+      !reader.ReadVarInt62(&subscribe_request.track_alias) ||
+      !reader.ReadStringPieceVarInt62(&subscribe_request.track_namespace) ||
+      !reader.ReadStringPieceVarInt62(&subscribe_request.track_name) ||
+      !ReadLocation(reader, subscribe_request.start_group)) {
     return 0;
   }
   if (!subscribe_request.start_group.has_value()) {
-    ParseError("START_GROUP must not be None in SUBSCRIBE_REQUEST");
+    ParseError("START_GROUP must not be None in SUBSCRIBE");
     return 0;
   }
   if (!ReadLocation(reader, subscribe_request.start_object)) {
     return 0;
   }
   if (!subscribe_request.start_object.has_value()) {
-    ParseError("START_OBJECT must not be None in SUBSCRIBE_REQUEST");
+    ParseError("START_OBJECT must not be None in SUBSCRIBE");
     return 0;
   }
-  if (!ReadLocation(reader, subscribe_request.end_group)) {
-    return 0;
-  }
-  if (!ReadLocation(reader, subscribe_request.end_object)) {
+  if (!ReadLocation(reader, subscribe_request.end_group) ||
+      !ReadLocation(reader, subscribe_request.end_object)) {
     return 0;
   }
   if (subscribe_request.end_group.has_value() !=
       subscribe_request.end_object.has_value()) {
     ParseError(
-        "SUBSCRIBE_REQUEST end_group and end_object must be both None "
+        "SUBSCRIBE end_group and end_object must be both None "
         "or both non_None");
     return 0;
   }
+#ifdef MOQT_AUTH_INFO
   uint64_t num_params;
   if (!reader.ReadVarInt62(&num_params)) {
     return 0;
@@ -364,23 +409,16 @@
         break;
     }
   }
-  visitor_.OnSubscribeRequestMessage(subscribe_request);
+#endif
+  visitor_.OnSubscribeMessage(subscribe_request);
   return reader.PreviouslyReadPayload().length();
 }
 
 size_t MoqtParser::ProcessSubscribeOk(quic::QuicDataReader& reader) {
   MoqtSubscribeOk subscribe_ok;
-  if (!reader.ReadStringPieceVarInt62(&subscribe_ok.track_namespace)) {
-    return 0;
-  }
-  if (!reader.ReadStringPieceVarInt62(&subscribe_ok.track_name)) {
-    return 0;
-  }
-  if (!reader.ReadVarInt62(&subscribe_ok.track_id)) {
-    return 0;
-  }
   uint64_t milliseconds;
-  if (!reader.ReadVarInt62(&milliseconds)) {
+  if (!reader.ReadVarInt62(&subscribe_ok.subscribe_id) ||
+      !reader.ReadVarInt62(&milliseconds)) {
     return 0;
   }
   subscribe_ok.expires = quic::QuicTimeDelta::FromMilliseconds(milliseconds);
@@ -390,30 +428,21 @@
 
 size_t MoqtParser::ProcessSubscribeError(quic::QuicDataReader& reader) {
   MoqtSubscribeError subscribe_error;
-  if (!reader.ReadStringPieceVarInt62(&subscribe_error.track_namespace)) {
-    return 0;
-  }
-  if (!reader.ReadStringPieceVarInt62(&subscribe_error.track_name)) {
-    return 0;
-  }
   uint64_t error_code;
-  if (!reader.ReadVarInt62(&error_code)) {
+  if (!reader.ReadVarInt62(&subscribe_error.subscribe_id) ||
+      !reader.ReadVarInt62(&error_code) ||
+      !reader.ReadStringPieceVarInt62(&subscribe_error.reason_phrase) ||
+      !reader.ReadVarInt62(&subscribe_error.track_alias)) {
     return 0;
   }
   subscribe_error.error_code = static_cast<SubscribeErrorCode>(error_code);
-  if (!reader.ReadStringPieceVarInt62(&subscribe_error.reason_phrase)) {
-    return 0;
-  }
   visitor_.OnSubscribeErrorMessage(subscribe_error);
   return reader.PreviouslyReadPayload().length();
 }
 
 size_t MoqtParser::ProcessUnsubscribe(quic::QuicDataReader& reader) {
   MoqtUnsubscribe unsubscribe;
-  if (!reader.ReadStringPieceVarInt62(&unsubscribe.track_namespace)) {
-    return 0;
-  }
-  if (!reader.ReadStringPieceVarInt62(&unsubscribe.track_name)) {
+  if (!reader.ReadVarInt62(&unsubscribe.subscribe_id)) {
     return 0;
   }
   visitor_.OnUnsubscribeMessage(unsubscribe);
@@ -422,16 +451,9 @@
 
 size_t MoqtParser::ProcessSubscribeFin(quic::QuicDataReader& reader) {
   MoqtSubscribeFin subscribe_fin;
-  if (!reader.ReadStringPieceVarInt62(&subscribe_fin.track_namespace)) {
-    return 0;
-  }
-  if (!reader.ReadStringPieceVarInt62(&subscribe_fin.track_name)) {
-    return 0;
-  }
-  if (!reader.ReadVarInt62(&subscribe_fin.final_group)) {
-    return 0;
-  }
-  if (!reader.ReadVarInt62(&subscribe_fin.final_object)) {
+  if (!reader.ReadVarInt62(&subscribe_fin.subscribe_id) ||
+      !reader.ReadVarInt62(&subscribe_fin.final_group) ||
+      !reader.ReadVarInt62(&subscribe_fin.final_object)) {
     return 0;
   }
   visitor_.OnSubscribeFinMessage(subscribe_fin);
@@ -440,22 +462,11 @@
 
 size_t MoqtParser::ProcessSubscribeRst(quic::QuicDataReader& reader) {
   MoqtSubscribeRst subscribe_rst;
-  if (!reader.ReadStringPieceVarInt62(&subscribe_rst.track_namespace)) {
-    return 0;
-  }
-  if (!reader.ReadStringPieceVarInt62(&subscribe_rst.track_name)) {
-    return 0;
-  }
-  if (!reader.ReadVarInt62(&subscribe_rst.error_code)) {
-    return 0;
-  }
-  if (!reader.ReadStringPieceVarInt62(&subscribe_rst.reason_phrase)) {
-    return 0;
-  }
-  if (!reader.ReadVarInt62(&subscribe_rst.final_group)) {
-    return 0;
-  }
-  if (!reader.ReadVarInt62(&subscribe_rst.final_object)) {
+  if (!reader.ReadVarInt62(&subscribe_rst.subscribe_id) ||
+      !reader.ReadVarInt62(&subscribe_rst.error_code) ||
+      !reader.ReadStringPieceVarInt62(&subscribe_rst.reason_phrase) ||
+      !reader.ReadVarInt62(&subscribe_rst.final_group) ||
+      !reader.ReadVarInt62(&subscribe_rst.final_object)) {
     return 0;
   }
   visitor_.OnSubscribeRstMessage(subscribe_rst);
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index 2674885..3bb7f47 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -34,8 +34,7 @@
   // parser retains ownership of the memory.
   virtual void OnClientSetupMessage(const MoqtClientSetup& message) = 0;
   virtual void OnServerSetupMessage(const MoqtServerSetup& message) = 0;
-  virtual void OnSubscribeRequestMessage(
-      const MoqtSubscribeRequest& message) = 0;
+  virtual void OnSubscribeMessage(const MoqtSubscribe& message) = 0;
   virtual void OnSubscribeOkMessage(const MoqtSubscribeOk& message) = 0;
   virtual void OnSubscribeErrorMessage(const MoqtSubscribeError& message) = 0;
   virtual void OnUnsubscribeMessage(const MoqtUnsubscribe& message) = 0;
@@ -63,6 +62,8 @@
   // All bytes can be freed. Calls OnParsingError() when there is a parsing
   // error.
   // Any calls after sending |fin| = true will be ignored.
+  // TODO(martinduke): Figure out what has to happen if the message arrives via
+  // datagram rather than a stream.
   void ProcessData(absl::string_view data, bool fin);
 
  private:
@@ -75,10 +76,11 @@
   // structs, and call the relevant visitor function for further action. Returns
   // the number of bytes consumed if the message is complete; returns 0
   // otherwise.
-  size_t ProcessObject(quic::QuicDataReader& reader, bool has_length, bool fin);
+  size_t ProcessObject(quic::QuicDataReader& reader, MoqtMessageType type,
+                       bool fin);
   size_t ProcessClientSetup(quic::QuicDataReader& reader);
   size_t ProcessServerSetup(quic::QuicDataReader& reader);
-  size_t ProcessSubscribeRequest(quic::QuicDataReader& reader);
+  size_t ProcessSubscribe(quic::QuicDataReader& reader);
   size_t ProcessSubscribeOk(quic::QuicDataReader& reader);
   size_t ProcessSubscribeError(quic::QuicDataReader& reader);
   size_t ProcessUnsubscribe(quic::QuicDataReader& reader);
@@ -108,6 +110,21 @@
   // string_view is not exactly the right length.
   bool StringViewToVarInt(absl::string_view& sv, uint64_t& vi);
 
+  // Simplify understanding of state.
+  // Returns true if the stream has delivered all object metadata common to all
+  // objects on that stream.
+  bool ObjectStreamInitialized() const { return object_metadata_.has_value(); }
+  // Returns true if the stream has delivered all metadata but not all payload
+  // for the most recent object.
+  bool ObjectPayloadInProgress() const {
+    return (object_metadata_.has_value() &&
+            (object_metadata_->forwarding_preference ==
+                 MoqtForwardingPreference::kObject ||
+             object_metadata_->forwarding_preference ==
+                 MoqtForwardingPreference::kDatagram ||
+             payload_length_remaining_ > 0));
+  }
+
   MoqtParserVisitor& visitor_;
   bool uses_web_transport_;
   bool no_more_data_ = false;  // Fatal error or fin. No more parsing.
@@ -116,8 +133,17 @@
   std::string buffered_message_;
 
   // Metadata for an object which is delivered in parts.
+  // If object_metadata_ is nullopt, nothing has been processed on the stream.
+  // If object_metadata_ exists but payload_length is nullopt or
+  // payload_length_remaining_ is nonzero, the object payload is in mid-
+  // delivery.
+  // If object_metadata_ exists and payload_length_remaining_ is zero, an object
+  // has been completely delivered and the next object header on the stream has
+  // not been delivered.
+  // Use ObjectStreamInitialized() and ObjectPayloadInProgress() to keep the
+  // state straight.
   std::optional<MoqtObject> object_metadata_ = std::nullopt;
-  size_t payload_length_remaining_;
+  size_t payload_length_remaining_ = 0;
 
   bool processing_ = false;  // True if currently in ProcessData(), to prevent
                              // re-entrancy.
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index d5d8f45..c0f1e9b 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -22,17 +22,22 @@
 
 namespace {
 
-bool IsObjectMessage(MoqtMessageType type) {
-  return (type == MoqtMessageType::kObjectWithPayloadLength ||
-          type == MoqtMessageType::kObjectWithoutPayloadLength);
+inline bool IsObjectMessage(MoqtMessageType type) {
+  return (type == MoqtMessageType::kObjectStream ||
+          type == MoqtMessageType::kObjectPreferDatagram ||
+          type == MoqtMessageType::kStreamHeaderTrack ||
+          type == MoqtMessageType::kStreamHeaderGroup);
+}
+
+inline bool IsObjectWithoutPayloadLength(MoqtMessageType type) {
+  return (type == MoqtMessageType::kObjectStream ||
+          type == MoqtMessageType::kObjectPreferDatagram);
 }
 
 std::vector<MoqtMessageType> message_types = {
-    MoqtMessageType::kObjectWithPayloadLength,
-    MoqtMessageType::kObjectWithoutPayloadLength,
-    MoqtMessageType::kClientSetup,
-    MoqtMessageType::kServerSetup,
-    MoqtMessageType::kSubscribeRequest,
+    MoqtMessageType::kObjectStream,
+    MoqtMessageType::kObjectPreferDatagram,
+    MoqtMessageType::kSubscribe,
     MoqtMessageType::kSubscribeOk,
     MoqtMessageType::kSubscribeError,
     MoqtMessageType::kUnsubscribe,
@@ -42,6 +47,10 @@
     MoqtMessageType::kAnnounceOk,
     MoqtMessageType::kAnnounceError,
     MoqtMessageType::kUnannounce,
+    MoqtMessageType::kClientSetup,
+    MoqtMessageType::kServerSetup,
+    MoqtMessageType::kStreamHeaderTrack,
+    MoqtMessageType::kStreamHeaderGroup,
     MoqtMessageType::kGoAway,
 };
 
@@ -110,72 +119,54 @@
     MoqtServerSetup server_setup = message;
     last_message_ = TestMessageBase::MessageStructuredData(server_setup);
   }
-  void OnSubscribeRequestMessage(const MoqtSubscribeRequest& message) override {
+  void OnSubscribeMessage(const MoqtSubscribe& message) override {
     end_of_message_ = true;
     messages_received_++;
-    MoqtSubscribeRequest subscribe_request = message;
+    MoqtSubscribe subscribe_request = message;
     string0_ = std::string(subscribe_request.track_namespace);
     subscribe_request.track_namespace = absl::string_view(string0_);
     string1_ = std::string(subscribe_request.track_name);
     subscribe_request.track_name = absl::string_view(string1_);
+#ifdef MOQT_AUTH_INFO
     if (subscribe_request.authorization_info.has_value()) {
       string2_ = std::string(*subscribe_request.authorization_info);
       subscribe_request.authorization_info = absl::string_view(string2_);
     }
+#endif
     last_message_ = TestMessageBase::MessageStructuredData(subscribe_request);
   }
   void OnSubscribeOkMessage(const MoqtSubscribeOk& message) override {
     end_of_message_ = true;
     messages_received_++;
     MoqtSubscribeOk subscribe_ok = message;
-    string0_ = std::string(subscribe_ok.track_namespace);
-    subscribe_ok.track_namespace = absl::string_view(string0_);
-    string1_ = std::string(subscribe_ok.track_name);
-    subscribe_ok.track_name = absl::string_view(string1_);
     last_message_ = TestMessageBase::MessageStructuredData(subscribe_ok);
   }
   void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override {
     end_of_message_ = true;
     messages_received_++;
     MoqtSubscribeError subscribe_error = message;
-    string0_ = std::string(subscribe_error.track_namespace);
-    subscribe_error.track_namespace = absl::string_view(string0_);
-    string1_ = std::string(subscribe_error.track_name);
-    subscribe_error.track_name = absl::string_view(string1_);
-    string1_ = std::string(subscribe_error.reason_phrase);
-    subscribe_error.reason_phrase = absl::string_view(string1_);
+    string0_ = std::string(subscribe_error.reason_phrase);
+    subscribe_error.reason_phrase = absl::string_view(string0_);
     last_message_ = TestMessageBase::MessageStructuredData(subscribe_error);
   }
   void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override {
     end_of_message_ = true;
     messages_received_++;
     MoqtUnsubscribe unsubscribe = message;
-    string0_ = std::string(unsubscribe.track_namespace);
-    unsubscribe.track_namespace = absl::string_view(string0_);
-    string1_ = std::string(unsubscribe.track_name);
-    unsubscribe.track_name = absl::string_view(string1_);
     last_message_ = TestMessageBase::MessageStructuredData(unsubscribe);
   }
   void OnSubscribeFinMessage(const MoqtSubscribeFin& message) override {
     end_of_message_ = true;
     messages_received_++;
     MoqtSubscribeFin subscribe_fin = message;
-    string0_ = std::string(subscribe_fin.track_namespace);
-    subscribe_fin.track_namespace = absl::string_view(string0_);
-    string1_ = std::string(subscribe_fin.track_name);
-    subscribe_fin.track_name = absl::string_view(string1_);
     last_message_ = TestMessageBase::MessageStructuredData(subscribe_fin);
   }
   void OnSubscribeRstMessage(const MoqtSubscribeRst& message) override {
     end_of_message_ = true;
     messages_received_++;
     MoqtSubscribeRst subscribe_rst = message;
-    string0_ = std::string(subscribe_rst.track_namespace);
-    subscribe_rst.track_namespace = absl::string_view(string0_);
-    string1_ = std::string(subscribe_rst.track_name);
-    subscribe_rst.track_name = absl::string_view(string1_);
-    string2_ = std::string(subscribe_rst.reason_phrase);
-    subscribe_rst.reason_phrase = absl::string_view(string2_);
+    string0_ = std::string(subscribe_rst.reason_phrase);
+    subscribe_rst.reason_phrase = absl::string_view(string0_);
     last_message_ = TestMessageBase::MessageStructuredData(subscribe_rst);
   }
   void OnAnnounceMessage(const MoqtAnnounce& message) override {
@@ -252,40 +243,7 @@
         parser_(GetParam().uses_web_transport, visitor_) {}
 
   std::unique_ptr<TestMessageBase> MakeMessage(MoqtMessageType message_type) {
-    switch (message_type) {
-      case MoqtMessageType::kObjectWithPayloadLength:
-        return std::make_unique<ObjectMessageWithLength>();
-      case MoqtMessageType::kObjectWithoutPayloadLength:
-        return std::make_unique<ObjectMessageWithoutLength>();
-      case MoqtMessageType::kClientSetup:
-        return std::make_unique<ClientSetupMessage>(webtrans_);
-      case MoqtMessageType::kServerSetup:
-        return std::make_unique<ClientSetupMessage>(webtrans_);
-      case MoqtMessageType::kSubscribeRequest:
-        return std::make_unique<SubscribeRequestMessage>();
-      case MoqtMessageType::kSubscribeOk:
-        return std::make_unique<SubscribeOkMessage>();
-      case MoqtMessageType::kSubscribeError:
-        return std::make_unique<SubscribeErrorMessage>();
-      case MoqtMessageType::kUnsubscribe:
-        return std::make_unique<UnsubscribeMessage>();
-      case MoqtMessageType::kSubscribeFin:
-        return std::make_unique<SubscribeFinMessage>();
-      case MoqtMessageType::kSubscribeRst:
-        return std::make_unique<SubscribeRstMessage>();
-      case MoqtMessageType::kAnnounce:
-        return std::make_unique<AnnounceMessage>();
-      case moqt::MoqtMessageType::kAnnounceOk:
-        return std::make_unique<AnnounceOkMessage>();
-      case moqt::MoqtMessageType::kAnnounceError:
-        return std::make_unique<AnnounceErrorMessage>();
-      case moqt::MoqtMessageType::kUnannounce:
-        return std::make_unique<UnannounceMessage>();
-      case moqt::MoqtMessageType::kGoAway:
-        return std::make_unique<GoAwayMessage>();
-      default:
-        return nullptr;
-    }
+    return CreateTestMessage(message_type, webtrans_);
   }
 
   MoqtParserTestVisitor visitor_;
@@ -331,6 +289,10 @@
   // so splitting the message in half will prevent the first half from being
   // processed.
   size_t first_data_size = message->total_message_size() / 2;
+  if (message_type_ == MoqtMessageType::kStreamHeaderTrack) {
+    // The boundary happens to fall right after the stream header, so move it.
+    ++first_data_size;
+  }
   parser_.ProcessData(message->PacketSample().substr(0, first_data_size),
                       false);
   EXPECT_EQ(visitor_.messages_received_, 0);
@@ -359,7 +321,7 @@
   }
   EXPECT_EQ(visitor_.messages_received_,
             (IsObjectMessage(message_type_) ? (kObjectPayloadSize + 1) : 1));
-  if (message_type_ == MoqtMessageType::kObjectWithoutPayloadLength) {
+  if (IsObjectWithoutPayloadLength(message_type_)) {
     EXPECT_FALSE(visitor_.end_of_message_);
     parser_.ProcessData(absl::string_view(), true);  // Needs the FIN
     EXPECT_EQ(visitor_.messages_received_, kObjectPayloadSize + 2);
@@ -382,7 +344,7 @@
   }
   EXPECT_EQ(visitor_.messages_received_,
             (IsObjectMessage(message_type_) ? (kObjectPayloadSize + 1) : 1));
-  if (message_type_ == MoqtMessageType::kObjectWithoutPayloadLength) {
+  if (IsObjectWithoutPayloadLength(message_type_)) {
     EXPECT_FALSE(visitor_.end_of_message_);
     parser_.ProcessData(absl::string_view(), true);  // Needs the FIN
     EXPECT_EQ(visitor_.messages_received_, kObjectPayloadSize + 2);
@@ -394,9 +356,12 @@
 
 TEST_P(MoqtParserTest, EarlyFin) {
   std::unique_ptr<TestMessageBase> message = MakeMessage(message_type_);
-  parser_.ProcessData(
-      message->PacketSample().substr(0, message->total_message_size() / 2),
-      true);
+  size_t first_data_size = message->total_message_size() / 2;
+  if (message_type_ == MoqtMessageType::kStreamHeaderTrack) {
+    // The boundary happens to fall right after the stream header, so move it.
+    ++first_data_size;
+  }
+  parser_.ProcessData(message->PacketSample().substr(0, first_data_size), true);
   EXPECT_EQ(visitor_.messages_received_, 0);
   EXPECT_TRUE(visitor_.parsing_error_.has_value());
   EXPECT_EQ(*visitor_.parsing_error_, "FIN after incomplete message");
@@ -404,9 +369,13 @@
 
 TEST_P(MoqtParserTest, SeparateEarlyFin) {
   std::unique_ptr<TestMessageBase> message = MakeMessage(message_type_);
-  parser_.ProcessData(
-      message->PacketSample().substr(0, message->total_message_size() / 2),
-      false);
+  size_t first_data_size = message->total_message_size() / 2;
+  if (message_type_ == MoqtMessageType::kStreamHeaderTrack) {
+    // The boundary happens to fall right after the stream header, so move it.
+    ++first_data_size;
+  }
+  parser_.ProcessData(message->PacketSample().substr(0, first_data_size),
+                      false);
   parser_.ProcessData(absl::string_view(), true);
   EXPECT_EQ(visitor_.messages_received_, 0);
   EXPECT_TRUE(visitor_.parsing_error_.has_value());
@@ -426,11 +395,32 @@
   static constexpr bool kRawQuic = false;
 };
 
-TEST_F(MoqtMessageSpecificTest, ObjectNoLengthSeparateFin) {
+TEST_F(MoqtMessageSpecificTest, ObjectStreamSeparateFin) {
   // OBJECT can return on an unknown-length message even without receiving a
   // FIN.
   MoqtParser parser(kRawQuic, visitor_);
-  auto message = std::make_unique<ObjectMessageWithoutLength>();
+  auto message = std::make_unique<ObjectStreamMessage>();
+  parser.ProcessData(message->PacketSample(), false);
+  EXPECT_EQ(visitor_.messages_received_, 1);
+  EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
+  EXPECT_TRUE(visitor_.object_payload_.has_value());
+  EXPECT_EQ(*(visitor_.object_payload_), "foo");
+  EXPECT_FALSE(visitor_.end_of_message_);
+
+  parser.ProcessData(absl::string_view(), true);  // send the FIN
+  EXPECT_EQ(visitor_.messages_received_, 2);
+  EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
+  EXPECT_TRUE(visitor_.object_payload_.has_value());
+  EXPECT_EQ(*(visitor_.object_payload_), "");
+  EXPECT_TRUE(visitor_.end_of_message_);
+  EXPECT_FALSE(visitor_.parsing_error_.has_value());
+}
+
+TEST_F(MoqtMessageSpecificTest, ObjectPreferDatagramSeparateFin) {
+  // OBJECT can return on an unknown-length message even without receiving a
+  // FIN.
+  MoqtParser parser(kRawQuic, visitor_);
+  auto message = std::make_unique<ObjectPreferDatagramMessage>();
   parser.ProcessData(message->PacketSample(), false);
   EXPECT_EQ(visitor_.messages_received_, 1);
   EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
@@ -451,7 +441,7 @@
 // message.
 TEST_F(MoqtMessageSpecificTest, ThreePartObject) {
   MoqtParser parser(kRawQuic, visitor_);
-  auto message = std::make_unique<ObjectMessageWithoutLength>();
+  auto message = std::make_unique<ObjectStreamMessage>();
   parser.ProcessData(message->PacketSample(), false);
   EXPECT_EQ(visitor_.messages_received_, 1);
   EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
@@ -480,7 +470,7 @@
 // Send the part of header, rest of header + payload, plus payload.
 TEST_F(MoqtMessageSpecificTest, ThreePartObjectFirstIncomplete) {
   MoqtParser parser(kRawQuic, visitor_);
-  auto message = std::make_unique<ObjectMessageWithoutLength>();
+  auto message = std::make_unique<ObjectStreamMessage>();
 
   // first part
   parser.ProcessData(message->PacketSample().substr(0, 4), false);
@@ -495,7 +485,7 @@
   EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
   EXPECT_FALSE(visitor_.end_of_message_);
   EXPECT_TRUE(visitor_.object_payload_.has_value());
-  EXPECT_EQ(visitor_.object_payload_->length(), 95);
+  EXPECT_EQ(visitor_.object_payload_->length(), 94);
 
   // third part includes FIN
   parser.ProcessData("bar", true);
@@ -507,6 +497,50 @@
   EXPECT_FALSE(visitor_.parsing_error_.has_value());
 }
 
+TEST_F(MoqtMessageSpecificTest, StreamHeaderGroupFollowOn) {
+  MoqtParser parser(kRawQuic, visitor_);
+  // first part
+  auto message1 = std::make_unique<StreamHeaderGroupMessage>();
+  parser.ProcessData(message1->PacketSample(), false);
+  EXPECT_EQ(visitor_.messages_received_, 1);
+  EXPECT_TRUE(message1->EqualFieldValues(*visitor_.last_message_));
+  EXPECT_TRUE(visitor_.end_of_message_);
+  EXPECT_TRUE(visitor_.object_payload_.has_value());
+  EXPECT_EQ(*(visitor_.object_payload_), "foo");
+  EXPECT_FALSE(visitor_.parsing_error_.has_value());
+  // second part
+  auto message2 = std::make_unique<StreamMiddlerGroupMessage>();
+  parser.ProcessData(message2->PacketSample(), false);
+  EXPECT_EQ(visitor_.messages_received_, 2);
+  EXPECT_TRUE(message2->EqualFieldValues(*visitor_.last_message_));
+  EXPECT_TRUE(visitor_.end_of_message_);
+  EXPECT_TRUE(visitor_.object_payload_.has_value());
+  EXPECT_EQ(*(visitor_.object_payload_), "bar");
+  EXPECT_FALSE(visitor_.parsing_error_.has_value());
+}
+
+TEST_F(MoqtMessageSpecificTest, StreamHeaderTrackFollowOn) {
+  MoqtParser parser(kRawQuic, visitor_);
+  // first part
+  auto message1 = std::make_unique<StreamHeaderTrackMessage>();
+  parser.ProcessData(message1->PacketSample(), false);
+  EXPECT_EQ(visitor_.messages_received_, 1);
+  EXPECT_TRUE(message1->EqualFieldValues(*visitor_.last_message_));
+  EXPECT_TRUE(visitor_.end_of_message_);
+  EXPECT_TRUE(visitor_.object_payload_.has_value());
+  EXPECT_EQ(*(visitor_.object_payload_), "foo");
+  EXPECT_FALSE(visitor_.parsing_error_.has_value());
+  // second part
+  auto message2 = std::make_unique<StreamMiddlerTrackMessage>();
+  parser.ProcessData(message2->PacketSample(), false);
+  EXPECT_EQ(visitor_.messages_received_, 2);
+  EXPECT_TRUE(message2->EqualFieldValues(*visitor_.last_message_));
+  EXPECT_TRUE(visitor_.end_of_message_);
+  EXPECT_TRUE(visitor_.object_payload_.has_value());
+  EXPECT_EQ(*(visitor_.object_payload_), "bar");
+  EXPECT_FALSE(visitor_.parsing_error_.has_value());
+}
+
 TEST_F(MoqtMessageSpecificTest, SetupRoleAppearsTwice) {
   MoqtParser parser(kRawQuic, visitor_);
   char setup[] = {
@@ -619,11 +653,12 @@
   EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
 }
 
-TEST_F(MoqtMessageSpecificTest, SubscribeRequestAuthorizationInfoTwice) {
+#ifdef MOQT_AUTH_INFO
+TEST_F(MoqtMessageSpecificTest, SubscribeAuthorizationInfoTwice) {
   MoqtParser parser(kWebTrans, visitor_);
-  char subscribe_request[] = {
-      0x03, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
-      0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
+  char subscribe[] = {
+      0x03, 0x01, 0x02, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
+      0x04, 0x61, 0x62, 0x63, 0x64,              // track_name = "abcd"
       0x02, 0x04,                    // start_group = 4 (relative previous)
       0x01, 0x01,                    // start_object = 1 (absolute)
       0x00,                          // end_group = none
@@ -632,14 +667,14 @@
       0x02, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar"
       0x02, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar"
   };
-  parser.ProcessData(
-      absl::string_view(subscribe_request, sizeof(subscribe_request)), false);
+  parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
   EXPECT_EQ(visitor_.messages_received_, 0);
   EXPECT_TRUE(visitor_.parsing_error_.has_value());
   EXPECT_EQ(*visitor_.parsing_error_,
             "AUTHORIZATION_INFO parameter appears twice in SUBSCRIBE_REQUEST");
   EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
 }
+#endif
 
 TEST_F(MoqtMessageSpecificTest, AnnounceAuthorizationInfoTwice) {
   MoqtParser parser(kWebTrans, visitor_);
@@ -659,11 +694,11 @@
 
 TEST_F(MoqtMessageSpecificTest, FinMidPayload) {
   MoqtParser parser(kRawQuic, visitor_);
-  auto message = std::make_unique<ObjectMessageWithLength>();
+  auto message = std::make_unique<StreamHeaderGroupMessage>();
   parser.ProcessData(
       message->PacketSample().substr(0, message->total_message_size() - 1),
       true);
-  EXPECT_EQ(visitor_.messages_received_, 1);
+  EXPECT_EQ(visitor_.messages_received_, 0);
   EXPECT_TRUE(visitor_.parsing_error_.has_value());
   EXPECT_EQ(*visitor_.parsing_error_, "Received FIN mid-payload");
   EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
@@ -671,7 +706,7 @@
 
 TEST_F(MoqtMessageSpecificTest, PartialPayloadThenFin) {
   MoqtParser parser(kRawQuic, visitor_);
-  auto message = std::make_unique<ObjectMessageWithLength>();
+  auto message = std::make_unique<StreamHeaderTrackMessage>();
   parser.ProcessData(
       message->PacketSample().substr(0, message->total_message_size() - 1),
       false);
@@ -724,62 +759,68 @@
 
 TEST_F(MoqtMessageSpecificTest, StartGroupIsNone) {
   MoqtParser parser(kRawQuic, visitor_);
-  char subscribe_request[] = {
-      0x03, 0x03, 0x66, 0x6f, 0x6f,  // track_name = "foo"
+  char subscribe[] = {
+      0x03, 0x01, 0x02,              // id and alias
+      0x03, 0x66, 0x6f, 0x6f,        // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
       0x00,                          // start_group = none
       0x01, 0x01,                    // start_object = 1 (absolute)
       0x00,                          // end_group = none
       0x00,                          // end_object = none
+#ifdef MOQT_AUTH_INFO
       0x01,                          // 1 parameter
       0x02, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar"
+#endif
   };
-  parser.ProcessData(
-      absl::string_view(subscribe_request, sizeof(subscribe_request)), false);
+  parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
   EXPECT_EQ(visitor_.messages_received_, 0);
   EXPECT_TRUE(visitor_.parsing_error_.has_value());
   EXPECT_EQ(*visitor_.parsing_error_,
-            "START_GROUP must not be None in SUBSCRIBE_REQUEST");
+            "START_GROUP must not be None in SUBSCRIBE");
 }
 
 TEST_F(MoqtMessageSpecificTest, StartObjectIsNone) {
   MoqtParser parser(kRawQuic, visitor_);
-  char subscribe_request[] = {
-      0x03, 0x03, 0x66, 0x6f, 0x6f,  // track_name = "foo"
+  char subscribe[] = {
+      0x03, 0x01, 0x02,              // id and alias
+      0x03, 0x66, 0x6f, 0x6f,        // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
       0x02, 0x04,                    // start_group = 4 (relative previous)
       0x00,                          // start_object = none
       0x00,                          // end_group = none
       0x00,                          // end_object = none
+#ifdef MOQT_AUTH_INFO
       0x01,                          // 1 parameter
       0x02, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar"
+#endif
   };
-  parser.ProcessData(
-      absl::string_view(subscribe_request, sizeof(subscribe_request)), false);
+  parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
   EXPECT_EQ(visitor_.messages_received_, 0);
   EXPECT_TRUE(visitor_.parsing_error_.has_value());
   EXPECT_EQ(*visitor_.parsing_error_,
-            "START_OBJECT must not be None in SUBSCRIBE_REQUEST");
+            "START_OBJECT must not be None in SUBSCRIBE");
 }
 
 TEST_F(MoqtMessageSpecificTest, EndGroupIsNoneEndObjectIsNoNone) {
   MoqtParser parser(kRawQuic, visitor_);
-  char subscribe_request[] = {
-      0x03, 0x03, 0x66, 0x6f, 0x6f,  // track_name = "foo"
+  char subscribe[] = {
+      0x03, 0x01, 0x02,              // id and alias
+      0x03, 0x66, 0x6f, 0x6f,        // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
       0x02, 0x04,                    // start_group = 4 (relative previous)
       0x01, 0x01,                    // start_object = 1 (absolute)
       0x00,                          // end_group = none
       0x01, 0x01,                    // end_object = 1 (absolute)
+#ifdef MOQT_AUTH_INFO
       0x01,                          // 1 parameter
       0x02, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar"
+#endif
   };
-  parser.ProcessData(
-      absl::string_view(subscribe_request, sizeof(subscribe_request)), false);
+  parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
   EXPECT_EQ(visitor_.messages_received_, 0);
   EXPECT_TRUE(visitor_.parsing_error_.has_value());
   EXPECT_EQ(*visitor_.parsing_error_,
-            "SUBSCRIBE_REQUEST end_group and end_object must be both None "
+            "SUBSCRIBE end_group and end_object must be both None "
             "or both non_None");
 }
 
@@ -793,59 +834,11 @@
   for (MoqtMessageType type : message_types) {
     // Each iteration, process from the halfway point of one message to the
     // halfway point of the next.
-    if (type == MoqtMessageType::kObjectWithoutPayloadLength) {
-      continue;  // Cannot be followed with another message.
+    if (IsObjectMessage(type)) {
+      continue;  // Objects cannot share a stream with other meessages.
     }
-    std::unique_ptr<TestMessageBase> message;
-    switch (type) {
-      case MoqtMessageType::kObjectWithPayloadLength:
-        message = std::make_unique<ObjectMessageWithLength>();
-        break;
-      case MoqtMessageType::kObjectWithoutPayloadLength:
-        continue;  // Cannot be followed with another message;
-      case MoqtMessageType::kClientSetup:
-        message = std::make_unique<ClientSetupMessage>(kRawQuic);
-        break;
-      case MoqtMessageType::kServerSetup:
-        message = std::make_unique<ClientSetupMessage>(kRawQuic);
-        break;
-      case MoqtMessageType::kSubscribeRequest:
-        message = std::make_unique<SubscribeRequestMessage>();
-        break;
-      case MoqtMessageType::kSubscribeOk:
-        message = std::make_unique<SubscribeOkMessage>();
-        break;
-      case MoqtMessageType::kSubscribeError:
-        message = std::make_unique<SubscribeErrorMessage>();
-        break;
-      case MoqtMessageType::kUnsubscribe:
-        message = std::make_unique<UnsubscribeMessage>();
-        break;
-      case MoqtMessageType::kSubscribeFin:
-        message = std::make_unique<SubscribeFinMessage>();
-        break;
-      case MoqtMessageType::kSubscribeRst:
-        message = std::make_unique<SubscribeRstMessage>();
-        break;
-      case MoqtMessageType::kAnnounce:
-        message = std::make_unique<AnnounceMessage>();
-        break;
-      case moqt::MoqtMessageType::kAnnounceOk:
-        message = std::make_unique<AnnounceOkMessage>();
-        break;
-      case moqt::MoqtMessageType::kAnnounceError:
-        message = std::make_unique<AnnounceErrorMessage>();
-        break;
-      case moqt::MoqtMessageType::kUnannounce:
-        message = std::make_unique<UnannounceMessage>();
-        break;
-      case moqt::MoqtMessageType::kGoAway:
-        message = std::make_unique<GoAwayMessage>();
-        break;
-      default:
-        message = nullptr;
-        break;
-    }
+    std::unique_ptr<TestMessageBase> message =
+        CreateTestMessage(type, kRawQuic);
     memcpy(buffer + write, message->PacketSample().data(),
            message->total_message_size());
     size_t new_read = write + message->total_message_size() / 2;
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 7adb7ee..f78d3f5 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -12,6 +12,7 @@
 #include <vector>
 
 #include "absl/algorithm/container.h"
+#include "absl/status/status.h"
 #include "absl/strings/str_cat.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/quic_types.h"
@@ -30,8 +31,6 @@
 
 using ::quic::Perspective;
 
-constexpr int kMaxBufferedObjects = 1000;
-
 void MoqtSession::OnSessionReady() {
   QUICHE_DLOG(INFO) << ENDPOINT << "Underlying session ready";
   if (parameters_.perspective == Perspective::IS_SERVER) {
@@ -108,8 +107,7 @@
 
 void MoqtSession::AddLocalTrack(const FullTrackName& full_track_name,
                                 LocalTrack::Visitor* visitor) {
-  local_tracks_.try_emplace(full_track_name, full_track_name,
-                            next_track_alias_++, visitor);
+  local_tracks_.try_emplace(full_track_name, full_track_name, visitor);
 }
 
 // TODO: Create state that allows ANNOUNCE_OK/ERROR on spurious namespaces to
@@ -144,7 +142,7 @@
                                     uint64_t start_group, uint64_t start_object,
                                     RemoteTrack::Visitor* visitor,
                                     absl::string_view auth_info) {
-  MoqtSubscribeRequest message;
+  MoqtSubscribe message;
   message.track_namespace = track_namespace;
   message.track_name = name;
   message.start_group = MoqtSubscribeLocation(true, start_group);
@@ -171,7 +169,7 @@
     QUIC_DLOG(ERROR) << "Subscription end is before beginning";
     return false;
   }
-  MoqtSubscribeRequest message;
+  MoqtSubscribe message;
   message.track_namespace = track_namespace;
   message.track_name = name;
   message.start_group = MoqtSubscribeLocation(true, start_group);
@@ -189,7 +187,7 @@
                                     int64_t start_object,
                                     RemoteTrack::Visitor* visitor,
                                     absl::string_view auth_info) {
-  MoqtSubscribeRequest message;
+  MoqtSubscribe message;
   message.track_namespace = track_namespace;
   message.track_name = name;
   message.start_group = MoqtSubscribeLocation(false, start_group);
@@ -206,7 +204,7 @@
                                         absl::string_view name,
                                         RemoteTrack::Visitor* visitor,
                                         absl::string_view auth_info) {
-  MoqtSubscribeRequest message;
+  MoqtSubscribe message;
   message.track_namespace = track_namespace;
   message.track_name = name;
   // First object of current group.
@@ -220,22 +218,31 @@
   return Subscribe(message, visitor);
 }
 
-bool MoqtSession::Subscribe(const MoqtSubscribeRequest& message,
+bool MoqtSession::Subscribe(MoqtSubscribe& message,
                             RemoteTrack::Visitor* visitor) {
   // TODO(martinduke): support authorization info
-  bool success =
-      session_->GetStreamById(*control_stream_)
-          ->Write(framer_.SerializeSubscribeRequest(message).AsStringView());
-  if (!success) {
-    Error(MoqtError::kGenericError,
-          "Failed to write SUBSCRIBE_REQUEST message");
-    return false;
-  }
-  QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE_REQUEST message for "
-                  << message.track_namespace << ":" << message.track_name;
+  message.subscribe_id = next_subscribe_id_++;
   FullTrackName ftn(std::string(message.track_namespace),
                     std::string(message.track_name));
-  remote_tracks_.try_emplace(ftn, ftn, visitor);
+  auto it = remote_track_aliases_.find(ftn);
+  if (it != remote_track_aliases_.end()) {
+    message.track_alias = it->second;
+    if (message.track_alias >= next_remote_track_alias_) {
+      next_remote_track_alias_ = message.track_alias + 1;
+    }
+  } else {
+    message.track_alias = next_remote_track_alias_++;
+  }
+  bool success =
+      session_->GetStreamById(*control_stream_)
+          ->Write(framer_.SerializeSubscribe(message).AsStringView());
+  if (!success) {
+    Error(MoqtError::kGenericError, "Failed to write SUBSCRIBE message");
+    return false;
+  }
+  QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE message for "
+                  << message.track_namespace << ":" << message.track_name;
+  active_subscribes_.try_emplace(message.subscribe_id, message, visitor);
   return true;
 }
 
@@ -252,55 +259,87 @@
   return new_stream->GetStreamId();
 }
 
-// increment object_sequence or group_sequence depending on |start_new_group|
-void MoqtSession::PublishObjectToStream(webtransport::StreamId stream_id,
-                                        FullTrackName full_track_name,
-                                        bool start_new_group,
-                                        absl::string_view payload) {
-  // TODO: check that the peer is subscribed to the next sequence.
-  webtransport::Stream* stream = session_->GetStreamById(stream_id);
-  if (stream == nullptr) {
-    QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT to nonexistent stream";
-    return;
+bool MoqtSession::PublishObject(FullTrackName& full_track_name,
+                                uint64_t group_id, uint64_t object_id,
+                                uint64_t object_send_order,
+                                MoqtForwardingPreference forwarding_preference,
+                                absl::string_view payload,
+                                std::optional<uint64_t> payload_length,
+                                bool end_of_stream) {
+  if (payload_length.has_value() && *payload_length < payload.length()) {
+    QUICHE_DLOG(ERROR) << ENDPOINT << "Payload too short";
+    return false;
   }
   auto track_it = local_tracks_.find(full_track_name);
   if (track_it == local_tracks_.end()) {
     QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT for nonexistent track";
-    return;
+    return false;
+  }
+  LocalTrack& track = track_it->second;
+  track.SentSequence(FullSequence(group_id, object_id));
+  std::vector<SubscribeWindow*> subscriptions =
+      track.ShouldSend({group_id, object_id});
+  if (subscriptions.empty()) {
+    return true;
   }
   MoqtObject object;
-  LocalTrack& track = track_it->second;
-  object.track_id = track.track_alias();
-  FullSequence& next_sequence = track.next_sequence_mutable();
-  object.group_sequence = next_sequence.group;
-  if (start_new_group) {
-    ++object.group_sequence;
-    object.object_sequence = 0;
-  } else {
-    object.object_sequence = next_sequence.object;
-  }
-  next_sequence.group = object.group_sequence;
-  next_sequence.object = object.object_sequence + 1;
-  if (!track.ShouldSend(object.group_sequence, object.object_sequence)) {
-    QUICHE_LOG(INFO) << ENDPOINT << "Not sending object "
+  QUICHE_DCHECK(track.track_alias().has_value());
+  object.track_alias = *track.track_alias();
+  object.group_id = group_id;
+  object.object_id = object_id;
+  object.object_send_order = object_send_order;
+  object.forwarding_preference = forwarding_preference;
+  object.payload_length = payload_length;
+  int failures = 0;
+  quiche::StreamWriteOptions write_options;
+  write_options.set_send_fin(end_of_stream);
+  for (auto subscription : subscriptions) {
+    // TODO: kPreferDatagram should bypass stream stuff. For now, send it on the
+    // stream.
+    bool new_stream = false;
+    std::optional<webtransport::StreamId> stream_id =
+        subscription->GetStreamForSequence({group_id, object_id},
+                                           forwarding_preference);
+    if (!stream_id.has_value()) {
+      new_stream = true;
+      stream_id = OpenUnidirectionalStream();
+      if (!stream_id.has_value()) {
+        QUICHE_DLOG(ERROR) << ENDPOINT
+                           << "Sending OBJECT to nonexistent stream";
+        ++failures;
+        continue;
+      }
+      if (!end_of_stream) {
+        subscription->AddStream(forwarding_preference, *stream_id, group_id,
+                                object_id);
+      }
+    }
+    webtransport::Stream* stream = session_->GetStreamById(*stream_id);
+    if (stream == nullptr) {
+      QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT to nonexistent stream "
+                         << *stream_id;
+      ++failures;
+      continue;
+    }
+    object.subscribe_id = subscription->subscribe_id();
+    if (quiche::WriteIntoStream(
+            *stream,
+            framer_.SerializeObject(object, payload, new_stream).AsStringView(),
+            write_options) != absl::OkStatus()) {
+      QUICHE_DLOG(ERROR) << ENDPOINT << "Failed to write OBJECT message";
+      ++failures;
+      continue;
+    }
+    QUICHE_LOG(INFO) << ENDPOINT << "Sending object "
                      << full_track_name.track_namespace << ":"
                      << full_track_name.track_name << " with sequence "
-                     << object.group_sequence << ":" << object.object_sequence
-                     << " because peer is not subscribed";
-    return;
+                     << object.group_id << ":" << object.object_id
+                     << " on stream " << *stream_id;
+    if (end_of_stream && !new_stream) {
+      subscription->RemoveStream(forwarding_preference, group_id, object_id);
+    }
   }
-  object.object_send_order = 0;
-  object.payload_length = payload.size();
-  bool success =
-      stream->Write(framer_.SerializeObject(object, payload).AsStringView());
-  if (!success) {
-    QUICHE_DLOG(ERROR) << ENDPOINT << "Failed to write OBJECT message";
-    return;
-  }
-  QUICHE_LOG(INFO) << ENDPOINT << "Sending object "
-                   << full_track_name.track_namespace << ":"
-                   << full_track_name.track_name << " with sequence "
-                   << object.group_sequence << ":" << object.object_sequence;
+  return (failures == 0);
 }
 
 void MoqtSession::Stream::OnCanRead() {
@@ -338,15 +377,17 @@
                     "Received OBJECT message on control stream");
     return;
   }
-  QUICHE_DLOG(INFO) << ENDPOINT << "Received OBJECT message on stream "
-                    << stream_->GetStreamId() << " for track alias "
-                    << message.track_id << " with sequence "
-                    << message.group_sequence << ":" << message.object_sequence
-                    << " length " << payload.size() << " explicit length "
-                    << (message.payload_length.has_value()
-                            ? (int)*message.payload_length
-                            : -1)
-                    << (end_of_message ? "F" : "");
+  QUICHE_DLOG(INFO)
+      << ENDPOINT << "Received OBJECT message on stream "
+      << stream_->GetStreamId() << " for subscribe_id " << message.subscribe_id
+      << " for track alias " << message.track_alias << " with sequence "
+      << message.group_id << ":" << message.object_id << " send_order "
+      << message.object_send_order << " forwarding_preference "
+      << MoqtForwardingPreferenceToString(message.forwarding_preference)
+      << " length " << payload.size() << " explicit length "
+      << (message.payload_length.has_value() ? (int)*message.payload_length
+                                             : -1)
+      << (end_of_message ? "F" : "");
   if (!session_->parameters_.deliver_partial_objects) {
     if (!end_of_message) {  // Buffer partial object.
       absl::StrAppend(&partial_object_, payload);
@@ -357,33 +398,17 @@
       payload = absl::string_view(partial_object_);
     }
   }
-  auto it = session_->tracks_by_alias_.find(message.track_id);
-  if (it == session_->tracks_by_alias_.end()) {
-    // No SUBSCRIBE_OK received with this alias, buffer it.
-    auto it2 = session_->object_queue_.find(message.track_id);
-    std::vector<BufferedObject>* queue;
-    if (it2 == session_->object_queue_.end()) {
-      queue = &session_->object_queue_[message.track_id];
-    } else {
-      queue = &it2->second;
-    }
-    if (session_->num_buffered_objects_ >= kMaxBufferedObjects) {
-      session_->num_buffered_objects_++;
-      session_->Error(MoqtError::kGenericError, "Too many buffered objects");
-      return;
-    }
-    queue->push_back(BufferedObject(stream_->GetStreamId(), message, payload,
-                                    end_of_message));
-    QUIC_DLOG(INFO) << ENDPOINT << "Buffering OBJECT for track alias "
-                    << message.track_id;
+  auto it = session_->remote_tracks_.find(message.track_alias);
+  if (it == session_->remote_tracks_.end()) {
+    // No SUBSCRIBE_OK received with this alias, return.
     return;
   }
-  RemoteTrack* subscription = it->second;
-  if (subscription->visitor() != nullptr) {
-    subscription->visitor()->OnObjectFragment(
-        subscription->full_track_name(), stream_->GetStreamId(),
-        message.group_sequence, message.object_sequence,
-        message.object_send_order, payload, end_of_message);
+  RemoteTrack& subscription = it->second;
+  if (subscription.visitor() != nullptr) {
+    subscription.visitor()->OnObjectFragment(
+        subscription.full_track_name(), message.group_id, message.object_id,
+        message.object_send_order, message.forwarding_preference, payload,
+        end_of_message);
   }
   partial_object_.clear();
 }
@@ -456,14 +481,15 @@
   std::move(session_->session_established_callback_)();
 }
 
-void MoqtSession::Stream::SendSubscribeError(
-    const MoqtSubscribeRequest& message, SubscribeErrorCode error_code,
-    absl::string_view reason_phrase) {
+void MoqtSession::Stream::SendSubscribeError(const MoqtSubscribe& message,
+                                             SubscribeErrorCode error_code,
+                                             absl::string_view reason_phrase,
+                                             uint64_t track_alias) {
   MoqtSubscribeError subscribe_error;
-  subscribe_error.track_namespace = message.track_namespace;
-  subscribe_error.track_name = message.track_name;
+  subscribe_error.subscribe_id = message.subscribe_id;
   subscribe_error.error_code = error_code;
   subscribe_error.reason_phrase = reason_phrase;
+  subscribe_error.track_alias = track_alias;
   bool success =
       stream_->Write(session_->framer_.SerializeSubscribeError(subscribe_error)
                          .AsStringView());
@@ -473,13 +499,12 @@
   }
 }
 
-void MoqtSession::Stream::OnSubscribeRequestMessage(
-    const MoqtSubscribeRequest& message) {
+void MoqtSession::Stream::OnSubscribeMessage(const MoqtSubscribe& message) {
   std::string reason_phrase = "";
   if (!CheckIfIsControlStream()) {
     return;
   }
-  QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE_REQUEST for "
+  QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE for "
                   << message.track_namespace << ":" << message.track_name;
   auto it = session_->local_tracks_.find(FullTrackName(
       std::string(message.track_namespace), std::string(message.track_name)));
@@ -488,32 +513,50 @@
                     << message.track_namespace << ":" << message.track_name
                     << " does not exist";
     SendSubscribeError(message, SubscribeErrorCode::kGenericError,
-                       "Track does not exist");
+                       "Track does not exist", message.track_alias);
     return;
   }
   LocalTrack& track = it->second;
+  if ((track.track_alias().has_value() &&
+       message.track_alias != *track.track_alias()) ||
+      session_->used_track_aliases_.contains(message.track_alias)) {
+    // Propose a different track_alias.
+    SendSubscribeError(message, SubscribeErrorCode::kRetryTrackAlias,
+                       "Track alias already exists",
+                       session_->next_local_track_alias_++);
+    return;
+  } else {  // Use client-provided alias.
+    track.set_track_alias(message.track_alias);
+    if (message.track_alias >= session_->next_local_track_alias_) {
+      session_->next_local_track_alias_ = message.track_alias + 1;
+    }
+    session_->used_track_aliases_.insert(message.track_alias);
+  }
   std::optional<FullSequence> start = session_->LocationToAbsoluteNumber(
       track, message.start_group, message.start_object);
   QUICHE_DCHECK(start.has_value());  // Parser enforces this.
   std::optional<FullSequence> end = session_->LocationToAbsoluteNumber(
       track, message.end_group, message.end_object);
   if (start < track.next_sequence() && track.visitor() != nullptr) {
-    SubscribeWindow window = end.has_value()
-                                 ? SubscribeWindow(start->group, start->object,
-                                                   end->group, end->object)
-                                 : SubscribeWindow(start->group, start->object);
+    // TODO: Rework this. It's not good that the session notifies the
+    // application -- presumably triggering the send of a bunch of objects --
+    // and only then sends the Subscribe OK.
+    SubscribeWindow window =
+        end.has_value()
+            ? SubscribeWindow(message.subscribe_id, start->group, start->object,
+                              end->group, end->object)
+            : SubscribeWindow(message.subscribe_id, start->group,
+                              start->object);
     std::optional<absl::string_view> past_objects_available =
-        track.visitor()->OnSubscribeRequestForPast(window);
-    if (!past_objects_available.has_value()) {
+        track.visitor()->OnSubscribeForPast(window);
+    if (past_objects_available.has_value()) {
       SendSubscribeError(message, SubscribeErrorCode::kGenericError,
-                         "Object does not exist");
+                         "Object does not exist", message.track_alias);
       return;
     }
   }
   MoqtSubscribeOk subscribe_ok;
-  subscribe_ok.track_namespace = message.track_namespace;
-  subscribe_ok.track_name = message.track_name;
-  subscribe_ok.track_id = track.track_alias();
+  subscribe_ok.subscribe_id = message.subscribe_id;
   bool success = stream_->Write(
       session_->framer_.SerializeSubscribeOk(subscribe_ok).AsStringView());
   if (!success) {
@@ -524,62 +567,39 @@
   QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for "
                   << message.track_namespace << ":" << message.track_name;
   if (!end.has_value()) {
-    track.AddWindow(SubscribeWindow(start->group, start->object));
+    track.AddWindow(
+        SubscribeWindow(message.subscribe_id, start->group, start->object));
     return;
   }
-  track.AddWindow(
-      SubscribeWindow(start->group, start->object, end->group, end->object));
+  track.AddWindow(SubscribeWindow(message.subscribe_id, start->group,
+                                  start->object, end->group, end->object));
 }
 
 void MoqtSession::Stream::OnSubscribeOkMessage(const MoqtSubscribeOk& message) {
   if (!CheckIfIsControlStream()) {
     return;
   }
-  if (session_->tracks_by_alias_.contains(message.track_id)) {
-    session_->Error(MoqtError::kDuplicateTrackAlias,
-                    "Received duplicate track_alias");
-    return;
-  }
-  auto it = session_->remote_tracks_.find(FullTrackName(
-      std::string(message.track_namespace), std::string(message.track_name)));
-  if (it == session_->remote_tracks_.end()) {
+  auto it = session_->active_subscribes_.find(message.subscribe_id);
+  if (it == session_->active_subscribes_.end()) {
     session_->Error(MoqtError::kProtocolViolation,
                     "Received SUBSCRIBE_OK for nonexistent subscribe");
     return;
   }
-  // Note that if there are multiple SUBSCRIBE_OK for the same track,
-  // RemoteTrack.track_alias() will be the last alias received, but
-  // tracks_by_alias_ will have an entry for every track_alias received.
-  // TODO: revise this data structure to make it easier to clean up
-  // RemoteTracks, unless draft changes make it irrelevant.
+  MoqtSubscribe& subscribe = it->second.message;
   QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for "
-                  << message.track_namespace << ":" << message.track_name
-                  << ", track_alias = " << message.track_id;
-  RemoteTrack& track = it->second;
-  track.set_track_alias(message.track_id);
-  session_->tracks_by_alias_[message.track_id] = &track;
+                  << "subscribe_id = " << message.subscribe_id
+                  << subscribe.track_namespace << ":" << subscribe.track_name;
+  // Copy the Remote Track from session_->active_subscribes_ to
+  // session_->remote_tracks_.
+  FullTrackName ftn(subscribe.track_namespace, subscribe.track_name);
+  RemoteTrack::Visitor* visitor = it->second.visitor;
+  session_->remote_tracks_.try_emplace(subscribe.track_alias, ftn,
+                                       subscribe.track_alias, visitor);
   // TODO: handle expires.
-  if (track.visitor() != nullptr) {
-    track.visitor()->OnReply(track.full_track_name(), std::nullopt);
+  if (visitor != nullptr) {
+    visitor->OnReply(ftn, std::nullopt);
   }
-  // Clear the buffer for this track alias.
-  auto it2 = session_->object_queue_.find(message.track_id);
-  if (it2 == session_->object_queue_.end() || track.visitor() == nullptr) {
-    // Nothing is buffered, or the app hasn't registered a visitor anyway.
-    return;
-  }
-  QUIC_DLOG(INFO) << ENDPOINT << "Processing buffered OBJECTs for track_alias "
-                  << message.track_id;
-  std::vector<BufferedObject>& queue = it2->second;
-  for (BufferedObject& to_deliver : queue) {
-    track.visitor()->OnObjectFragment(
-        track.full_track_name(), to_deliver.stream_id,
-        to_deliver.message.group_sequence, to_deliver.message.object_sequence,
-        to_deliver.message.object_send_order, to_deliver.payload,
-        to_deliver.eom);
-    session_->num_buffered_objects_--;
-  }
-  session_->object_queue_.erase(it2);
+  session_->active_subscribes_.erase(it);
 }
 
 void MoqtSession::Stream::OnSubscribeErrorMessage(
@@ -587,21 +607,28 @@
   if (!CheckIfIsControlStream()) {
     return;
   }
-  auto it = session_->remote_tracks_.find(FullTrackName(
-      std::string(message.track_namespace), std::string(message.track_name)));
-  if (it == session_->remote_tracks_.end()) {
+  auto it = session_->active_subscribes_.find(message.subscribe_id);
+  if (it == session_->active_subscribes_.end()) {
     session_->Error(MoqtError::kProtocolViolation,
                     "Received SUBSCRIBE_ERROR for nonexistent subscribe");
     return;
   }
+  MoqtSubscribe& subscribe = it->second.message;
   QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_ERROR for "
-                  << message.track_namespace << ":" << message.track_name
-                  << ", error = " << static_cast<int>(message.error_code)
+                  << "subscribe_id = " << message.subscribe_id << " ("
+                  << subscribe.track_namespace << ":" << subscribe.track_name
+                  << ")" << ", error = " << static_cast<int>(message.error_code)
                   << " (" << message.reason_phrase << ")";
-  if (it->second.visitor() != nullptr) {
-    it->second.visitor()->OnReply(it->second.full_track_name(),
-                                  message.reason_phrase);
+  RemoteTrack::Visitor* visitor = it->second.visitor;
+  FullTrackName ftn(subscribe.track_namespace, subscribe.track_name);
+  if (message.error_code == SubscribeErrorCode::kRetryTrackAlias) {
+    // Automatically resubscribe with new alias.
+    session_->remote_track_aliases_[ftn] = message.track_alias;
+    session_->Subscribe(subscribe, visitor);
+  } else if (visitor != nullptr) {
+    visitor->OnReply(ftn, message.reason_phrase);
   }
+  session_->active_subscribes_.erase(it);
 }
 
 void MoqtSession::Stream::OnAnnounceMessage(const MoqtAnnounce& message) {
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 01f6fcf..9b77a11 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -9,10 +9,9 @@
 #include <optional>
 #include <string>
 #include <utility>
-#include <vector>
 
 #include "absl/container/flat_hash_map.h"
-#include "absl/container/node_hash_map.h"
+#include "absl/container/flat_hash_set.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/moqt/moqt_framer.h"
@@ -79,8 +78,8 @@
 
   // Add to the list of tracks that can be subscribed to. Call this before
   // Announce() so that subscriptions can be processed correctly. If |visitor|
-  // is nullptr, then incoming SUBSCRIBE_REQUEST for objects in the path will
-  // receive SUBSCRIBE_OK, but never actually get the objects.
+  // is nullptr, then incoming SUBSCRIBE for objects in the path will receive
+  // SUBSCRIBE_OK, but never actually get the objects.
   void AddLocalTrack(const FullTrackName& full_track_name,
                      LocalTrack::Visitor* visitor);
   // Send an ANNOUNCE message for |track_namespace|, and call
@@ -90,9 +89,9 @@
                 MoqtAnnounceCallback announce_callback);
   bool HasSubscribers(const FullTrackName& full_track_name) const;
 
-  // Returns true if SUBSCRIBE_REQUEST was sent. If there is already a
-  // subscription to the track, the message will still be sent. However, the
-  // visitor will be ignored.
+  // Returns true if SUBSCRIBE was sent. If there is already a subscription to
+  // the track, the message will still be sent. However, the visitor will be
+  // ignored.
   bool SubscribeAbsolute(absl::string_view track_namespace,
                          absl::string_view name, uint64_t start_group,
                          uint64_t start_object, RemoteTrack::Visitor* visitor,
@@ -111,15 +110,20 @@
                              RemoteTrack::Visitor* visitor,
                              absl::string_view auth_info = "");
 
-  // Returns the stream ID if successful, nullopt if not.
-  // TODO: Add a callback if stream creation is delayed.
-  std::optional<webtransport::StreamId> OpenUnidirectionalStream();
-  // Will automatically assign a new sequence number. If |start_new_group|,
-  // increment group_sequence and set object_sequence to 0. Otherwise,
-  // increment object_sequence.
-  void PublishObjectToStream(webtransport::StreamId stream_id,
-                             FullTrackName full_track_name,
-                             bool start_new_group, absl::string_view payload);
+  // Returns false if it could not open a stream when necessary, or if the
+  // track does not exist (there was no call to AddLocalTrack). Will still
+  // return false is some streams succeed.
+  // Also returns false if |payload_length| exists but is shorter than
+  // |payload|.
+  // |payload.length() >= |payload_length|, because the application can deliver
+  // partial objects.
+  bool PublishObject(FullTrackName& full_track_name, uint64_t group_id,
+                     uint64_t object_id, uint64_t object_send_order,
+                     MoqtForwardingPreference forwarding_preference,
+                     absl::string_view payload,
+                     std::optional<uint64_t> payload_length,
+                     bool end_of_stream);
+  // TODO: Add an API to FIN the stream for a particular track/group/object.
 
  private:
   friend class test::MoqtSessionPeer;
@@ -145,12 +149,12 @@
     void OnWriteSideInDataRecvdState() override {}
 
     // MoqtParserVisitor implementation.
+    // TODO: Handle a stream FIN.
     void OnObjectMessage(const MoqtObject& message, absl::string_view payload,
                          bool end_of_message) override;
     void OnClientSetupMessage(const MoqtClientSetup& message) override;
     void OnServerSetupMessage(const MoqtServerSetup& message) override;
-    void OnSubscribeRequestMessage(
-        const MoqtSubscribeRequest& message) override;
+    void OnSubscribeMessage(const MoqtSubscribe& message) override;
     void OnSubscribeOkMessage(const MoqtSubscribeOk& message) override;
     void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override;
     void OnUnsubscribeMessage(const MoqtUnsubscribe& /*message*/) override {}
@@ -172,9 +176,10 @@
 
    private:
     friend class test::MoqtSessionPeer;
-    void SendSubscribeError(const MoqtSubscribeRequest& message,
+    void SendSubscribeError(const MoqtSubscribe& message,
                             SubscribeErrorCode error_code,
-                            absl::string_view reason_phrase);
+                            absl::string_view reason_phrase,
+                            uint64_t track_alias);
     bool CheckIfIsControlStream();
 
     MoqtSession* session_;
@@ -186,30 +191,16 @@
     std::string partial_object_;
   };
 
-  // If parameters_.deliver_partial_objects is false, then the session buffers
-  // these objects until they arrive in their entirety. This stores the
-  // relevant information to later deliver this object via OnObject().
-  struct BufferedObject {
-    uint32_t stream_id;
-    MoqtObject message;
-    std::string payload;
-    bool eom;
-    BufferedObject(uint32_t id, const MoqtObject& header,
-                   absl::string_view body, bool end_of_message)
-        : stream_id(id),
-          message(header),
-          payload(std::string(body)),
-          eom(end_of_message) {}
-  };
-
-  // Returns false if the SUBSCRIBE_REQUEST isn't sent.
-  bool Subscribe(const MoqtSubscribeRequest& message,
-                 RemoteTrack::Visitor* visitor);
+  // Returns false if the SUBSCRIBE isn't sent.
+  bool Subscribe(MoqtSubscribe& message, RemoteTrack::Visitor* visitor);
   // converts two MoqtLocations into absolute sequences.
   std::optional<FullSequence> LocationToAbsoluteNumber(
       const LocalTrack& track,
       const std::optional<MoqtSubscribeLocation>& group,
       const std::optional<MoqtSubscribeLocation>& object);
+  // Returns the stream ID if successful, nullopt if not.
+  // TODO: Add a callback if stream creation is delayed.
+  std::optional<webtransport::StreamId> OpenUnidirectionalStream();
 
   webtransport::Session* session_;
   MoqtSessionParameters parameters_;
@@ -221,18 +212,27 @@
   std::optional<webtransport::StreamId> control_stream_;
   std::string error_;
 
-  // All the tracks the session is subscribed to. Multiple subscribes to the
-  // same track are recorded in a single subscription.
-  absl::node_hash_map<FullTrackName, RemoteTrack> remote_tracks_;
+  // All the tracks the session is subscribed to, indexed by track_alias.
+  // Multiple subscribes to the same track are recorded in a single
+  // subscription.
+  absl::flat_hash_map<uint64_t, RemoteTrack> remote_tracks_;
+  // Look up aliases for remote tracks by name
+  absl::flat_hash_map<FullTrackName, uint64_t> remote_track_aliases_;
+  uint64_t next_remote_track_alias_ = 0;
+
   // All the tracks the peer can subscribe to.
   absl::flat_hash_map<FullTrackName, LocalTrack> local_tracks_;
+  // This is only used to check for track_alias collisions.
+  absl::flat_hash_set<uint64_t> used_track_aliases_;
+  uint64_t next_local_track_alias_ = 0;
 
-  // Remote tracks indexed by TrackId. Must be active.
-  absl::flat_hash_map<uint64_t, RemoteTrack*> tracks_by_alias_;
-  uint64_t next_track_alias_ = 0;
-  // Buffer for OBJECTs that arrive with an unknown track alias.
-  absl::flat_hash_map<uint64_t, std::vector<BufferedObject>> object_queue_;
-  int num_buffered_objects_ = 0;
+  // Indexed by subscribe_id.
+  struct ActiveSubscribe {
+    MoqtSubscribe message;
+    RemoteTrack::Visitor* visitor;
+  };
+  absl::flat_hash_map<uint64_t, ActiveSubscribe> active_subscribes_;
+  uint64_t next_subscribe_id_ = 0;
 
   // Indexed by track namespace.
   absl::flat_hash_map<std::string, MoqtAnnounceCallback>
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index eb231b2..69021e5 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -5,6 +5,7 @@
 #include "quiche/quic/moqt/moqt_session.h"
 
 #include <cstdint>
+#include <cstring>
 #include <memory>
 #include <optional>
 #include <string>
@@ -41,7 +42,7 @@
 constexpr webtransport::StreamId kOutgoingUniStreamId = 14;
 
 constexpr MoqtSessionParameters default_parameters = {
-    /*version=*/MoqtVersion::kDraft01,
+    /*version=*/MoqtVersion::kDraft02,
     /*perspective=*/quic::Perspective::IS_CLIENT,
     /*using_webtrans=*/true,
     /*path=*/std::string(),
@@ -90,25 +91,30 @@
     return (MoqtSession::Stream*)visitor;
   }
 
-  static void CreateRemoteTrack(MoqtSession* session, FullTrackName& name,
-                                RemoteTrack::Visitor* visitor) {
-    session->remote_tracks_.try_emplace(name, name, visitor);
+  static void CreateRemoteTrack(MoqtSession* session, const FullTrackName& name,
+                                RemoteTrack::Visitor* visitor,
+                                uint64_t track_alias) {
+    session->remote_tracks_.try_emplace(track_alias, name, track_alias,
+                                        visitor);
+    session->remote_track_aliases_.try_emplace(name, track_alias);
   }
 
-  static void CreateRemoteTrackWithAlias(MoqtSession* session,
-                                         FullTrackName& name,
-                                         RemoteTrack::Visitor* visitor,
-                                         uint64_t track_alias) {
-    auto it = session->remote_tracks_.try_emplace(name, name, visitor);
-    RemoteTrack& track = it.first->second;
+  static void AddSubscription(MoqtSession* session, FullTrackName& name,
+                              uint64_t subscribe_id, uint64_t track_alias,
+                              uint64_t start_group, uint64_t start_object) {
+    auto it = session->local_tracks_.find(name);
+    ASSERT_NE(it, session->local_tracks_.end());
+    LocalTrack& track = it->second;
     track.set_track_alias(track_alias);
-    session->tracks_by_alias_.emplace(std::make_pair(track_alias, &track));
+    track.AddWindow(SubscribeWindow(subscribe_id, start_group, start_object));
+    session->used_track_aliases_.emplace(track_alias);
   }
 
-  static LocalTrack& GetLocalTrack(MoqtSession* session, FullTrackName& name) {
+  static FullSequence next_sequence(MoqtSession* session, FullTrackName& name) {
     auto it = session->local_tracks_.find(name);
     EXPECT_NE(it, session->local_tracks_.end());
-    return it->second;
+    LocalTrack& track = it->second;
+    return track.next_sequence();
   }
 };
 
@@ -117,6 +123,9 @@
   MoqtSessionTest()
       : session_(&mock_session_, default_parameters,
                  session_callbacks_.AsSessionCallbacks()) {}
+  ~MoqtSessionTest() {
+    EXPECT_CALL(session_callbacks_.session_deleted_callback, Call());
+  }
 
   MockSessionCallbacks session_callbacks_;
   StrictMock<webtransport::test::MockSession> mock_session_;
@@ -157,7 +166,7 @@
           &session_, visitor.get());
   // Handle the server setup
   MoqtServerSetup setup = {
-      MoqtVersion::kDraft01,
+      MoqtVersion::kDraft02,
       MoqtRole::kBoth,
   };
   EXPECT_CALL(session_callbacks_.session_established_callback, Call()).Times(1);
@@ -166,7 +175,7 @@
 
 TEST_F(MoqtSessionTest, OnClientSetup) {
   MoqtSessionParameters server_parameters = {
-      /*version=*/MoqtVersion::kDraft01,
+      /*version=*/MoqtVersion::kDraft02,
       /*perspective=*/quic::Perspective::IS_SERVER,
       /*using_webtrans=*/true,
       /*path=*/"",
@@ -178,7 +187,7 @@
   std::unique_ptr<MoqtParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&server_session, &mock_stream);
   MoqtClientSetup setup = {
-      /*supported_versions*/ {MoqtVersion::kDraft01},
+      /*supported_versions=*/{MoqtVersion::kDraft02},
       /*role=*/MoqtRole::kBoth,
       /*path=*/std::nullopt,
   };
@@ -249,14 +258,18 @@
 }
 
 TEST_F(MoqtSessionTest, AddLocalTrack) {
-  MoqtSubscribeRequest request = {
+  MoqtSubscribe request = {
+      /*subscribe_id=*/1,
+      /*track_alias=*/2,
       /*track_namespace=*/"foo",
       /*track_name=*/"bar",
       /*start_group=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
       /*start_object=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
       /*end_group=*/std::nullopt,
       /*end_object=*/std::nullopt,
+#ifdef MOQT_AUTH_INFO
       /*authorization_info=*/std::nullopt,
+#endif
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> stream_input =
@@ -271,7 +284,7 @@
                   MoqtMessageType::kSubscribeError);
         return absl::OkStatus();
       });
-  stream_input->OnSubscribeRequestMessage(request);
+  stream_input->OnSubscribeMessage(request);
   EXPECT_TRUE(correct_message);
 
   // Add the track. Now Subscribe should succeed.
@@ -285,7 +298,7 @@
         EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kSubscribeOk);
         return absl::OkStatus();
       });
-  stream_input->OnSubscribeRequestMessage(request);
+  stream_input->OnSubscribeMessage(request);
   EXPECT_TRUE(correct_message);
 }
 
@@ -365,14 +378,18 @@
   EXPECT_FALSE(session_.HasSubscribers(ftn));
 
   // Peer subscribes.
-  MoqtSubscribeRequest request = {
+  MoqtSubscribe request = {
+      /*subscribe_id=*/1,
+      /*track_alias=*/2,
       /*track_namespace=*/"foo",
       /*track_name=*/"bar",
       /*start_group=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
       /*start_object=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
       /*end_group=*/std::nullopt,
       /*end_object=*/std::nullopt,
+#ifdef MOQT_AUTH_INFO
       /*authorization_info=*/std::nullopt,
+#endif
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> stream_input =
@@ -385,11 +402,50 @@
         EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kSubscribeOk);
         return absl::OkStatus();
       });
-  stream_input->OnSubscribeRequestMessage(request);
+  stream_input->OnSubscribeMessage(request);
   EXPECT_TRUE(correct_message);
   EXPECT_TRUE(session_.HasSubscribers(ftn));
 }
 
+TEST_F(MoqtSessionTest, SubscribeForPast) {
+  MockLocalTrackVisitor local_track_visitor;
+  FullTrackName ftn("foo", "bar");
+  session_.AddLocalTrack(ftn, &local_track_visitor);
+
+  // Send Sequence (2, 0) so that next_sequence is set correctly.
+  session_.PublishObject(ftn, 2, 0, 0, MoqtForwardingPreference::kObject, "foo",
+                         std::nullopt, true);
+  // Peer subscribes to (0, 0)
+  MoqtSubscribe request = {
+      /*subscribe_id=*/1,
+      /*track_alias=*/2,
+      /*track_namespace=*/"foo",
+      /*track_name=*/"bar",
+      /*start_group=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
+      /*start_object=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
+      /*end_group=*/std::nullopt,
+      /*end_object=*/std::nullopt,
+#ifdef MOQT_AUTH_INFO
+      /*authorization_info=*/std::nullopt,
+#endif
+  };
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+  bool correct_message = true;
+  EXPECT_CALL(local_track_visitor, OnSubscribeForPast(_))
+      .WillOnce(Return(std::nullopt));
+  EXPECT_CALL(mock_stream, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        correct_message = true;
+        EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kSubscribeOk);
+        return absl::OkStatus();
+      });
+  stream_input->OnSubscribeMessage(request);
+  EXPECT_TRUE(correct_message);
+}
+
 TEST_F(MoqtSessionTest, SubscribeWithOk) {
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> stream_input =
@@ -401,16 +457,13 @@
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         correct_message = true;
-        EXPECT_EQ(*ExtractMessageType(data[0]),
-                  MoqtMessageType::kSubscribeRequest);
+        EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kSubscribe);
         return absl::OkStatus();
       });
   session_.SubscribeCurrentGroup("foo", "bar", &remote_track_visitor, "");
 
   MoqtSubscribeOk ok = {
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
-      /*track_id=*/0,
+      /*subscribe_id=*/0,
       /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
   };
   correct_message = false;
@@ -436,17 +489,16 @@
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         correct_message = true;
-        EXPECT_EQ(*ExtractMessageType(data[0]),
-                  MoqtMessageType::kSubscribeRequest);
+        EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kSubscribe);
         return absl::OkStatus();
       });
   session_.SubscribeCurrentGroup("foo", "bar", &remote_track_visitor, "");
 
   MoqtSubscribeError error = {
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
+      /*subscribe_id=*/0,
       /*error_code=*/SubscribeErrorCode::kInvalidRange,
       /*reason_phrase=*/"deadbeef",
+      /*track_alias=*/2,
   };
   correct_message = false;
   EXPECT_CALL(remote_track_visitor, OnReply(_, _))
@@ -483,12 +535,14 @@
   MockRemoteTrackVisitor visitor_;
   FullTrackName ftn("foo", "bar");
   std::string payload = "deadbeef";
-  MoqtSessionPeer::CreateRemoteTrackWithAlias(&session_, ftn, &visitor_, 0);
+  MoqtSessionPeer::CreateRemoteTrack(&session_, ftn, &visitor_, 2);
   MoqtObject object = {
-      /*track_id=*/0,
+      /*subscribe_id=*/1,
+      /*track_alias=*/2,
       /*group_sequence=*/0,
       /*object_sequence=*/0,
       /*object_send_order=*/0,
+      /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
       /*payload_length=*/8,
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
@@ -505,12 +559,14 @@
   MockRemoteTrackVisitor visitor_;
   FullTrackName ftn("foo", "bar");
   std::string payload = "deadbeef";
-  MoqtSessionPeer::CreateRemoteTrackWithAlias(&session_, ftn, &visitor_, 0);
+  MoqtSessionPeer::CreateRemoteTrack(&session_, ftn, &visitor_, 2);
   MoqtObject object = {
-      /*track_id=*/0,
+      /*subscribe_id=*/1,
+      /*track_alias=*/2,
       /*group_sequence=*/0,
       /*object_sequence=*/0,
       /*object_send_order=*/0,
+      /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
       /*payload_length=*/16,
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
@@ -526,7 +582,7 @@
 
 TEST_F(MoqtSessionTest, IncomingPartialObjectNoBuffer) {
   MoqtSessionParameters parameters = {
-      /*version=*/MoqtVersion::kDraft01,
+      /*version=*/MoqtVersion::kDraft02,
       /*perspective=*/quic::Perspective::IS_CLIENT,
       /*using_webtrans=*/true,
       /*path=*/"",
@@ -537,12 +593,14 @@
   MockRemoteTrackVisitor visitor_;
   FullTrackName ftn("foo", "bar");
   std::string payload = "deadbeef";
-  MoqtSessionPeer::CreateRemoteTrackWithAlias(&session, ftn, &visitor_, 0);
+  MoqtSessionPeer::CreateRemoteTrack(&session, ftn, &visitor_, 2);
   MoqtObject object = {
-      /*track_id=*/0,
+      /*subscribe_id=*/1,
+      /*track_alias=*/2,
       /*group_sequence=*/0,
       /*object_sequence=*/0,
       /*object_send_order=*/0,
+      /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
       /*payload_length=*/16,
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
@@ -556,45 +614,20 @@
   object_stream->OnObjectMessage(object, payload, true);  // complete the object
 }
 
-TEST_F(MoqtSessionTest, IncomingObjectUnknownTrackId) {
-  MockRemoteTrackVisitor visitor_;
-  FullTrackName ftn("foo", "bar");
-  std::string payload = "deadbeef";
-  MoqtSessionPeer::CreateRemoteTrack(&session_, ftn, &visitor_);
-  MoqtObject object = {
-      /*track_id=*/0,
-      /*group_sequence=*/0,
-      /*object_sequence=*/0,
-      /*object_send_order=*/0,
-      /*payload_length=*/8,
-  };
-  StrictMock<webtransport::test::MockStream> mock_stream;
-  std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
-
-  EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _)).Times(0);
-  EXPECT_CALL(mock_stream, GetStreamId())
-      .WillRepeatedly(Return(kIncomingUniStreamId));
-  object_stream->OnObjectMessage(object, payload, true);
-  // Packet should be buffered.
-
-  // SUBSCRIBE_OK arrives
-  MoqtSubscribeOk ok = {
-      /*track_namespace=*/ftn.track_namespace,
-      /*track_name=*/ftn.track_name,
-      /*track_id=*/0,
-      /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
-  };
-  StrictMock<webtransport::test::MockStream> mock_control_stream;
-  std::unique_ptr<MoqtParserVisitor> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_control_stream);
-  EXPECT_CALL(visitor_, OnReply(_, _)).Times(1);
-  EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _)).Times(1);
-  control_stream->OnSubscribeOkMessage(ok);
-}
-
 TEST_F(MoqtSessionTest, CreateUniStreamAndSend) {
   StrictMock<webtransport::test::MockStream> mock_stream;
+  FullTrackName ftn("foo", "bar");
+  MockLocalTrackVisitor track_visitor;
+  session_.AddLocalTrack(ftn, &track_visitor);
+  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+
+  // No subscription; this is a no-op except to update next_sequence.
+  EXPECT_CALL(mock_stream, Writev(_, _)).Times(0);
+  session_.PublishObject(ftn, 4, 1, 0, MoqtForwardingPreference::kObject,
+                         "deadbeef", std::nullopt, true);
+  EXPECT_EQ(MoqtSessionPeer::next_sequence(&session_, ftn), FullSequence(4, 2));
+
+  // Publish in window.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(true));
   EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
@@ -602,79 +635,96 @@
   EXPECT_CALL(mock_stream, SetVisitor(_)).Times(1);
   EXPECT_CALL(mock_stream, GetStreamId())
       .WillRepeatedly(Return(kOutgoingUniStreamId));
-  std::optional<webtransport::StreamId> stream =
-      session_.OpenUnidirectionalStream();
-  EXPECT_TRUE(stream.has_value());
-  EXPECT_EQ(stream.value(), kOutgoingUniStreamId);
-
   // Send on the stream
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
       .WillOnce(Return(&mock_stream));
+  bool correct_message = false;
+  // Verify first six message fields are sent correctly
+  uint8_t kExpectedMessage[] = {0x00, 0x00, 0x02, 0x05, 0x00, 0x00};
+  EXPECT_CALL(mock_stream, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        correct_message = (0 == memcmp(data.data()->data(), kExpectedMessage,
+                                       sizeof(kExpectedMessage)));
+        return absl::OkStatus();
+      });
+  session_.PublishObject(ftn, 5, 0, 0, MoqtForwardingPreference::kObject,
+                         "deadbeef", std::nullopt, true);
+  EXPECT_TRUE(correct_message);
+}
+
+// TODO: Test operation with multiple streams.
+
+// Error cases
+
+TEST_F(MoqtSessionTest, CannotOpenUniStream) {
+  StrictMock<webtransport::test::MockStream> mock_stream;
   FullTrackName ftn("foo", "bar");
   MockLocalTrackVisitor track_visitor;
   session_.AddLocalTrack(ftn, &track_visitor);
-  LocalTrack& track = MoqtSessionPeer::GetLocalTrack(&session_, ftn);
-  FullSequence& next_seq = track.next_sequence_mutable();
-  next_seq.group = 4;
-  next_seq.object = 1;
-  track.AddWindow(SubscribeWindow(5, 0));
-  // No subscription; this is a no-op except for incrementing the sequence
-  // number.
-  EXPECT_CALL(mock_stream, Writev(_, _)).Times(0);
-  session_.PublishObjectToStream(kOutgoingUniStreamId,
-                                 FullTrackName("foo", "bar"),
-                                 /*start_new_group=*/false, "deadbeef");
-  EXPECT_EQ(next_seq, FullSequence(4, 2));
-  bool correct_message = false;
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
+  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+  ;
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillOnce(Return(false));
+  EXPECT_FALSE(session_.PublishObject(ftn, 5, 0, 0,
+                                      MoqtForwardingPreference::kObject,
+                                      "deadbeef", std::nullopt, true));
+}
+
+TEST_F(MoqtSessionTest, GetStreamByIdFails) {
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  FullTrackName ftn("foo", "bar");
+  MockLocalTrackVisitor track_visitor;
+  session_.AddLocalTrack(ftn, &track_visitor);
+  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillOnce(Return(true));
+  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
       .WillOnce(Return(&mock_stream));
+  EXPECT_CALL(mock_stream, SetVisitor(_)).Times(1);
+  EXPECT_CALL(mock_stream, GetStreamId())
+      .WillRepeatedly(Return(kOutgoingUniStreamId));
+  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
+      .WillOnce(Return(nullptr));
+  EXPECT_FALSE(session_.PublishObject(ftn, 5, 0, 0,
+                                      MoqtForwardingPreference::kObject,
+                                      "deadbeef", std::nullopt, true));
+}
+
+TEST_F(MoqtSessionTest, SubscribeProposesBadTrackAlias) {
+  MockLocalTrackVisitor local_track_visitor;
+  FullTrackName ftn("foo", "bar");
+  session_.AddLocalTrack(ftn, &local_track_visitor);
+  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+
+  // Peer subscribes.
+  MoqtSubscribe request = {
+      /*subscribe_id=*/1,
+      /*track_alias=*/3,  // Doesn't match 2.
+      /*track_namespace=*/"foo",
+      /*track_name=*/"bar",
+      /*start_group=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
+      /*start_object=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
+      /*end_group=*/std::nullopt,
+      /*end_object=*/std::nullopt,
+#ifdef MOQT_AUTH_INFO
+      /*authorization_info=*/std::nullopt,
+#endif
+  };
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+  bool correct_message = true;
   EXPECT_CALL(mock_stream, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         correct_message = true;
         EXPECT_EQ(*ExtractMessageType(data[0]),
-                  MoqtMessageType::kObjectWithPayloadLength);
+                  MoqtMessageType::kSubscribeError);
         return absl::OkStatus();
       });
-  session_.PublishObjectToStream(kOutgoingUniStreamId,
-                                 FullTrackName("foo", "bar"),
-                                 /*start_new_group=*/true, "deadbeef");
+  stream_input->OnSubscribeMessage(request);
   EXPECT_TRUE(correct_message);
-  EXPECT_EQ(next_seq, FullSequence(5, 1));
-}
-
-// Error cases
-
-TEST_F(MoqtSessionTest, CannotOpenUniStream) {
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(false));
-  std::optional<webtransport::StreamId> stream =
-      session_.OpenUnidirectionalStream();
-  EXPECT_FALSE(stream.has_value());
-
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true));
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(nullptr));
-  stream = session_.OpenUnidirectionalStream();
-  EXPECT_FALSE(stream.has_value());
-}
-
-TEST_F(MoqtSessionTest, CannotPublishToStream) {
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillOnce(Return(nullptr));
-  FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor track_visitor;
-  session_.AddLocalTrack(ftn, &track_visitor);
-  LocalTrack& track = MoqtSessionPeer::GetLocalTrack(&session_, ftn);
-  FullSequence& next_seq = track.next_sequence_mutable();
-  next_seq.group = 4;
-  next_seq.object = 1;
-  session_.PublishObjectToStream(kOutgoingUniStreamId, ftn,
-                                 /*start_new_group=*/false, "deadbeef");
-  // Object not sent; no change in sequence number.
-  EXPECT_EQ(next_seq.group, 4);
-  EXPECT_EQ(next_seq.object, 1);
 }
 
 TEST_F(MoqtSessionTest, OneBidirectionalStreamClient) {
@@ -718,7 +768,7 @@
 
 TEST_F(MoqtSessionTest, OneBidirectionalStreamServer) {
   MoqtSessionParameters server_parameters = {
-      /*version=*/MoqtVersion::kDraft01,
+      /*version=*/MoqtVersion::kDraft02,
       /*perspective=*/quic::Perspective::IS_SERVER,
       /*using_webtrans=*/true,
       /*path=*/"",
@@ -730,7 +780,7 @@
   std::unique_ptr<MoqtParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&server_session, &mock_stream);
   MoqtClientSetup setup = {
-      /*supported_versions*/ {MoqtVersion::kDraft01},
+      /*supported_versions*/ {MoqtVersion::kDraft02},
       /*role=*/MoqtRole::kBoth,
       /*path=*/std::nullopt,
   };
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.cc b/quiche/quic/moqt/moqt_subscribe_windows.cc
new file mode 100644
index 0000000..335d2d8
--- /dev/null
+++ b/quiche/quic/moqt/moqt_subscribe_windows.cc
@@ -0,0 +1,107 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+
+#include <cstdint>
+#include <optional>
+#include <vector>
+
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/platform/api/quic_bug_tracker.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace moqt {
+
+bool SubscribeWindow::InWindow(const FullSequence& seq) const {
+  if (seq < start_) {
+    return false;
+  }
+  return (!end_.has_value() || seq <= *end_);
+}
+
+std::optional<webtransport::StreamId> SubscribeWindow::GetStreamForSequence(
+    FullSequence sequence,
+    MoqtForwardingPreference forwarding_preference) const {
+  if (forwarding_preference == MoqtForwardingPreference::kTrack) {
+    return track_stream_;
+  }
+  auto group_it = group_streams_.find(sequence.group);
+  if (group_it == group_streams_.end()) {
+    return std::nullopt;
+  }
+  if (forwarding_preference == MoqtForwardingPreference::kGroup) {
+    return group_it->second.group_stream;
+  }
+  auto object_it = group_it->second.object_streams.find(sequence.object);
+  if (object_it == group_it->second.object_streams.end()) {
+    return std::nullopt;
+  }
+  return object_it->second;
+}
+
+void SubscribeWindow::AddStream(MoqtForwardingPreference forwarding_preference,
+                                uint64_t group_id, uint64_t object_id,
+                                webtransport::StreamId stream_id) {
+  if (forwarding_preference == MoqtForwardingPreference::kTrack) {
+    QUIC_BUG_IF(quic_bug_moqt_draft_02_01, track_stream_.has_value())
+        << "Track stream already assigned";
+    track_stream_ = stream_id;
+    return;
+  }
+  if (forwarding_preference == MoqtForwardingPreference::kGroup) {
+    QUIC_BUG_IF(quic_bug_moqt_draft_02_02,
+                group_streams_[group_id].group_stream.has_value())
+        << "Group stream already assigned";
+    group_streams_[group_id].group_stream = stream_id;
+    return;
+  }
+  // ObjectStream or ObjectPreferDatagram
+  QUIC_BUG_IF(quic_bug_moqt_draft_02_03,
+              group_streams_[group_id].object_streams.contains(object_id))
+      << "Object stream already assigned";
+  group_streams_[group_id].object_streams[object_id] = stream_id;
+}
+
+void SubscribeWindow::RemoveStream(
+    MoqtForwardingPreference forwarding_preference, uint64_t group_id,
+    uint64_t object_id) {
+  if (forwarding_preference == moqt::MoqtForwardingPreference::kTrack) {
+    track_stream_ = std::nullopt;
+    return;
+  }
+  auto group_it = group_streams_.find(group_id);
+  if (group_it == group_streams_.end()) {
+    return;
+  }
+  GroupStreams& group = group_it->second;
+  if (forwarding_preference == moqt::MoqtForwardingPreference::kGroup) {
+    group.group_stream = std::nullopt;
+    if (group.object_streams.empty()) {
+      group_streams_.erase(group_id);
+    }
+    return;
+  }
+  // ObjectStream or ObjectPreferDatagram
+  if (group.object_streams.contains(object_id)) {
+    group_streams_[group_id].object_streams.erase(object_id);
+    if (!group.group_stream.has_value() &&
+        group_streams_[group_id].object_streams.empty()) {
+      group_streams_.erase(group_id);
+    }
+  }
+}
+
+std::vector<SubscribeWindow*> MoqtSubscribeWindows::SequenceIsSubscribed(
+    FullSequence sequence) {
+  std::vector<SubscribeWindow*> retval;
+  for (auto it = windows.begin(); it != windows.end(); it++) {
+    if (it->InWindow(sequence)) {
+      retval.push_back(&(*it));
+    }
+  }
+  return retval;
+}
+
+}  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h
index 4bf5f76..f8def02 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.h
+++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -8,51 +8,72 @@
 #include <cstdint>
 #include <list>
 #include <optional>
+#include <vector>
 
+#include "absl/container/flat_hash_map.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
 
-struct SubscribeWindow {
-  FullSequence start;
-  std::optional<FullSequence> end;
+// Classes to track subscriptions to local tracks: the sequence numbers
+// subscribed, the streams involved, and the subscribe IDs.
+class QUICHE_EXPORT SubscribeWindow {
+ public:
   // Creates a half-open window.
-  SubscribeWindow(uint64_t start_group, uint64_t start_object) {
-    start = {start_group, start_object};
-    end = std::nullopt;
-  }
+  SubscribeWindow(uint64_t subscribe_id, uint64_t start_group,
+                  uint64_t start_object)
+      : subscribe_id_(subscribe_id), start_({start_group, start_object}) {}
+
   // Creates a closed window.
-  SubscribeWindow(uint64_t start_group, uint64_t start_object,
-                  uint64_t end_group, uint64_t end_object) {
-    start = {start_group, start_object};
-    end = {end_group, end_object};
-  }
-  bool InWindow(const FullSequence& seq) const {
-    if (seq < start) {
-      return false;
-    }
-    if (!end.has_value() || seq < *end) {
-      return true;
-    }
-    return false;
-  }
+  SubscribeWindow(uint64_t subscribe_id, uint64_t start_group,
+                  uint64_t start_object, uint64_t end_group,
+                  uint64_t end_object)
+      : subscribe_id_(subscribe_id),
+        start_({start_group, start_object}),
+        end_(FullSequence(end_group, end_object)) {}
+
+  uint64_t subscribe_id() const { return subscribe_id_; }
+
+  bool InWindow(const FullSequence& seq) const;
+
+  // Returns the stream to send |sequence| on, if already opened.
+  std::optional<webtransport::StreamId> GetStreamForSequence(
+      FullSequence sequence,
+      MoqtForwardingPreference forwarding_preference) const;
+
+  // Records what stream is being used for a track, group, or object depending
+  // on |forwarding_preference|. Triggers QUIC_BUG if already assigned.
+  void AddStream(MoqtForwardingPreference forwarding_preference,
+                 uint64_t group_id, uint64_t object_id,
+                 webtransport::StreamId stream_id);
+
+  void RemoveStream(MoqtForwardingPreference forwarding_preference,
+                    uint64_t group_id, uint64_t object_id);
+
+ private:
+  struct GroupStreams {
+    std::optional<webtransport::StreamId> group_stream;
+    absl::flat_hash_map<uint64_t, webtransport::StreamId> object_streams;
+  };
+  const uint64_t subscribe_id_;
+  const FullSequence start_;
+  const std::optional<FullSequence> end_ = std::nullopt;
+  // Open streams for this subscription
+  std::optional<webtransport::StreamId> track_stream_;
+  absl::flat_hash_map<uint64_t, GroupStreams> group_streams_;
 };
 
 // Class to keep track of the sequence number blocks to which a peer is
 // subscribed.
-class MoqtSubscribeWindows {
+class QUICHE_EXPORT MoqtSubscribeWindows {
  public:
   MoqtSubscribeWindows() {}
 
-  bool SequenceIsSubscribed(uint64_t group, uint64_t object) const {
-    FullSequence seq(group, object);
-    for (auto it : windows) {
-      if (it.InWindow(seq)) {
-        return true;
-      }
-    }
-    return false;
-  }
+  // Returns a vector of subscribe IDs that apply to the object. They will be in
+  // reverse order of the AddWindow calls.
+  std::vector<SubscribeWindow*> SequenceIsSubscribed(FullSequence sequence);
 
   // |window| has already been converted into absolute sequence numbers. An
   // optimization could consolidate overlapping subscribe windows.
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
index b1b1bd0..02e7007 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows_test.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
@@ -4,35 +4,166 @@
 
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
 
+#include <optional>
+
+#include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/platform/api/quic_test.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
 
 namespace test {
 
-class MoqtSubscribeWindowsTest : public quic::test::QuicTest {
+class QUICHE_EXPORT SubscribeWindowTest : public quic::test::QuicTest {
+ public:
+  SubscribeWindowTest()
+      : window_(/*subscribe_id=*/2, /*start_group=*/4,
+                /*start_object=*/0, /*end_group=*/5,
+                /*end_object=*/1) {}
+
+  SubscribeWindow window_;
+};
+
+TEST_F(SubscribeWindowTest, Queries) {
+  EXPECT_EQ(window_.subscribe_id(), 2);
+  EXPECT_TRUE(window_.InWindow(FullSequence(4, 0)));
+  EXPECT_TRUE(window_.InWindow(FullSequence(5, 1)));
+  EXPECT_FALSE(window_.InWindow(FullSequence(5, 2)));
+  EXPECT_FALSE(window_.InWindow(FullSequence(6, 0)));
+  EXPECT_FALSE(window_.InWindow(FullSequence(3, 12)));
+}
+
+TEST_F(SubscribeWindowTest, AddRemoveStream) {
+  window_.AddStream(MoqtForwardingPreference::kTrack, 4, 0, 2);
+  window_.AddStream(MoqtForwardingPreference::kGroup, 5, 0, 6);
+  window_.AddStream(MoqtForwardingPreference::kObject, 5, 1, 10);
+  window_.AddStream(MoqtForwardingPreference::kDatagram, 5, 2, 14);
+  // This is a no-op; the stream does not exist.
+  window_.RemoveStream(MoqtForwardingPreference::kGroup, 6, 0);
+
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(4, 0),
+                                         MoqtForwardingPreference::kTrack),
+            2);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(4, 0),
+                                         MoqtForwardingPreference::kGroup),
+            std::nullopt);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(4, 0),
+                                         MoqtForwardingPreference::kObject),
+            std::nullopt);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(4, 0),
+                                         MoqtForwardingPreference::kDatagram),
+            std::nullopt);
+
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 0),
+                                         MoqtForwardingPreference::kTrack),
+            2);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 0),
+                                         MoqtForwardingPreference::kGroup),
+            6);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 0),
+                                         MoqtForwardingPreference::kObject),
+            std::nullopt);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 0),
+                                         MoqtForwardingPreference::kDatagram),
+            std::nullopt);
+
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
+                                         MoqtForwardingPreference::kTrack),
+            2);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
+                                         MoqtForwardingPreference::kGroup),
+            6);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
+                                         MoqtForwardingPreference::kObject),
+            10);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
+                                         MoqtForwardingPreference::kDatagram),
+            10);
+
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
+                                         MoqtForwardingPreference::kTrack),
+            2);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
+                                         MoqtForwardingPreference::kGroup),
+            6);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
+                                         MoqtForwardingPreference::kObject),
+            14);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
+                                         MoqtForwardingPreference::kDatagram),
+            14);
+
+  window_.RemoveStream(MoqtForwardingPreference::kTrack, 4, 0);
+  window_.RemoveStream(MoqtForwardingPreference::kObject, 5, 1);
+  // kObject and kDatagram are interchangeable
+  window_.RemoveStream(MoqtForwardingPreference::kObject, 5, 2);
+  // The two commands above should not have deleted the group stream.
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
+                                         MoqtForwardingPreference::kGroup),
+            6);
+  window_.RemoveStream(MoqtForwardingPreference::kGroup, 5, 0);
+
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(4, 0),
+                                         MoqtForwardingPreference::kTrack),
+            std::nullopt);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 0),
+                                         MoqtForwardingPreference::kGroup),
+            std::nullopt);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
+                                         MoqtForwardingPreference::kObject),
+            std::nullopt);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
+                                         MoqtForwardingPreference::kDatagram),
+            std::nullopt);
+}
+
+TEST_F(SubscribeWindowTest, RemoveGroupBeforeObjects) {
+  window_.AddStream(MoqtForwardingPreference::kGroup, 5, 0, 6);
+  window_.AddStream(MoqtForwardingPreference::kObject, 5, 1, 10);
+  window_.AddStream(MoqtForwardingPreference::kDatagram, 5, 2, 14);
+  window_.RemoveStream(MoqtForwardingPreference::kGroup, 5, 0);
+  // Object stream is not deleted when the root group stream is.
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
+                                         MoqtForwardingPreference::kObject),
+            10);
+  EXPECT_FALSE(window_
+                   .GetStreamForSequence(FullSequence(5, 0),
+                                         MoqtForwardingPreference::kGroup)
+                   .has_value());
+}
+
+class QUICHE_EXPORT MoqtSubscribeWindowsTest : public quic::test::QuicTest {
  public:
   MoqtSubscribeWindows windows_;
 };
 
 TEST_F(MoqtSubscribeWindowsTest, IsEmpty) {
   EXPECT_TRUE(windows_.IsEmpty());
-  windows_.AddWindow(SubscribeWindow(1, 3));
+  windows_.AddWindow(SubscribeWindow(0, 1, 3));
   EXPECT_FALSE(windows_.IsEmpty());
 }
 
 TEST_F(MoqtSubscribeWindowsTest, IsSubscribed) {
   EXPECT_TRUE(windows_.IsEmpty());
   // The first two windows overlap; the third is open-ended.
-  windows_.AddWindow(SubscribeWindow(1, 0, 3, 9));
-  windows_.AddWindow(SubscribeWindow(2, 4, 4, 3));
-  windows_.AddWindow(SubscribeWindow(10, 0));
+  windows_.AddWindow(SubscribeWindow(0, 1, 0, 3, 9));
+  windows_.AddWindow(SubscribeWindow(1, 2, 4, 4, 3));
+  windows_.AddWindow(SubscribeWindow(2, 10, 0));
   EXPECT_FALSE(windows_.IsEmpty());
-  EXPECT_FALSE(windows_.SequenceIsSubscribed(0, 8));
-  EXPECT_TRUE(windows_.SequenceIsSubscribed(1, 0));
-  EXPECT_FALSE(windows_.SequenceIsSubscribed(4, 4));
-  EXPECT_FALSE(windows_.SequenceIsSubscribed(8, 3));
-  EXPECT_TRUE(windows_.SequenceIsSubscribed(100, 7));
+  EXPECT_TRUE(windows_.SequenceIsSubscribed(FullSequence(0, 8)).empty());
+  auto hits = windows_.SequenceIsSubscribed(FullSequence(1, 0));
+  EXPECT_EQ(hits.size(), 1);
+  EXPECT_EQ(hits[0]->subscribe_id(), 0);
+  EXPECT_TRUE(windows_.SequenceIsSubscribed(FullSequence(4, 4)).empty());
+  EXPECT_TRUE(windows_.SequenceIsSubscribed(FullSequence(8, 3)).empty());
+  hits = windows_.SequenceIsSubscribed(FullSequence(100, 7));
+  EXPECT_EQ(hits.size(), 1);
+  EXPECT_EQ(hits[0]->subscribe_id(), 2);
+  hits = windows_.SequenceIsSubscribed(FullSequence(3, 0));
+  EXPECT_EQ(hits.size(), 2);
+  EXPECT_EQ(hits[0]->subscribe_id(), 1);
+  EXPECT_EQ(hits[1]->subscribe_id(), 0);
 }
 
 }  // namespace test
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index 4dad086..4d04afc 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -7,6 +7,7 @@
 
 #include <cstdint>
 #include <optional>
+#include <vector>
 
 #include "absl/strings/string_view.h"
 #include "quiche/quic/moqt/moqt_messages.h"
@@ -27,32 +28,31 @@
     // the session will send SUBSCRIBE_OK. If the return has a value, the value
     // is the error message (the session will send SUBSCRIBE_ERROR). Via this
     // API, the application decides if a partially fulfillable
-    // SUBSCRIBE_REQUEST results in an error or not.
-    virtual std::optional<absl::string_view> OnSubscribeRequestForPast(
+    // SUBSCRIBE results in an error or not.
+    virtual std::optional<absl::string_view> OnSubscribeForPast(
         const SubscribeWindow& window) = 0;
   };
   // |visitor| must not be nullptr.
-  LocalTrack(const FullTrackName& full_track_name, uint64_t track_alias,
-             Visitor* visitor)
-      : full_track_name_(full_track_name),
-        track_alias_(track_alias),
-        visitor_(visitor) {}
+  LocalTrack(const FullTrackName& full_track_name, Visitor* visitor)
+      : full_track_name_(full_track_name), visitor_(visitor) {}
   // Creates a LocalTrack that does not start at sequence (0,0)
-  LocalTrack(const FullTrackName& full_track_name, uint64_t track_alias,
-             Visitor* visitor, FullSequence next_sequence)
+  LocalTrack(const FullTrackName& full_track_name, Visitor* visitor,
+             FullSequence next_sequence)
       : full_track_name_(full_track_name),
-        track_alias_(track_alias),
         next_sequence_(next_sequence),
         visitor_(visitor) {}
 
   const FullTrackName& full_track_name() const { return full_track_name_; }
 
-  uint64_t track_alias() const { return track_alias_; }
+  std::optional<uint64_t> track_alias() const { return track_alias_; }
+  void set_track_alias(uint64_t track_alias) { track_alias_ = track_alias; }
 
   Visitor* visitor() { return visitor_; }
 
-  bool ShouldSend(uint64_t group, uint64_t object) const {
-    return windows_.SequenceIsSubscribed(group, object);
+  // Returns the subscribe windows that want the object defined by (|group|,
+  // |object|).
+  std::vector<SubscribeWindow*> ShouldSend(FullSequence sequence) {
+    return windows_.SequenceIsSubscribed(sequence);
   }
 
   void AddWindow(SubscribeWindow window) { windows_.AddWindow(window); }
@@ -61,7 +61,12 @@
   // by one.
   const FullSequence& next_sequence() const { return next_sequence_; }
 
-  FullSequence& next_sequence_mutable() { return next_sequence_; }
+  // Updates next_sequence_ if |sequence| is larger.
+  void SentSequence(FullSequence sequence) {
+    if (next_sequence_ <= sequence) {
+      next_sequence_ = {sequence.group, sequence.object + 1};
+    }
+  }
 
   bool HasSubscriber() const { return !windows_.IsEmpty(); }
 
@@ -69,9 +74,12 @@
   // This only needs to track subscriptions to current and future objects;
   // requests for objects in the past are forwarded to the application.
   const FullTrackName full_track_name_;
-  const uint64_t track_alias_;
+  // Let the first SUBSCRIBE determine the track alias.
+  std::optional<uint64_t> track_alias_;
   // The sequence numbers from this track to which the peer is subscribed.
   MoqtSubscribeWindows windows_;
+  // By recording the highest observed sequence number, MoQT can interpret
+  // relative sequence numbers in SUBSCRIBEs.
   FullSequence next_sequence_ = {0, 0};
   Visitor* visitor_;
 };
@@ -82,34 +90,36 @@
   class Visitor {
    public:
     virtual ~Visitor() = default;
-    // Called when the session receives a response to the SUBSCRIBE_REQUEST.
+    // Called when the session receives a response to the SUBSCRIBE, unless it's
+    // a SUBSCRIBE_ERROR with a new track_alias. In that case, the session will
+    // automatically retry.
     virtual void OnReply(
         const FullTrackName& full_track_name,
         std::optional<absl::string_view> error_reason_phrase) = 0;
-    virtual void OnObjectFragment(const FullTrackName& full_track_name,
-                                  uint32_t stream_id, uint64_t group_sequence,
-                                  uint64_t object_sequence,
-                                  uint64_t object_send_order,
-                                  absl::string_view object,
-                                  bool end_of_message) = 0;
+    virtual void OnObjectFragment(
+        const FullTrackName& full_track_name, uint64_t group_sequence,
+        uint64_t object_sequence, uint64_t object_send_order,
+        MoqtForwardingPreference forwarding_preference,
+        absl::string_view object, bool end_of_message) = 0;
     // TODO(martinduke): Add final sequence numbers
   };
-  RemoteTrack(const FullTrackName& full_track_name, Visitor* visitor)
-      : full_track_name_(full_track_name), visitor_(visitor) {}
+  RemoteTrack(const FullTrackName& full_track_name, uint64_t track_alias,
+              Visitor* visitor)
+      : full_track_name_(full_track_name),
+        track_alias_(track_alias),
+        visitor_(visitor) {}
 
   const FullTrackName& full_track_name() { return full_track_name_; }
 
-  std::optional<uint64_t> track_alias() const { return track_alias_; }
+  uint64_t track_alias() const { return track_alias_; }
 
   Visitor* visitor() { return visitor_; }
 
-  void set_track_alias(uint64_t track_alias) { track_alias_ = track_alias; }
-
  private:
   // TODO: There is no accounting for the number of outstanding subscribes,
   // because we can't match track names to individual subscribes.
-  FullTrackName full_track_name_;
-  std::optional<uint64_t> track_alias_;
+  const FullTrackName full_track_name_;
+  const uint64_t track_alias_;
   Visitor* visitor_;
 };
 
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index a4d3e2a..4d85b6b 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -4,7 +4,6 @@
 
 #include "quiche/quic/moqt/moqt_track.h"
 
-#include <cstdint>
 #include <optional>
 
 #include "absl/strings/string_view.h"
@@ -20,49 +19,53 @@
 class LocalTrackTest : public quic::test::QuicTest {
  public:
   LocalTrackTest()
-      : track_(FullTrackName("foo", "bar"), /*track_alias=*/5, &visitor_,
-               FullSequence(4, 1)) {}
+      : track_(FullTrackName("foo", "bar"), &visitor_, FullSequence(4, 1)) {}
   LocalTrack track_;
   MockLocalTrackVisitor visitor_;
 };
 
 TEST_F(LocalTrackTest, Queries) {
   EXPECT_EQ(track_.full_track_name(), FullTrackName("foo", "bar"));
-  EXPECT_EQ(track_.track_alias(), 5);
+  EXPECT_EQ(track_.track_alias(), std::nullopt);
   EXPECT_EQ(track_.visitor(), &visitor_);
   EXPECT_EQ(track_.next_sequence(), FullSequence(4, 1));
-  FullSequence& mutable_next = track_.next_sequence_mutable();
-  mutable_next.object++;
+  track_.SentSequence(FullSequence(4, 0));
+  EXPECT_EQ(track_.next_sequence(), FullSequence(4, 1));  // no change
+  track_.SentSequence(FullSequence(4, 1));
   EXPECT_EQ(track_.next_sequence(), FullSequence(4, 2));
   EXPECT_FALSE(track_.HasSubscriber());
 }
 
-TEST_F(LocalTrackTest, AfterSubscribe) {
-  track_.AddWindow(SubscribeWindow(4, 1));
+TEST_F(LocalTrackTest, SetTrackAlias) {
+  EXPECT_EQ(track_.track_alias(), std::nullopt);
+  track_.set_track_alias(6);
+  EXPECT_EQ(track_.track_alias(), 6);
+}
+
+TEST_F(LocalTrackTest, ShouldSend) {
+  track_.AddWindow(SubscribeWindow(0, 4, 1));
   EXPECT_TRUE(track_.HasSubscriber());
-  EXPECT_FALSE(track_.ShouldSend(3, 12));
-  EXPECT_FALSE(track_.ShouldSend(4, 0));
-  EXPECT_TRUE(track_.ShouldSend(4, 1));
-  EXPECT_TRUE(track_.ShouldSend(12, 0));
+  EXPECT_TRUE(track_.ShouldSend(FullSequence(3, 12)).empty());
+  EXPECT_TRUE(track_.ShouldSend(FullSequence(4, 0)).empty());
+  EXPECT_EQ(track_.ShouldSend(FullSequence(4, 1)).size(), 1);
+  EXPECT_EQ(track_.ShouldSend(FullSequence(12, 0)).size(), 1);
 }
 
 class RemoteTrackTest : public quic::test::QuicTest {
  public:
-  RemoteTrackTest() : track_(FullTrackName("foo", "bar"), &visitor_) {}
+  RemoteTrackTest()
+      : track_(FullTrackName("foo", "bar"), /*track_alias=*/5, &visitor_) {}
   RemoteTrack track_;
   MockRemoteTrackVisitor visitor_;
 };
 
 TEST_F(RemoteTrackTest, Queries) {
   EXPECT_EQ(track_.full_track_name(), FullTrackName("foo", "bar"));
-  EXPECT_EQ(track_.track_alias(), std::nullopt);
+  EXPECT_EQ(track_.track_alias(), 5);
   EXPECT_EQ(track_.visitor(), &visitor_);
 }
 
-TEST_F(RemoteTrackTest, SetAlias) {
-  track_.set_track_alias(5);
-  EXPECT_EQ(track_.track_alias(), 5);
-}
+// TODO: Write test for GetStreamForSequence.
 
 }  // namespace test
 
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 09ebb64..dae15e0 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -8,6 +8,7 @@
 #include <cstddef>
 #include <cstdint>
 #include <cstring>
+#include <memory>
 #include <optional>
 #include <vector>
 
@@ -34,10 +35,10 @@
   MoqtMessageType message_type() const { return message_type_; }
 
   typedef absl::variant<MoqtClientSetup, MoqtServerSetup, MoqtObject,
-                        MoqtSubscribeRequest, MoqtSubscribeOk,
-                        MoqtSubscribeError, MoqtUnsubscribe, MoqtSubscribeFin,
-                        MoqtSubscribeRst, MoqtAnnounce, MoqtAnnounceOk,
-                        MoqtAnnounceError, MoqtUnannounce, MoqtGoAway>
+                        MoqtSubscribe, MoqtSubscribeOk, MoqtSubscribeError,
+                        MoqtUnsubscribe, MoqtSubscribeFin, MoqtSubscribeRst,
+                        MoqtAnnounce, MoqtAnnounceOk, MoqtAnnounceError,
+                        MoqtUnannounce, MoqtGoAway>
       MessageStructuredData;
 
   // The total actual size of the message.
@@ -112,19 +113,25 @@
 // Base class for the two subtypes of Object Message.
 class QUICHE_NO_EXPORT ObjectMessage : public TestMessageBase {
  public:
-  ObjectMessage(MoqtMessageType type) : TestMessageBase(type) {}
+  ObjectMessage(MoqtMessageType type) : TestMessageBase(type) {
+    object_.forwarding_preference = GetForwardingPreference(type);
+  }
 
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtObject>(values);
-    if (cast.track_id != object_.track_id) {
+    if (cast.subscribe_id != object_.subscribe_id) {
       QUIC_LOG(INFO) << "OBJECT Track ID mismatch";
       return false;
     }
-    if (cast.group_sequence != object_.group_sequence) {
+    if (cast.track_alias != object_.track_alias) {
+      QUIC_LOG(INFO) << "OBJECT Track ID mismatch";
+      return false;
+    }
+    if (cast.group_id != object_.group_id) {
       QUIC_LOG(INFO) << "OBJECT Group Sequence mismatch";
       return false;
     }
-    if (cast.object_sequence != object_.object_sequence) {
+    if (cast.object_id != object_.object_id) {
       QUIC_LOG(INFO) << "OBJECT Object Sequence mismatch";
       return false;
     }
@@ -132,88 +139,155 @@
       QUIC_LOG(INFO) << "OBJECT Object Send Order mismatch";
       return false;
     }
+    if (cast.forwarding_preference != object_.forwarding_preference) {
+      QUIC_LOG(INFO) << "OBJECT Object Send Order mismatch";
+      return false;
+    }
+    if (cast.payload_length != object_.payload_length) {
+      QUIC_LOG(INFO) << "OBJECT Payload Length mismatch";
+      return false;
+    }
     return true;
   }
 
-  void ExpandVarints() override {
-    ExpandVarintsImpl("vvvvvv");  // first six fields are varints
-  }
-
   MessageStructuredData structured_data() const override {
     return TestMessageBase::MessageStructuredData(object_);
   }
 
  protected:
   MoqtObject object_ = {
-      /*track_id=*/4,
-      /*group_sequence=*/5,
-      /*object_sequence=*/6,
+      /*subscribe_id=*/3,
+      /*track_alias=*/4,
+      /*group_id*/ 5,
+      /*object_id=*/6,
       /*object_send_order=*/7,
+      /*forwarding_preference=*/MoqtForwardingPreference::kTrack,
       /*payload_length=*/std::nullopt,
   };
 };
 
-class QUICHE_NO_EXPORT ObjectMessageWithLength : public ObjectMessage {
+class QUICHE_NO_EXPORT ObjectStreamMessage : public ObjectMessage {
  public:
-  ObjectMessageWithLength()
-      : ObjectMessage(MoqtMessageType::kObjectWithPayloadLength) {
+  ObjectStreamMessage() : ObjectMessage(MoqtMessageType::kObjectStream) {
     SetWireImage(raw_packet_, sizeof(raw_packet_));
-    object_.payload_length = payload_length_;
-  }
-
-  bool EqualFieldValues(MessageStructuredData& values) const override {
-    auto cast = std::get<MoqtObject>(values);
-    if (cast.payload_length != payload_length_) {
-      QUIC_LOG(INFO) << "OBJECT Payload Length mismatch";
-      return false;
-    }
-    return ObjectMessage::EqualFieldValues(values);
+    object_.forwarding_preference = MoqtForwardingPreference::kObject;
   }
 
   void ExpandVarints() override {
     ExpandVarintsImpl("vvvvvv");  // first six fields are varints
   }
 
-  MessageStructuredData structured_data() const override {
-    return TestMessageBase::MessageStructuredData(object_);
+ private:
+  uint8_t raw_packet_[9] = {
+      0x00, 0x03, 0x04, 0x05, 0x06, 0x07,  // varints
+      0x66, 0x6f, 0x6f,                    // payload = "foo"
+  };
+};
+
+class QUICHE_NO_EXPORT ObjectPreferDatagramMessage : public ObjectMessage {
+ public:
+  ObjectPreferDatagramMessage()
+      : ObjectMessage(MoqtMessageType::kObjectPreferDatagram) {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+    object_.forwarding_preference = MoqtForwardingPreference::kDatagram;
+  }
+
+  void ExpandVarints() override {
+    ExpandVarintsImpl("vvvvvv");  // first six fields are varints
   }
 
  private:
   uint8_t raw_packet_[9] = {
-      0x00, 0x04, 0x05, 0x06, 0x07,  // varints
-      0x03, 0x66, 0x6f, 0x6f,        // payload = "foo"
+      0x01, 0x03, 0x04, 0x05, 0x06, 0x07,  // varints
+      0x66, 0x6f, 0x6f,                    // payload = "foo"
   };
-  std::optional<uint64_t> payload_length_ = 3;
 };
 
-class QUICHE_NO_EXPORT ObjectMessageWithoutLength : public ObjectMessage {
+// Concatentation of the base header and the object-specific header. Follow-on
+// object headers are handled in a different class.
+class QUICHE_NO_EXPORT StreamHeaderTrackMessage : public ObjectMessage {
  public:
-  ObjectMessageWithoutLength()
-      : ObjectMessage(MoqtMessageType::kObjectWithoutPayloadLength) {
+  StreamHeaderTrackMessage()
+      : ObjectMessage(MoqtMessageType::kStreamHeaderTrack) {
     SetWireImage(raw_packet_, sizeof(raw_packet_));
-  }
-
-  bool EqualFieldValues(MessageStructuredData& values) const override {
-    auto cast = std::get<MoqtObject>(values);
-    if (cast.payload_length != std::nullopt) {
-      QUIC_LOG(INFO) << "OBJECT Payload Length mismatch";
-      return false;
-    }
-    return ObjectMessage::EqualFieldValues(values);
+    object_.forwarding_preference = MoqtForwardingPreference::kTrack;
+    object_.payload_length = 3;
   }
 
   void ExpandVarints() override {
-    ExpandVarintsImpl("vvvvv");  // first six fields are varints
-  }
-
-  MessageStructuredData structured_data() const override {
-    return TestMessageBase::MessageStructuredData(object_);
+    ExpandVarintsImpl("--vvvvvv");  // six one-byte varints
   }
 
  private:
-  uint8_t raw_packet_[8] = {
-      0x02, 0x04, 0x05, 0x06, 0x07,  // varints
-      0x66, 0x6f, 0x6f,              // payload = "foo"
+  // Some tests check that a FIN sent at the halfway point of a message results
+  // in an error. Without the unnecessary expanded varint 0x0405, the halfway
+  // point falls at the end of the Stream Header, which is legal. Expand the
+  // varint so that the FIN would be illegal.
+  uint8_t raw_packet_[11] = {
+      0x40, 0x50,              // two byte type field
+      0x03, 0x04, 0x07,        // varints
+      0x05, 0x06,              // object middler
+      0x03, 0x66, 0x6f, 0x6f,  // payload = "foo"
+  };
+};
+
+// Used only for tests that process multiple objects on one stream.
+class QUICHE_NO_EXPORT StreamMiddlerTrackMessage : public ObjectMessage {
+ public:
+  StreamMiddlerTrackMessage()
+      : ObjectMessage(MoqtMessageType::kStreamHeaderTrack) {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+    object_.forwarding_preference = MoqtForwardingPreference::kTrack;
+    object_.payload_length = 3;
+    object_.group_id = 9;
+    object_.object_id = 10;
+  }
+
+  void ExpandVarints() override { ExpandVarintsImpl("vvv"); }
+
+ private:
+  uint8_t raw_packet_[6] = {
+      0x09, 0x0a, 0x03, 0x62, 0x61, 0x72,  // object middler; payload = "bar"
+  };
+};
+
+class QUICHE_NO_EXPORT StreamHeaderGroupMessage : public ObjectMessage {
+ public:
+  StreamHeaderGroupMessage()
+      : ObjectMessage(MoqtMessageType::kStreamHeaderGroup) {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+    object_.forwarding_preference = MoqtForwardingPreference::kGroup;
+    object_.payload_length = 3;
+  }
+
+  void ExpandVarints() override {
+    ExpandVarintsImpl("--vvvvvv");  // six one-byte varints
+  }
+
+ private:
+  uint8_t raw_packet_[11] = {
+      0x40, 0x51,                    // two-byte type field
+      0x03, 0x04, 0x05, 0x07,        // varints
+      0x06, 0x03, 0x66, 0x6f, 0x6f,  // object middler; payload = "foo"
+  };
+};
+
+// Used only for tests that process multiple objects on one stream.
+class QUICHE_NO_EXPORT StreamMiddlerGroupMessage : public ObjectMessage {
+ public:
+  StreamMiddlerGroupMessage()
+      : ObjectMessage(MoqtMessageType::kStreamHeaderGroup) {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+    object_.forwarding_preference = MoqtForwardingPreference::kGroup;
+    object_.payload_length = 3;
+    object_.object_id = 9;
+  }
+
+  void ExpandVarints() override { ExpandVarintsImpl("vvv"); }
+
+ private:
+  uint8_t raw_packet_[5] = {
+      0x09, 0x03, 0x62, 0x61, 0x72,  // object middler; payload = "bar"
   };
 };
 
@@ -325,72 +399,115 @@
   };
 };
 
-class QUICHE_NO_EXPORT SubscribeRequestMessage : public TestMessageBase {
+class QUICHE_NO_EXPORT SubscribeMessage : public TestMessageBase {
  public:
-  SubscribeRequestMessage()
-      : TestMessageBase(MoqtMessageType::kSubscribeRequest) {
+  SubscribeMessage() : TestMessageBase(MoqtMessageType::kSubscribe) {
     SetWireImage(raw_packet_, sizeof(raw_packet_));
   }
 
   bool EqualFieldValues(MessageStructuredData& values) const override {
-    auto cast = std::get<MoqtSubscribeRequest>(values);
+    auto cast = std::get<MoqtSubscribe>(values);
+    if (cast.subscribe_id != subscribe_request_.subscribe_id) {
+      QUIC_LOG(INFO) << "SUBSCRIBE subscribe ID mismatch";
+      return false;
+    }
+    if (cast.track_alias != subscribe_request_.track_alias) {
+      QUIC_LOG(INFO) << "SUBSCRIBE track alias mismatch";
+      return false;
+    }
     if (cast.track_namespace != subscribe_request_.track_namespace) {
-      QUIC_LOG(INFO) << "SUBSCRIBE REQUEST track namespace mismatch";
+      QUIC_LOG(INFO) << "SUBSCRIBE track namespace mismatch";
       return false;
     }
     if (cast.track_name != subscribe_request_.track_name) {
-      QUIC_LOG(INFO) << "SUBSCRIBE REQUEST track name mismatch";
+      QUIC_LOG(INFO) << "SUBSCRIBE track name mismatch";
       return false;
     }
     if (cast.start_group != subscribe_request_.start_group) {
-      QUIC_LOG(INFO) << "SUBSCRIBE REQUEST start group mismatch";
+      QUIC_LOG(INFO) << "SUBSCRIBE start group mismatch";
       return false;
     }
     if (cast.start_object != subscribe_request_.start_object) {
-      QUIC_LOG(INFO) << "SUBSCRIBE REQUEST start object mismatch";
+      QUIC_LOG(INFO) << "SUBSCRIBE start object mismatch";
       return false;
     }
     if (cast.end_group != subscribe_request_.end_group) {
-      QUIC_LOG(INFO) << "SUBSCRIBE REQUEST end group mismatch";
+      QUIC_LOG(INFO) << "SUBSCRIBE end group mismatch";
       return false;
     }
     if (cast.end_object != subscribe_request_.end_object) {
-      QUIC_LOG(INFO) << "SUBSCRIBE REQUEST end object mismatch";
+      QUIC_LOG(INFO) << "SUBSCRIBE end object mismatch";
       return false;
     }
+#ifdef MOQT_AUTH_INFO
     if (cast.authorization_info != subscribe_request_.authorization_info) {
-      QUIC_LOG(INFO) << "SUBSCRIBE REQUEST authorization info mismatch";
+      QUIC_LOG(INFO) << "SUBSCRIBE authorization info mismatch";
       return false;
     }
+#endif
     return true;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("vv---v----vvvvvvvvv---"); }
+  void ExpandVarints() override {
+#ifdef MOQT_AUTH_INFO
+    ExpandVarintsImpl("vvvv---v----vvvvvvvvv");
+#else
+    ExpandVarintsImpl("vvvv---v----vvvvvv");
+#endif
+  }
 
   MessageStructuredData structured_data() const override {
     return TestMessageBase::MessageStructuredData(subscribe_request_);
   }
 
  private:
-  uint8_t raw_packet_[22] = {
-      0x03, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
-      0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
-      0x02, 0x04,                    // start_group = 4 (relative previous)
-      0x01, 0x01,                    // start_object = 1 (absolute)
-      0x00,                          // end_group = none
-      0x00,                          // end_object = none
-      0x01,                          // 1 parameter
-      0x02, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar"
+#ifdef MOQT_AUTH_INFO
+  uint8_t raw_packet_[24] = {
+#else
+  uint8_t raw_packet_[18] = {
+#endif
+    0x03,
+    0x01,
+    0x02,  // id and alias
+    0x03,
+    0x66,
+    0x6f,
+    0x6f,  // track_namespace = "foo"
+    0x04,
+    0x61,
+    0x62,
+    0x63,
+    0x64,  // track_name = "abcd"
+    0x02,
+    0x04,  // start_group = 4 (relative previous)
+    0x01,
+    0x01,  // start_object = 1 (absolute)
+    0x00,  // end_group = none
+    0x00,  // end_object = none
+           // TODO(martinduke): figure out what to do about the missing num
+           // parameters field.
+#ifdef MOQT_AUTH_INFO
+    0x01,  // 1 parameter
+    0x02,
+    0x03,
+    0x62,
+    0x61,
+    0x72,  // authorization_info = "bar"
+#endif
   };
 
-  MoqtSubscribeRequest subscribe_request_ = {
+  MoqtSubscribe subscribe_request_ = {
+      /*subscribe_id=*/1,
+      /*track_alias=*/2,
       /*track_namespace=*/"foo",
       /*track_name=*/"abcd",
       /*start_group=*/MoqtSubscribeLocation(false, (int64_t)(-4)),
       /*start_object=*/MoqtSubscribeLocation(true, (uint64_t)1),
       /*end_group=*/std::nullopt,
       /*end_object=*/std::nullopt,
+#ifdef MOQT_AUTH_INFO
       /*authorization_info=*/"bar",
+#endif
   };
 };
 
@@ -402,16 +519,8 @@
 
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtSubscribeOk>(values);
-    if (cast.track_namespace != subscribe_ok_.track_namespace) {
-      QUIC_LOG(INFO) << "SUBSCRIBE OK track namespace mismatch";
-      return false;
-    }
-    if (cast.track_name != subscribe_ok_.track_name) {
-      QUIC_LOG(INFO) << "SUBSCRIBE OK track name mismatch";
-      return false;
-    }
-    if (cast.track_id != subscribe_ok_.track_id) {
-      QUIC_LOG(INFO) << "SUBSCRIBE OK track ID mismatch";
+    if (cast.subscribe_id != subscribe_ok_.subscribe_id) {
+      QUIC_LOG(INFO) << "SUBSCRIBE OK subscribe ID mismatch";
       return false;
     }
     if (cast.expires != subscribe_ok_.expires) {
@@ -421,25 +530,20 @@
     return true;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("vv---v---vv"); }
+  void ExpandVarints() override { ExpandVarintsImpl("vvv"); }
 
   MessageStructuredData structured_data() const override {
     return TestMessageBase::MessageStructuredData(subscribe_ok_);
   }
 
  private:
-  uint8_t raw_packet_[11] = {
-      0x04, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
-      0x03, 0x62, 0x61, 0x72,        // track_namespace = "bar"
-      0x01,                          // track_id = 1
-      0x02,                          // expires = 2
+  uint8_t raw_packet_[3] = {
+      0x04, 0x01, 0x03,  // subscribe_id = 1, expires = 3
   };
 
   MoqtSubscribeOk subscribe_ok_ = {
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
-      /*track_id=*/1,
-      /*expires=*/quic::QuicTimeDelta::FromMilliseconds(2),
+      /*subscribe_id=*/1,
+      /*expires=*/quic::QuicTimeDelta::FromMilliseconds(3),
   };
 };
 
@@ -451,12 +555,8 @@
 
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtSubscribeError>(values);
-    if (cast.track_namespace != subscribe_error_.track_namespace) {
-      QUIC_LOG(INFO) << "SUBSCRIBE ERROR track namespace mismatch";
-      return false;
-    }
-    if (cast.track_name != subscribe_error_.track_name) {
-      QUIC_LOG(INFO) << "SUBSCRIBE ERROR track name mismatch";
+    if (cast.subscribe_id != subscribe_error_.subscribe_id) {
+      QUIC_LOG(INFO) << "SUBSCRIBE ERROR subscribe_id mismatch";
       return false;
     }
     if (cast.error_code != subscribe_error_.error_code) {
@@ -467,28 +567,32 @@
       QUIC_LOG(INFO) << "SUBSCRIBE ERROR reason phrase mismatch";
       return false;
     }
+    if (cast.track_alias != subscribe_error_.track_alias) {
+      QUIC_LOG(INFO) << "SUBSCRIBE ERROR track alias mismatch";
+      return false;
+    }
     return true;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("vv---v---vv---"); }
+  void ExpandVarints() override { ExpandVarintsImpl("vvvv---v"); }
 
   MessageStructuredData structured_data() const override {
     return TestMessageBase::MessageStructuredData(subscribe_error_);
   }
 
  private:
-  uint8_t raw_packet_[14] = {
-      0x05, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
-      0x03, 0x62, 0x61, 0x72,        // track_namespace = "bar"
-      0x01,                          // error_code = 1
-      0x03, 0x62, 0x61, 0x72,        // reason_phrase = "bar"
+  uint8_t raw_packet_[8] = {
+      0x05, 0x02,              // subscribe_id = 2
+      0x01,                    // error_code = 2
+      0x03, 0x62, 0x61, 0x72,  // reason_phrase = "bar"
+      0x04,                    // track_alias = 4
   };
 
   MoqtSubscribeError subscribe_error_ = {
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
+      /*subscribe_id=*/2,
       /*subscribe=*/SubscribeErrorCode::kInvalidRange,
       /*reason_phrase=*/"bar",
+      /*track_alias=*/4,
   };
 };
 
@@ -500,32 +604,26 @@
 
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtUnsubscribe>(values);
-    if (cast.track_namespace != unsubscribe_.track_namespace) {
-      QUIC_LOG(INFO) << "UNSUBSCRIBE track name mismatch";
-      return false;
-    }
-    if (cast.track_name != unsubscribe_.track_name) {
-      QUIC_LOG(INFO) << "UNSUBSCRIBE track name mismatch";
+    if (cast.subscribe_id != unsubscribe_.subscribe_id) {
+      QUIC_LOG(INFO) << "UNSUBSCRIBE subscribe ID mismatch";
       return false;
     }
     return true;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("vv---v---"); }
+  void ExpandVarints() override { ExpandVarintsImpl("vv"); }
 
   MessageStructuredData structured_data() const override {
     return TestMessageBase::MessageStructuredData(unsubscribe_);
   }
 
  private:
-  uint8_t raw_packet_[9] = {
-      0x0a, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
-      0x03, 0x62, 0x61, 0x72,        // track_namespace = "bar"
+  uint8_t raw_packet_[2] = {
+      0x0a, 0x03,  // subscribe_id = 3
   };
 
   MoqtUnsubscribe unsubscribe_ = {
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
+      /*subscribe_id=*/3,
   };
 };
 
@@ -537,12 +635,8 @@
 
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtSubscribeFin>(values);
-    if (cast.track_namespace != subscribe_fin_.track_namespace) {
-      QUIC_LOG(INFO) << "SUBSCRIBE_FIN track name mismatch";
-      return false;
-    }
-    if (cast.track_name != subscribe_fin_.track_name) {
-      QUIC_LOG(INFO) << "SUBSCRIBE_FIN track name mismatch";
+    if (cast.subscribe_id != subscribe_fin_.subscribe_id) {
+      QUIC_LOG(INFO) << "SUBSCRIBE_FIN subscribe ID mismatch";
       return false;
     }
     if (cast.final_group != subscribe_fin_.final_group) {
@@ -556,23 +650,21 @@
     return true;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("vv---v---vv"); }
+  void ExpandVarints() override { ExpandVarintsImpl("vvvv"); }
 
   MessageStructuredData structured_data() const override {
     return TestMessageBase::MessageStructuredData(subscribe_fin_);
   }
 
  private:
-  uint8_t raw_packet_[11] = {
-      0x0b, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
-      0x03, 0x62, 0x61, 0x72,        // track_namespace = "bar"
-      0x08,                          // final_group = 8
-      0x0c,                          // final_object = 12
+  uint8_t raw_packet_[4] = {
+      0x0b, 0x03,  // subscribe_id = 3
+      0x08,        // final_group = 8
+      0x0c,        // final_object = 12
   };
 
   MoqtSubscribeFin subscribe_fin_ = {
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
+      /*subscribe_id=*/3,
       /*final_group=*/8,
       /*final_object=*/12,
   };
@@ -586,12 +678,8 @@
 
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtSubscribeRst>(values);
-    if (cast.track_namespace != subscribe_rst_.track_namespace) {
-      QUIC_LOG(INFO) << "SUBSCRIBE_RST track name mismatch";
-      return false;
-    }
-    if (cast.track_name != subscribe_rst_.track_name) {
-      QUIC_LOG(INFO) << "SUBSCRIBE_RST track name mismatch";
+    if (cast.subscribe_id != subscribe_rst_.subscribe_id) {
+      QUIC_LOG(INFO) << "SUBSCRIBE_RST subscribe ID mismatch";
       return false;
     }
     if (cast.error_code != subscribe_rst_.error_code) {
@@ -613,25 +701,23 @@
     return true;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("vv---v---vv--vv"); }
+  void ExpandVarints() override { ExpandVarintsImpl("vvvv--vv"); }
 
   MessageStructuredData structured_data() const override {
     return TestMessageBase::MessageStructuredData(subscribe_rst_);
   }
 
  private:
-  uint8_t raw_packet_[15] = {
-      0x0c, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
-      0x03, 0x62, 0x61, 0x72,        // track_namespace = "bar"
-      0x03,                          // error_code = 3
-      0x02, 0x68, 0x69,              // reason_phrase = "hi"
-      0x08,                          // final_group = 8
-      0x0c,                          // final_object = 12
+  uint8_t raw_packet_[8] = {
+      0x0c, 0x02,        // subscribe_id = 2
+      0x03,              // error_code = 3
+      0x02, 0x68, 0x69,  // reason_phrase = "hi"
+      0x08,              // final_group = 8
+      0x0c,              // final_object = 12
   };
 
   MoqtSubscribeRst subscribe_rst_ = {
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
+      /*subscribe_id=*/2,
       /*error_code=*/3,
       /*reason_phrase=*/"hi",
       /*final_group=*/8,
@@ -813,6 +899,49 @@
   };
 };
 
+// Factory function for test messages.
+static inline std::unique_ptr<TestMessageBase> CreateTestMessage(
+    MoqtMessageType message_type, bool is_webtrans) {
+  switch (message_type) {
+    case MoqtMessageType::kObjectStream:
+      return std::make_unique<ObjectStreamMessage>();
+    case MoqtMessageType::kObjectPreferDatagram:
+      return std::make_unique<ObjectPreferDatagramMessage>();
+    case MoqtMessageType::kSubscribe:
+      return std::make_unique<SubscribeMessage>();
+    case MoqtMessageType::kSubscribeOk:
+      return std::make_unique<SubscribeOkMessage>();
+    case MoqtMessageType::kSubscribeError:
+      return std::make_unique<SubscribeErrorMessage>();
+    case MoqtMessageType::kUnsubscribe:
+      return std::make_unique<UnsubscribeMessage>();
+    case MoqtMessageType::kSubscribeFin:
+      return std::make_unique<SubscribeFinMessage>();
+    case MoqtMessageType::kSubscribeRst:
+      return std::make_unique<SubscribeRstMessage>();
+    case MoqtMessageType::kAnnounce:
+      return std::make_unique<AnnounceMessage>();
+    case MoqtMessageType::kAnnounceOk:
+      return std::make_unique<AnnounceOkMessage>();
+    case MoqtMessageType::kAnnounceError:
+      return std::make_unique<AnnounceErrorMessage>();
+    case MoqtMessageType::kUnannounce:
+      return std::make_unique<UnannounceMessage>();
+    case MoqtMessageType::kGoAway:
+      return std::make_unique<GoAwayMessage>();
+    case MoqtMessageType::kClientSetup:
+      return std::make_unique<ClientSetupMessage>(is_webtrans);
+    case MoqtMessageType::kServerSetup:
+      return std::make_unique<ServerSetupMessage>();
+    case MoqtMessageType::kStreamHeaderTrack:
+      return std::make_unique<StreamHeaderTrackMessage>();
+    case MoqtMessageType::kStreamHeaderGroup:
+      return std::make_unique<StreamHeaderGroupMessage>();
+    default:
+      return nullptr;
+  }
+}
+
 }  // namespace moqt::test
 
 #endif  // QUICHE_QUIC_MOQT_TEST_TOOLS_MOQT_TEST_MESSAGE_H_
diff --git a/quiche/quic/moqt/tools/chat_client_bin.cc b/quiche/quic/moqt/tools/chat_client_bin.cc
index f2af2f7..c5cb070 100644
--- a/quiche/quic/moqt/tools/chat_client_bin.cc
+++ b/quiche/quic/moqt/tools/chat_client_bin.cc
@@ -107,12 +107,11 @@
       }
     }
 
-    void OnObjectFragment(const moqt::FullTrackName& full_track_name,
-                          uint32_t /*stream_id*/, uint64_t group_sequence,
-                          uint64_t object_sequence,
-                          uint64_t /*object_send_order*/,
-                          absl::string_view object,
-                          bool end_of_message) override {
+    void OnObjectFragment(
+        const moqt::FullTrackName& full_track_name, uint64_t group_sequence,
+        uint64_t object_sequence, uint64_t /*object_send_order*/,
+        moqt::MoqtForwardingPreference /*forwarding_preference*/,
+        absl::string_view object, bool end_of_message) override {
       if (!end_of_message) {
         std::cerr << "Error: received partial message despite requesting "
                      "buffering\n";
diff --git a/quiche/quic/moqt/tools/moqt_client.cc b/quiche/quic/moqt/tools/moqt_client.cc
index 07c088e..754b68d 100644
--- a/quiche/quic/moqt/tools/moqt_client.cc
+++ b/quiche/quic/moqt/tools/moqt_client.cc
@@ -89,7 +89,7 @@
   }
 
   MoqtSessionParameters parameters;
-  parameters.version = MoqtVersion::kDraft01;
+  parameters.version = MoqtVersion::kDraft02;
   parameters.perspective = quic::Perspective::IS_CLIENT,
   parameters.using_webtrans = true;
   parameters.path = "";
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h
index 9248114..246e841 100644
--- a/quiche/quic/moqt/tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -31,7 +31,7 @@
 
 class MockLocalTrackVisitor : public LocalTrack::Visitor {
  public:
-  MOCK_METHOD(std::optional<absl::string_view>, OnSubscribeRequestForPast,
+  MOCK_METHOD(std::optional<absl::string_view>, OnSubscribeForPast,
               (const SubscribeWindow& window), (override));
 };
 
@@ -42,10 +42,10 @@
                std::optional<absl::string_view> error_reason_phrase),
               (override));
   MOCK_METHOD(void, OnObjectFragment,
-              (const FullTrackName& full_track_name, uint32_t stream_id,
-               uint64_t group_sequence, uint64_t object_sequence,
-               uint64_t object_send_order, absl::string_view object,
-               bool end_of_message),
+              (const FullTrackName& full_track_name, uint64_t group_sequence,
+               uint64_t object_sequence, uint64_t object_send_order,
+               MoqtForwardingPreference forwarding_preference,
+               absl::string_view object, bool end_of_message),
               (override));
 };
 
diff --git a/quiche/quic/moqt/tools/moqt_server.cc b/quiche/quic/moqt/tools/moqt_server.cc
index e811c6d..1c0f51b 100644
--- a/quiche/quic/moqt/tools/moqt_server.cc
+++ b/quiche/quic/moqt/tools/moqt_server.cc
@@ -33,7 +33,7 @@
     parameters.perspective = quic::Perspective::IS_SERVER;
     parameters.path = path;
     parameters.using_webtrans = true;
-    parameters.version = MoqtVersion::kDraft01;
+    parameters.version = MoqtVersion::kDraft02;
     parameters.deliver_partial_objects = false;
     return std::make_unique<MoqtSession>(session, parameters,
                                          *std::move(callbacks));
