MOQT Message formats from draft-02.

Also totally refactored the API for sending Objects. Now that the spec clarifies the stream mapping, applications simply indicate the forwarding preference, and MoqtSession does all the stream management (except that the application indicates when the stream is done.)

Not in production.

PiperOrigin-RevId: 603743999
diff --git a/build/source_list.bzl b/build/source_list.bzl
index 9431ce4..88ba579 100644
--- a/build/source_list.bzl
+++ b/build/source_list.bzl
@@ -1497,6 +1497,7 @@
     "quic/moqt/moqt_parser_test.cc",
     "quic/moqt/moqt_session.cc",
     "quic/moqt/moqt_session_test.cc",
+    "quic/moqt/moqt_subscribe_windows.cc",
     "quic/moqt/moqt_subscribe_windows_test.cc",
     "quic/moqt/moqt_track_test.cc",
     "quic/moqt/tools/chat_client_bin.cc",
diff --git a/build/source_list.gni b/build/source_list.gni
index 9e3a202..68a4748 100644
--- a/build/source_list.gni
+++ b/build/source_list.gni
@@ -1501,6 +1501,7 @@
     "src/quiche/quic/moqt/moqt_parser_test.cc",
     "src/quiche/quic/moqt/moqt_session.cc",
     "src/quiche/quic/moqt/moqt_session_test.cc",
+    "src/quiche/quic/moqt/moqt_subscribe_windows.cc",
     "src/quiche/quic/moqt/moqt_subscribe_windows_test.cc",
     "src/quiche/quic/moqt/moqt_track_test.cc",
     "src/quiche/quic/moqt/tools/chat_client_bin.cc",
diff --git a/build/source_list.json b/build/source_list.json
index 2971cf8..29f189f 100644
--- a/build/source_list.json
+++ b/build/source_list.json
@@ -1500,6 +1500,7 @@
     "quiche/quic/moqt/moqt_parser_test.cc",
     "quiche/quic/moqt/moqt_session.cc",
     "quiche/quic/moqt/moqt_session_test.cc",
+    "quiche/quic/moqt/moqt_subscribe_windows.cc",
     "quiche/quic/moqt/moqt_subscribe_windows_test.cc",
     "quiche/quic/moqt/moqt_track_test.cc",
     "quiche/quic/moqt/tools/chat_client_bin.cc",
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index 8666061..2c46476 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -11,8 +11,8 @@
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/quic_data_writer.h"
 #include "quiche/quic/core/quic_time.h"
-#include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/platform/api/quic_bug_tracker.h"
 #include "quiche/common/quiche_buffer_allocator.h"
 
 namespace moqt {
@@ -109,33 +109,67 @@
 }  // namespace
 
 quiche::QuicheBuffer MoqtFramer::SerializeObject(
-    const MoqtObject& message, const absl::string_view payload) {
+    const MoqtObject& message, const absl::string_view payload,
+    bool is_first_in_stream) {
   if (message.payload_length.has_value() &&
       *message.payload_length < payload.length()) {
-    QUICHE_DLOG(INFO) << "payload_size is too small for payload";
+    QUIC_BUG(quic_bug_serialize_object_input_01)
+        << "payload_size is too small for payload";
     return quiche::QuicheBuffer();
   }
-  uint64_t message_type =
-      static_cast<uint64_t>(message.payload_length.has_value()
-                                ? MoqtMessageType::kObjectWithPayloadLength
-                                : MoqtMessageType::kObjectWithoutPayloadLength);
-  size_t buffer_size =
-      NeededVarIntLen(message_type) + NeededVarIntLen(message.track_id) +
-      NeededVarIntLen(message.group_sequence) +
-      NeededVarIntLen(message.object_sequence) +
-      NeededVarIntLen(message.object_send_order) + payload.length();
-  if (message.payload_length.has_value()) {
-    buffer_size += NeededVarIntLen(*message.payload_length);
+  if (!is_first_in_stream &&
+      (message.forwarding_preference == MoqtForwardingPreference::kObject ||
+       message.forwarding_preference == MoqtForwardingPreference::kDatagram)) {
+    QUIC_BUG(quic_bug_serialize_object_input_02)
+        << "Object or Datagram forwarding_preference must be first in stream";
+    return quiche::QuicheBuffer();
   }
+  // Figure out the total message size based on message type and payload.
+  size_t buffer_size = NeededVarIntLen(message.object_id) + payload.length();
+  uint64_t message_type = static_cast<uint64_t>(
+      GetMessageTypeForForwardingPreference(message.forwarding_preference));
+  if (is_first_in_stream) {
+    buffer_size += NeededVarIntLen(message_type) +
+                   NeededVarIntLen(message.subscribe_id) +
+                   NeededVarIntLen(message.track_alias) +
+                   NeededVarIntLen(message.group_id) +
+                   NeededVarIntLen(message.object_send_order);
+  } else if (message.forwarding_preference ==
+             MoqtForwardingPreference::kTrack) {
+    buffer_size += NeededVarIntLen(message.group_id);
+  }
+  uint64_t reported_payload_length = message.payload_length.has_value()
+                                         ? message.payload_length.value()
+                                         : payload.length();
+  if (message.forwarding_preference == MoqtForwardingPreference::kTrack ||
+      message.forwarding_preference == MoqtForwardingPreference::kGroup) {
+    buffer_size += NeededVarIntLen(reported_payload_length);
+  }
+  // Write to buffer.
   quiche::QuicheBuffer buffer(allocator_, buffer_size);
   quic::QuicDataWriter writer(buffer.size(), buffer.data());
-  writer.WriteVarInt62(message_type);
-  writer.WriteVarInt62(message.track_id);
-  writer.WriteVarInt62(message.group_sequence);
-  writer.WriteVarInt62(message.object_sequence);
-  writer.WriteVarInt62(message.object_send_order);
-  if (message.payload_length.has_value()) {
-    writer.WriteVarInt62(*message.payload_length);
+  if (is_first_in_stream) {
+    writer.WriteVarInt62(message_type);
+    writer.WriteVarInt62(message.subscribe_id);
+    writer.WriteVarInt62(message.track_alias);
+    if (message.forwarding_preference != MoqtForwardingPreference::kTrack) {
+      writer.WriteVarInt62(message.group_id);
+      if (message.forwarding_preference != MoqtForwardingPreference::kGroup) {
+        writer.WriteVarInt62(message.object_id);
+      }
+    }
+    writer.WriteVarInt62(message.object_send_order);
+  }
+  switch (message.forwarding_preference) {
+    case MoqtForwardingPreference::kTrack:
+      writer.WriteVarInt62(message.group_id);
+      [[fallthrough]];
+    case MoqtForwardingPreference::kGroup:
+      writer.WriteVarInt62(message.object_id);
+      writer.WriteVarInt62(reported_payload_length);
+      break;
+    default:
+      break;
   }
   writer.WriteStringPiece(payload);
   return buffer;
@@ -216,8 +250,8 @@
   return buffer;
 }
 
-quiche::QuicheBuffer MoqtFramer::SerializeSubscribeRequest(
-    const MoqtSubscribeRequest& message) {
+quiche::QuicheBuffer MoqtFramer::SerializeSubscribe(
+    const MoqtSubscribe& message) {
   if (!message.start_group.has_value() || !message.start_object.has_value()) {
     QUIC_LOG(INFO) << "start_group or start_object is missing";
     return quiche::QuicheBuffer();
@@ -227,7 +261,9 @@
                    << "non-None";
     return quiche::QuicheBuffer();
   }
-  size_t buffer_size = NeededVarIntLen(MoqtMessageType::kSubscribeRequest) +
+  size_t buffer_size = NeededVarIntLen(MoqtMessageType::kSubscribe) +
+                       NeededVarIntLen(message.subscribe_id) +
+                       NeededVarIntLen(message.track_alias) +
                        LengthPrefixedStringLength(message.track_namespace) +
                        LengthPrefixedStringLength(message.track_name) +
                        LocationLength(message.start_group) +
@@ -244,8 +280,9 @@
   buffer_size += NeededVarIntLen(num_params);
   quiche::QuicheBuffer buffer(allocator_, buffer_size);
   quic::QuicDataWriter writer(buffer.size(), buffer.data());
-  writer.WriteVarInt62(
-      static_cast<uint64_t>(MoqtMessageType::kSubscribeRequest));
+  writer.WriteVarInt62(static_cast<uint64_t>(MoqtMessageType::kSubscribe));
+  writer.WriteVarInt62(static_cast<uint64_t>(message.subscribe_id));
+  writer.WriteVarInt62(static_cast<uint64_t>(message.track_alias));
   writer.WriteStringPieceVarInt62(message.track_namespace);
   writer.WriteStringPieceVarInt62(message.track_name);
   WriteLocation(writer, message.start_group);
@@ -267,16 +304,12 @@
     const MoqtSubscribeOk& message) {
   size_t buffer_size =
       NeededVarIntLen(static_cast<uint64_t>(MoqtMessageType::kSubscribeOk)) +
-      LengthPrefixedStringLength(message.track_namespace) +
-      LengthPrefixedStringLength(message.track_name) +
-      NeededVarIntLen(message.track_id) +
+      NeededVarIntLen(message.subscribe_id) +
       NeededVarIntLen(message.expires.ToMilliseconds());
   quiche::QuicheBuffer buffer(allocator_, buffer_size);
   quic::QuicDataWriter writer(buffer.size(), buffer.data());
   writer.WriteVarInt62(static_cast<uint64_t>(MoqtMessageType::kSubscribeOk));
-  writer.WriteStringPieceVarInt62(message.track_namespace);
-  writer.WriteStringPieceVarInt62(message.track_name);
-  writer.WriteVarInt62(message.track_id);
+  writer.WriteVarInt62(message.subscribe_id);
   writer.WriteVarInt62(message.expires.ToMilliseconds());
   QUICHE_DCHECK(writer.remaining() == 0);
   return buffer;
@@ -286,17 +319,17 @@
     const MoqtSubscribeError& message) {
   size_t buffer_size =
       NeededVarIntLen(static_cast<uint64_t>(MoqtMessageType::kSubscribeError)) +
-      LengthPrefixedStringLength(message.track_namespace) +
-      LengthPrefixedStringLength(message.track_name) +
+      NeededVarIntLen(message.subscribe_id) +
       NeededVarIntLen(static_cast<uint64_t>(message.error_code)) +
-      LengthPrefixedStringLength(message.reason_phrase);
+      LengthPrefixedStringLength(message.reason_phrase) +
+      NeededVarIntLen(message.track_alias);
   quiche::QuicheBuffer buffer(allocator_, buffer_size);
   quic::QuicDataWriter writer(buffer.size(), buffer.data());
   writer.WriteVarInt62(static_cast<uint64_t>(MoqtMessageType::kSubscribeError));
-  writer.WriteStringPieceVarInt62(message.track_namespace);
-  writer.WriteStringPieceVarInt62(message.track_name);
+  writer.WriteVarInt62(message.subscribe_id);
   writer.WriteVarInt62(static_cast<uint64_t>(message.error_code));
   writer.WriteStringPieceVarInt62(message.reason_phrase);
+  writer.WriteVarInt62(message.track_alias);
   QUICHE_DCHECK(writer.remaining() == 0);
   return buffer;
 }
@@ -305,13 +338,11 @@
     const MoqtUnsubscribe& message) {
   size_t buffer_size =
       NeededVarIntLen(static_cast<uint64_t>(MoqtMessageType::kUnsubscribe)) +
-      LengthPrefixedStringLength(message.track_namespace) +
-      LengthPrefixedStringLength(message.track_name);
+      NeededVarIntLen(message.subscribe_id);
   quiche::QuicheBuffer buffer(allocator_, buffer_size);
   quic::QuicDataWriter writer(buffer.size(), buffer.data());
   writer.WriteVarInt62(static_cast<uint64_t>(MoqtMessageType::kUnsubscribe));
-  writer.WriteStringPieceVarInt62(message.track_namespace);
-  writer.WriteStringPieceVarInt62(message.track_name);
+  writer.WriteVarInt62(message.subscribe_id);
   QUICHE_DCHECK(writer.remaining() == 0);
   return buffer;
 }
@@ -320,15 +351,13 @@
     const MoqtSubscribeFin& message) {
   size_t buffer_size =
       NeededVarIntLen(static_cast<uint64_t>(MoqtMessageType::kSubscribeFin)) +
-      LengthPrefixedStringLength(message.track_namespace) +
-      LengthPrefixedStringLength(message.track_name) +
+      NeededVarIntLen(message.subscribe_id) +
       NeededVarIntLen(message.final_group) +
       NeededVarIntLen(message.final_object);
   quiche::QuicheBuffer buffer(allocator_, buffer_size);
   quic::QuicDataWriter writer(buffer.size(), buffer.data());
   writer.WriteVarInt62(static_cast<uint64_t>(MoqtMessageType::kSubscribeFin));
-  writer.WriteStringPieceVarInt62(message.track_namespace);
-  writer.WriteStringPieceVarInt62(message.track_name);
+  writer.WriteVarInt62(message.subscribe_id);
   writer.WriteVarInt62(message.final_group);
   writer.WriteVarInt62(message.final_object);
   QUICHE_DCHECK(writer.remaining() == 0);
@@ -339,8 +368,7 @@
     const MoqtSubscribeRst& message) {
   size_t buffer_size =
       NeededVarIntLen(static_cast<uint64_t>(MoqtMessageType::kSubscribeRst)) +
-      LengthPrefixedStringLength(message.track_namespace) +
-      LengthPrefixedStringLength(message.track_name) +
+      NeededVarIntLen(message.subscribe_id) +
       NeededVarIntLen(message.error_code) +
       LengthPrefixedStringLength(message.reason_phrase) +
       NeededVarIntLen(message.final_group) +
@@ -348,8 +376,7 @@
   quiche::QuicheBuffer buffer(allocator_, buffer_size);
   quic::QuicDataWriter writer(buffer.size(), buffer.data());
   writer.WriteVarInt62(static_cast<uint64_t>(MoqtMessageType::kSubscribeRst));
-  writer.WriteStringPieceVarInt62(message.track_namespace);
-  writer.WriteStringPieceVarInt62(message.track_name);
+  writer.WriteVarInt62(message.subscribe_id);
   writer.WriteVarInt62(message.error_code);
   writer.WriteStringPieceVarInt62(message.reason_phrase);
   writer.WriteVarInt62(message.final_group);
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h
index a5f741b..080e76e 100644
--- a/quiche/quic/moqt/moqt_framer.h
+++ b/quiche/quic/moqt/moqt_framer.h
@@ -34,17 +34,21 @@
 
   // SerializeObject also takes a payload. |payload_size| might simply be the
   // size of |payload|, or it could be larger if there is more data coming, or
-  // it could be nullopt if the final length is unknown. If |payload_size| is
-  // smaller than |payload|, returns an empty buffer.
+  // it could be nullopt if the final length is unknown.
+  // If |message.payload_size| is nullopt but the forwarding preference requires
+  // a payload length, will assume that payload.length() is the correct value.
+  // If |message.payload_size| is smaller than |payload|, or
+  // |message.forwarding preference| is not consistent with
+  // |is_first_in_stream|, returns an empty buffer and triggers QUIC_BUG.
   quiche::QuicheBuffer SerializeObject(const MoqtObject& message,
-                                       absl::string_view payload);
+                                       absl::string_view payload,
+                                       bool is_first_in_stream);
   // Build a buffer for additional payload data.
   quiche::QuicheBuffer SerializeObjectPayload(absl::string_view payload);
   quiche::QuicheBuffer SerializeClientSetup(const MoqtClientSetup& message);
   quiche::QuicheBuffer SerializeServerSetup(const MoqtServerSetup& message);
   // Returns an empty buffer if there is an illegal combination of locations.
-  quiche::QuicheBuffer SerializeSubscribeRequest(
-      const MoqtSubscribeRequest& message);
+  quiche::QuicheBuffer SerializeSubscribe(const MoqtSubscribe& message);
   quiche::QuicheBuffer SerializeSubscribeOk(const MoqtSubscribeOk& message);
   quiche::QuicheBuffer SerializeSubscribeError(
       const MoqtSubscribeError& message);
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index f261e1f..ff28d92 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -8,9 +8,9 @@
 #include <string>
 #include <vector>
 
-#include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/moqt/test_tools/moqt_test_message.h"
+#include "quiche/quic/platform/api/quic_expect_bug.h"
 #include "quiche/quic/platform/api/quic_test.h"
 #include "quiche/common/quiche_buffer_allocator.h"
 #include "quiche/common/simple_buffer_allocator.h"
@@ -27,11 +27,9 @@
 std::vector<MoqtFramerTestParams> GetMoqtFramerTestParams() {
   std::vector<MoqtFramerTestParams> params;
   std::vector<MoqtMessageType> message_types = {
-      MoqtMessageType::kObjectWithPayloadLength,
-      MoqtMessageType::kObjectWithoutPayloadLength,
-      MoqtMessageType::kClientSetup,
-      MoqtMessageType::kServerSetup,
-      MoqtMessageType::kSubscribeRequest,
+      MoqtMessageType::kObjectStream,
+      MoqtMessageType::kObjectPreferDatagram,
+      MoqtMessageType::kSubscribe,
       MoqtMessageType::kSubscribeOk,
       MoqtMessageType::kSubscribeError,
       MoqtMessageType::kUnsubscribe,
@@ -42,6 +40,10 @@
       MoqtMessageType::kAnnounceError,
       MoqtMessageType::kUnannounce,
       MoqtMessageType::kGoAway,
+      MoqtMessageType::kClientSetup,
+      MoqtMessageType::kServerSetup,
+      MoqtMessageType::kStreamHeaderTrack,
+      MoqtMessageType::kStreamHeaderGroup,
   };
   std::vector<bool> uses_web_transport_bool = {
       false,
@@ -78,61 +80,22 @@
         framer_(buffer_allocator_, GetParam().uses_web_transport) {}
 
   std::unique_ptr<TestMessageBase> MakeMessage(MoqtMessageType message_type) {
-    switch (message_type) {
-      case MoqtMessageType::kObjectWithPayloadLength:
-        return std::make_unique<ObjectMessageWithLength>();
-      case MoqtMessageType::kObjectWithoutPayloadLength:
-        return std::make_unique<ObjectMessageWithoutLength>();
-      case MoqtMessageType::kClientSetup:
-        return std::make_unique<ClientSetupMessage>(webtrans_);
-      case MoqtMessageType::kServerSetup:
-        return std::make_unique<ServerSetupMessage>();
-      case MoqtMessageType::kSubscribeRequest:
-        return std::make_unique<SubscribeRequestMessage>();
-      case MoqtMessageType::kSubscribeOk:
-        return std::make_unique<SubscribeOkMessage>();
-      case MoqtMessageType::kSubscribeError:
-        return std::make_unique<SubscribeErrorMessage>();
-      case MoqtMessageType::kUnsubscribe:
-        return std::make_unique<UnsubscribeMessage>();
-      case MoqtMessageType::kSubscribeFin:
-        return std::make_unique<SubscribeFinMessage>();
-      case MoqtMessageType::kSubscribeRst:
-        return std::make_unique<SubscribeRstMessage>();
-      case MoqtMessageType::kAnnounce:
-        return std::make_unique<AnnounceMessage>();
-      case moqt::MoqtMessageType::kAnnounceOk:
-        return std::make_unique<AnnounceOkMessage>();
-      case moqt::MoqtMessageType::kAnnounceError:
-        return std::make_unique<AnnounceErrorMessage>();
-      case moqt::MoqtMessageType::kUnannounce:
-        return std::make_unique<UnannounceMessage>();
-      case moqt::MoqtMessageType::kGoAway:
-        return std::make_unique<GoAwayMessage>();
-      default:
-        return nullptr;
-    }
+    return CreateTestMessage(message_type, webtrans_);
   }
 
   quiche::QuicheBuffer SerializeMessage(
       TestMessageBase::MessageStructuredData& structured_data) {
     switch (message_type_) {
-      case MoqtMessageType::kObjectWithPayloadLength:
-      case MoqtMessageType::kObjectWithoutPayloadLength: {
+      case MoqtMessageType::kObjectStream:
+      case MoqtMessageType::kObjectPreferDatagram:
+      case MoqtMessageType::kStreamHeaderTrack:
+      case MoqtMessageType::kStreamHeaderGroup: {
         auto data = std::get<MoqtObject>(structured_data);
-        return framer_.SerializeObject(data, "foo");
+        return framer_.SerializeObject(data, "foo", true);
       }
-      case MoqtMessageType::kClientSetup: {
-        auto data = std::get<MoqtClientSetup>(structured_data);
-        return framer_.SerializeClientSetup(data);
-      }
-      case MoqtMessageType::kServerSetup: {
-        auto data = std::get<MoqtServerSetup>(structured_data);
-        return framer_.SerializeServerSetup(data);
-      }
-      case MoqtMessageType::kSubscribeRequest: {
-        auto data = std::get<MoqtSubscribeRequest>(structured_data);
-        return framer_.SerializeSubscribeRequest(data);
+      case MoqtMessageType::kSubscribe: {
+        auto data = std::get<MoqtSubscribe>(structured_data);
+        return framer_.SerializeSubscribe(data);
       }
       case MoqtMessageType::kSubscribeOk: {
         auto data = std::get<MoqtSubscribeOk>(structured_data);
@@ -174,6 +137,14 @@
         auto data = std::get<MoqtGoAway>(structured_data);
         return framer_.SerializeGoAway(data);
       }
+      case MoqtMessageType::kClientSetup: {
+        auto data = std::get<MoqtClientSetup>(structured_data);
+        return framer_.SerializeClientSetup(data);
+      }
+      case MoqtMessageType::kServerSetup: {
+        auto data = std::get<MoqtServerSetup>(structured_data);
+        return framer_.SerializeServerSetup(data);
+      }
     }
   }
 
@@ -195,4 +166,68 @@
   EXPECT_EQ(buffer.AsStringView(), message->PacketSample());
 }
 
+class MoqtFramerSimpleTest : public quic::test::QuicTest {
+ public:
+  MoqtFramerSimpleTest()
+      : buffer_allocator_(quiche::SimpleBufferAllocator::Get()),
+        framer_(buffer_allocator_, /*web_transport=*/true) {}
+
+  quiche::SimpleBufferAllocator* buffer_allocator_;
+  MoqtFramer framer_;
+};
+
+TEST_F(MoqtFramerSimpleTest, GroupMiddler) {
+  auto header = std::make_unique<StreamHeaderGroupMessage>();
+  auto buffer1 = framer_.SerializeObject(
+      std::get<MoqtObject>(header->structured_data()), "foo", true);
+  EXPECT_EQ(buffer1.size(), header->total_message_size());
+  EXPECT_EQ(buffer1.AsStringView(), header->PacketSample());
+
+  auto middler = std::make_unique<StreamMiddlerGroupMessage>();
+  auto buffer2 = framer_.SerializeObject(
+      std::get<MoqtObject>(middler->structured_data()), "bar", false);
+  EXPECT_EQ(buffer2.size(), middler->total_message_size());
+  EXPECT_EQ(buffer2.AsStringView(), middler->PacketSample());
+}
+
+TEST_F(MoqtFramerSimpleTest, TrackMiddler) {
+  auto header = std::make_unique<StreamHeaderTrackMessage>();
+  auto buffer1 = framer_.SerializeObject(
+      std::get<MoqtObject>(header->structured_data()), "foo", true);
+  EXPECT_EQ(buffer1.size(), header->total_message_size());
+  EXPECT_EQ(buffer1.AsStringView(), header->PacketSample());
+
+  auto middler = std::make_unique<StreamMiddlerTrackMessage>();
+  auto buffer2 = framer_.SerializeObject(
+      std::get<MoqtObject>(middler->structured_data()), "bar", false);
+  EXPECT_EQ(buffer2.size(), middler->total_message_size());
+  EXPECT_EQ(buffer2.AsStringView(), middler->PacketSample());
+}
+
+TEST_F(MoqtFramerSimpleTest, BadObjectInput) {
+  MoqtObject object = {
+      /*subscribe_id=*/3,
+      /*track_alias=*/4,
+      /*group_id=*/5,
+      /*object_id=*/6,
+      /*object_send_order=*/7,
+      /*forwarding_preference=*/MoqtForwardingPreference::kObject,
+      /*payload_length=*/1,
+  };
+  quiche::QuicheBuffer buffer;
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObject(object, "foo", true),
+                  "payload_size is too small for payload");
+  EXPECT_TRUE(buffer.empty());
+  object.payload_length = 3;
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObject(object, "foo", false),
+                  "Object or Datagram forwarding_preference must be first "
+                  "in stream");
+  EXPECT_TRUE(buffer.empty());
+  object.forwarding_preference = MoqtForwardingPreference::kDatagram;
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeObject(object, "foo", false),
+                  "Object or Datagram forwarding_preference must be first "
+                  "in stream");
+  EXPECT_TRUE(buffer.empty());
+}
+
 }  // namespace moqt::test
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index c1bc980..a690d73 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -117,9 +117,9 @@
  public:
   void CreateDefaultEndpoints() {
     client_ = std::make_unique<ClientEndpoint>(
-        &test_harness_.simulator(), "Client", "Server", MoqtVersion::kDraft01);
+        &test_harness_.simulator(), "Client", "Server", MoqtVersion::kDraft02);
     server_ = std::make_unique<ServerEndpoint>(
-        &test_harness_.simulator(), "Server", "Client", MoqtVersion::kDraft01);
+        &test_harness_.simulator(), "Server", "Client", MoqtVersion::kDraft02);
     test_harness_.set_client(client_.get());
     test_harness_.set_server(server_.get());
   }
@@ -170,7 +170,7 @@
       &test_harness_.simulator(), "Client", "Server",
       MoqtVersion::kUnrecognizedVersionForTests);
   server_ = std::make_unique<ServerEndpoint>(
-      &test_harness_.simulator(), "Server", "Client", MoqtVersion::kDraft01);
+      &test_harness_.simulator(), "Server", "Client", MoqtVersion::kDraft02);
   test_harness_.set_client(client_.get());
   test_harness_.set_server(server_.get());
   WireUpEndpoints();
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index 9c9021c..0a27832 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -6,19 +6,21 @@
 
 #include <string>
 
+#include "quiche/quic/platform/api/quic_bug_tracker.h"
+
 namespace moqt {
 
 std::string MoqtMessageTypeToString(const MoqtMessageType message_type) {
   switch (message_type) {
-    case MoqtMessageType::kObjectWithPayloadLength:
-      return "OBJECT_WITH_LENGTH";
-    case MoqtMessageType::kObjectWithoutPayloadLength:
-      return "OBJECT_WITHOUT_LENGTH";
+    case MoqtMessageType::kObjectStream:
+      return "OBJECT_STREAM";
+    case MoqtMessageType::kObjectPreferDatagram:
+      return "OBJECT_PREFER_DATAGRAM";
     case MoqtMessageType::kClientSetup:
       return "CLIENT_SETUP";
     case MoqtMessageType::kServerSetup:
       return "SERVER_SETUP";
-    case MoqtMessageType::kSubscribeRequest:
+    case MoqtMessageType::kSubscribe:
       return "SUBSCRIBE_REQUEST";
     case MoqtMessageType::kSubscribeOk:
       return "SUBSCRIBE_OK";
@@ -40,7 +42,64 @@
       return "UNANNOUNCE";
     case MoqtMessageType::kGoAway:
       return "GOAWAY";
+    case MoqtMessageType::kStreamHeaderTrack:
+      return "STREAM_HEADER_TRACK";
+    case MoqtMessageType::kStreamHeaderGroup:
+      return "STREAM_HEADER_GROUP";
   }
+  return "Unknown message " + std::to_string(static_cast<int>(message_type));
+}
+
+std::string MoqtForwardingPreferenceToString(
+    MoqtForwardingPreference preference) {
+  switch (preference) {
+    case MoqtForwardingPreference::kObject:
+      return "OBJECT";
+    case MoqtForwardingPreference::kDatagram:
+      return "DATAGRAM";
+    case MoqtForwardingPreference::kTrack:
+      return "TRACK";
+    case MoqtForwardingPreference::kGroup:
+      return "GROUP";
+  }
+  QUIC_BUG(quic_bug_bad_moqt_message_type_01)
+      << "Unknown preference " << std::to_string(static_cast<int>(preference));
+  return "Unknown preference " + std::to_string(static_cast<int>(preference));
+}
+
+MoqtForwardingPreference GetForwardingPreference(MoqtMessageType type) {
+  switch (type) {
+    case MoqtMessageType::kObjectStream:
+      return MoqtForwardingPreference::kObject;
+    case MoqtMessageType::kObjectPreferDatagram:
+      return MoqtForwardingPreference::kDatagram;
+    case MoqtMessageType::kStreamHeaderTrack:
+      return MoqtForwardingPreference::kTrack;
+    case MoqtMessageType::kStreamHeaderGroup:
+      return MoqtForwardingPreference::kGroup;
+    default:
+      break;
+  }
+  QUIC_BUG(quic_bug_bad_moqt_message_type_02)
+      << "Message type does not indicate forwarding preference";
+  return MoqtForwardingPreference::kObject;
+};
+
+MoqtMessageType GetMessageTypeForForwardingPreference(
+    MoqtForwardingPreference preference) {
+  switch (preference) {
+    case MoqtForwardingPreference::kObject:
+      return MoqtMessageType::kObjectStream;
+    case MoqtForwardingPreference::kDatagram:
+      return MoqtMessageType::kObjectPreferDatagram;
+    case MoqtForwardingPreference::kTrack:
+      return MoqtMessageType::kStreamHeaderTrack;
+    case MoqtForwardingPreference::kGroup:
+      return MoqtMessageType::kStreamHeaderGroup;
+  }
+  QUIC_BUG(quic_bug_bad_moqt_message_type_03)
+      << "Forwarding preference does not indicate message type";
+  return MoqtMessageType::kObjectStream;
 }
 
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 8fd0c55..a7c9e5d 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-// Structured data for message types in draft-ietf-moq-transport-01.
+// Structured data for message types in draft-ietf-moq-transport-02.
 
 #ifndef QUICHE_QUIC_MOQT_MOQT_MESSAGES_H_
 #define QUICHE_QUIC_MOQT_MOQT_MESSAGES_H_
@@ -20,6 +20,12 @@
 #include "quiche/quic/core/quic_versions.h"
 #include "quiche/common/platform/api/quiche_export.h"
 
+// the draft-02 spec makes AUTH_INFO unparseable in SUBSCRIBE messages. This
+// flag assumes that the num_parameters field exists so that it is parseable.
+// If false, there is no num_parameters field and there must not be an
+// AUTH_INFO field.
+#define MOQT_AUTH_INFO
+
 namespace moqt {
 
 inline constexpr quic::ParsedQuicVersionVector GetMoqtSupportedQuicVersions() {
@@ -27,7 +33,7 @@
 }
 
 enum class MoqtVersion : uint64_t {
-  kDraft01 = 0xff000001,
+  kDraft02 = 0xff000002,
   kUnrecognizedVersionForTests = 0xfe0000ff,
 };
 
@@ -46,9 +52,9 @@
 inline constexpr size_t kMaxMessageHeaderSize = 2048;
 
 enum class QUICHE_EXPORT MoqtMessageType : uint64_t {
-  kObjectWithPayloadLength = 0x00,
-  kObjectWithoutPayloadLength = 0x02,
-  kSubscribeRequest = 0x03,
+  kObjectStream = 0x00,
+  kObjectPreferDatagram = 0x01,
+  kSubscribe = 0x03,
   kSubscribeOk = 0x04,
   kSubscribeError = 0x05,
   kAnnounce = 0x06,
@@ -61,6 +67,8 @@
   kGoAway = 0x10,
   kClientSetup = 0x40,
   kServerSetup = 0x41,
+  kStreamHeaderTrack = 0x50,
+  kStreamHeaderGroup = 0x51,
 };
 
 enum class QUICHE_EXPORT MoqtError : uint64_t {
@@ -85,17 +93,14 @@
 };
 
 enum class QUICHE_EXPORT MoqtTrackRequestParameter : uint64_t {
-  // These two should have been deleted in draft-01.
-  // kGroupSequence = 0x0,
-  // kObjectSequence = 0x1,
   kAuthorizationInfo = 0x2,
 };
 
 struct FullTrackName {
   std::string track_namespace;
   std::string track_name;
-  FullTrackName(std::string ns, std::string name)
-      : track_namespace(std::move(ns)), track_name(std::move(name)) {}
+  FullTrackName(absl::string_view ns, absl::string_view name)
+      : track_namespace(ns), track_name(name) {}
   bool operator==(const FullTrackName& other) const {
     return track_namespace == other.track_namespace &&
            track_name == other.track_name;
@@ -125,6 +130,17 @@
     return group < other.group ||
            (group == other.group && object < other.object);
   }
+  bool operator<=(const FullSequence& other) const {
+    return (group < other.group ||
+            (group == other.group && object <= other.object));
+  }
+  FullSequence& operator=(FullSequence other) {
+    group = other.group;
+    object = other.object;
+    return *this;
+  }
+  template <typename H>
+  friend H AbslHashValue(H h, const FullSequence& m);
 };
 
 template <typename H>
@@ -143,13 +159,25 @@
   std::optional<MoqtRole> role;
 };
 
+// These codes do not appear on the wire.
+enum class QUICHE_EXPORT MoqtForwardingPreference : uint8_t {
+  kTrack = 0x0,
+  kGroup = 0x1,
+  kObject = 0x2,
+  kDatagram = 0x3,
+};
+
+// The data contained in every Object message, although the message type
+// implies some of the values. |payload_length| has no value if the length
+// is unknown (because it runs to the end of the stream.)
 struct QUICHE_EXPORT MoqtObject {
-  uint64_t track_id;
-  uint64_t group_sequence;
-  uint64_t object_sequence;
+  uint64_t subscribe_id;
+  uint64_t track_alias;
+  uint64_t group_id;
+  uint64_t object_id;
   uint64_t object_send_order;
+  MoqtForwardingPreference forwarding_preference;
   std::optional<uint64_t> payload_length;
-  // Message also includes the object payload.
 };
 
 enum class QUICHE_EXPORT MoqtSubscribeLocationMode : uint64_t {
@@ -180,7 +208,9 @@
   }
 };
 
-struct QUICHE_EXPORT MoqtSubscribeRequest {
+struct QUICHE_EXPORT MoqtSubscribe {
+  uint64_t subscribe_id;
+  uint64_t track_alias;
   absl::string_view track_namespace;
   absl::string_view track_name;
   // If the mode is kNone, the these are std::nullopt.
@@ -188,13 +218,13 @@
   std::optional<MoqtSubscribeLocation> start_object;
   std::optional<MoqtSubscribeLocation> end_group;
   std::optional<MoqtSubscribeLocation> end_object;
+#ifdef MOQT_AUTH_INFO
   std::optional<absl::string_view> authorization_info;
+#endif
 };
 
 struct QUICHE_EXPORT MoqtSubscribeOk {
-  absl::string_view track_namespace;
-  absl::string_view track_name;
-  uint64_t track_id;
+  uint64_t subscribe_id;
   // The message uses ms, but expires is in us.
   quic::QuicTimeDelta expires = quic::QuicTimeDelta::FromMilliseconds(0);
 };
@@ -206,27 +236,24 @@
 };
 
 struct QUICHE_EXPORT MoqtSubscribeError {
-  absl::string_view track_namespace;
-  absl::string_view track_name;
+  uint64_t subscribe_id;
   SubscribeErrorCode error_code;
   absl::string_view reason_phrase;
+  uint64_t track_alias;
 };
 
 struct QUICHE_EXPORT MoqtUnsubscribe {
-  absl::string_view track_namespace;
-  absl::string_view track_name;
+  uint64_t subscribe_id;
 };
 
 struct QUICHE_EXPORT MoqtSubscribeFin {
-  absl::string_view track_namespace;
-  absl::string_view track_name;
+  uint64_t subscribe_id;
   uint64_t final_group;
   uint64_t final_object;
 };
 
 struct QUICHE_EXPORT MoqtSubscribeRst {
-  absl::string_view track_namespace;
-  absl::string_view track_name;
+  uint64_t subscribe_id;
   uint64_t error_code;
   absl::string_view reason_phrase;
   uint64_t final_group;
@@ -258,6 +285,14 @@
 
 std::string MoqtMessageTypeToString(MoqtMessageType message_type);
 
+std::string MoqtForwardingPreferenceToString(
+    MoqtForwardingPreference preference);
+
+MoqtForwardingPreference GetForwardingPreference(MoqtMessageType type);
+
+MoqtMessageType GetMessageTypeForForwardingPreference(
+    MoqtForwardingPreference preference);
+
 }  // namespace moqt
 
 #endif  // QUICHE_QUIC_MOQT_MOQT_MESSAGES_H_
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 2b9e15f..b8754f2 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -7,7 +7,6 @@
 #include <cstddef>
 #include <cstdint>
 #include <cstring>
-#include <memory>
 #include <optional>
 #include <string>
 
@@ -41,9 +40,8 @@
   // Check for early fin
   if (fin) {
     no_more_data_ = true;
-    if (object_metadata_.has_value() &&
-        object_metadata_->payload_length.has_value() &&
-        *object_metadata_->payload_length > data.length()) {
+    if (ObjectPayloadInProgress() &&
+        payload_length_remaining_ > data.length()) {
       ParseError("End of stream before complete OBJECT PAYLOAD");
       return;
     }
@@ -57,7 +55,7 @@
   // There are three cases: the parser has already delivered an OBJECT header
   // and is now delivering payload; part of a message is in the buffer; or
   // no message is in progress.
-  if (object_metadata_.has_value()) {
+  if (ObjectPayloadInProgress()) {
     // This is additional payload for an OBJECT.
     QUICHE_DCHECK(buffered_message_.empty());
     if (!object_metadata_->payload_length.has_value()) {
@@ -79,7 +77,7 @@
     visitor_.OnObjectMessage(*object_metadata_,
                              data.substr(0, payload_length_remaining_), true);
     reader->Seek(payload_length_remaining_);
-    object_metadata_.reset();
+    payload_length_remaining_ = 0;  // Expect a new object.
   } else if (!buffered_message_.empty()) {
     absl::StrAppend(&buffered_message_, data);
     reader.emplace(buffered_message_);
@@ -113,29 +111,34 @@
   if (original_buffer_size > 0) {
     buffered_message_.erase(0, total_processed);
   }
-  if (fin && object_metadata_.has_value()) {
-    ParseError("Received FIN mid-payload");
-  }
 }
 
 size_t MoqtParser::ProcessMessage(absl::string_view data, bool fin) {
   uint64_t value;
   quic::QuicDataReader reader(data);
+  if (ObjectStreamInitialized() && !ObjectPayloadInProgress()) {
+    // This is a follow-on object in a stream.
+    return ProcessObject(reader,
+                         GetMessageTypeForForwardingPreference(
+                             object_metadata_->forwarding_preference),
+                         fin);
+  }
   if (!reader.ReadVarInt62(&value)) {
     return 0;
   }
   auto type = static_cast<MoqtMessageType>(value);
   switch (type) {
-    case MoqtMessageType::kObjectWithPayloadLength:
-      return ProcessObject(reader, true, fin);
-    case MoqtMessageType::kObjectWithoutPayloadLength:
-      return ProcessObject(reader, false, fin);
+    case MoqtMessageType::kObjectStream:
+    case MoqtMessageType::kObjectPreferDatagram:
+    case MoqtMessageType::kStreamHeaderTrack:
+    case MoqtMessageType::kStreamHeaderGroup:
+      return ProcessObject(reader, type, fin);
     case MoqtMessageType::kClientSetup:
       return ProcessClientSetup(reader);
     case MoqtMessageType::kServerSetup:
       return ProcessServerSetup(reader);
-    case MoqtMessageType::kSubscribeRequest:
-      return ProcessSubscribeRequest(reader);
+    case MoqtMessageType::kSubscribe:
+      return ProcessSubscribe(reader);
     case MoqtMessageType::kSubscribeOk:
       return ProcessSubscribeOk(reader);
     case MoqtMessageType::kSubscribeError:
@@ -162,39 +165,84 @@
   }
 }
 
-size_t MoqtParser::ProcessObject(quic::QuicDataReader& reader, bool has_length,
-                                 bool fin) {
-  QUICHE_DCHECK(!object_metadata_.has_value());
-  object_metadata_ = MoqtObject();
-  uint64_t length;
-  if (!reader.ReadVarInt62(&object_metadata_->track_id) ||
-      !reader.ReadVarInt62(&object_metadata_->group_sequence) ||
-      !reader.ReadVarInt62(&object_metadata_->object_sequence) ||
-      !reader.ReadVarInt62(&object_metadata_->object_send_order) ||
-      (has_length && !reader.ReadVarInt62(&length))) {
-    object_metadata_.reset();
-    return 0;
+size_t MoqtParser::ProcessObject(quic::QuicDataReader& reader,
+                                 MoqtMessageType type, bool fin) {
+  size_t processed_data;
+  QUICHE_DCHECK(!ObjectPayloadInProgress());
+  if (!ObjectStreamInitialized()) {
+    // nothing has been processed on the stream.
+    object_metadata_ = MoqtObject();
+    if (!reader.ReadVarInt62(&object_metadata_->subscribe_id) ||
+        !reader.ReadVarInt62(&object_metadata_->track_alias)) {
+      object_metadata_.reset();
+      return 0;
+    }
+    if (type != MoqtMessageType::kStreamHeaderTrack &&
+        !reader.ReadVarInt62(&object_metadata_->group_id)) {
+      object_metadata_.reset();
+      return 0;
+    }
+    if (type != MoqtMessageType::kStreamHeaderTrack &&
+        type != MoqtMessageType::kStreamHeaderGroup &&
+        !reader.ReadVarInt62(&object_metadata_->object_id)) {
+      object_metadata_.reset();
+      return 0;
+    }
+    if (!reader.ReadVarInt62(&object_metadata_->object_send_order)) {
+      object_metadata_.reset();
+      return 0;
+    }
+    object_metadata_->forwarding_preference = GetForwardingPreference(type);
   }
-  if (has_length) {
-    object_metadata_->payload_length = length;
+  // At this point, enough data has been processed to store in object_metadata_,
+  // even if there's nothing else in the buffer.
+  processed_data = reader.PreviouslyReadPayload().length();
+  QUICHE_DCHECK(payload_length_remaining_ == 0);
+  switch (type) {
+    case MoqtMessageType::kStreamHeaderTrack:
+      if (!reader.ReadVarInt62(&object_metadata_->group_id)) {
+        return processed_data;
+      }
+      [[fallthrough]];
+    case MoqtMessageType::kStreamHeaderGroup: {
+      uint64_t length;
+      if (!reader.ReadVarInt62(&object_metadata_->object_id) ||
+          !reader.ReadVarInt62(&length)) {
+        return processed_data;
+      }
+      object_metadata_->payload_length = length;
+      break;
+    }
+    default:
+      break;
   }
-  bool received_complete_message =
-      (fin && !has_length) ||
-      (has_length &&
-       *object_metadata_->payload_length <= reader.BytesRemaining());
-  size_t payload_to_draw = (!has_length || *object_metadata_->payload_length >=
-                                               reader.BytesRemaining())
-                               ? reader.BytesRemaining()
-                               : *object_metadata_->payload_length;
+  bool has_length = object_metadata_->payload_length.has_value();
+  bool received_complete_message = false;
+  size_t payload_to_draw = reader.BytesRemaining();
+  if (fin && has_length &&
+      *object_metadata_->payload_length > reader.BytesRemaining()) {
+    ParseError("Received FIN mid-payload");
+    return processed_data;
+  }
+  received_complete_message =
+      fin || (has_length &&
+              *object_metadata_->payload_length <= reader.BytesRemaining());
+  if (received_complete_message && has_length &&
+      *object_metadata_->payload_length < reader.BytesRemaining()) {
+    payload_to_draw = *object_metadata_->payload_length;
+  }
+  // The error case where there's a fin before the explicit length is complete
+  // is handled in ProcessData() in two separate places. Even though the
+  // message is "done" if fin regardless of has_length, it's bad to report to
+  // the application that the object is done if it hasn't reached the promised
+  // length.
   visitor_.OnObjectMessage(
       *object_metadata_,
       reader.PeekRemainingPayload().substr(0, payload_to_draw),
       received_complete_message);
-  if (received_complete_message) {
-    object_metadata_.reset();
-  }
   reader.Seek(payload_to_draw);
-  payload_length_remaining_ = length - payload_to_draw;
+  payload_length_remaining_ =
+      has_length ? *object_metadata_->payload_length - payload_to_draw : 0;
   return reader.PreviouslyReadPayload().length();
 }
 
@@ -303,41 +351,38 @@
   return reader.PreviouslyReadPayload().length();
 }
 
-size_t MoqtParser::ProcessSubscribeRequest(quic::QuicDataReader& reader) {
-  MoqtSubscribeRequest subscribe_request;
-  if (!reader.ReadStringPieceVarInt62(&subscribe_request.track_namespace)) {
-    return 0;
-  }
-  if (!reader.ReadStringPieceVarInt62(&subscribe_request.track_name)) {
-    return 0;
-  }
-  if (!ReadLocation(reader, subscribe_request.start_group)) {
+size_t MoqtParser::ProcessSubscribe(quic::QuicDataReader& reader) {
+  MoqtSubscribe subscribe_request;
+  if (!reader.ReadVarInt62(&subscribe_request.subscribe_id) ||
+      !reader.ReadVarInt62(&subscribe_request.track_alias) ||
+      !reader.ReadStringPieceVarInt62(&subscribe_request.track_namespace) ||
+      !reader.ReadStringPieceVarInt62(&subscribe_request.track_name) ||
+      !ReadLocation(reader, subscribe_request.start_group)) {
     return 0;
   }
   if (!subscribe_request.start_group.has_value()) {
-    ParseError("START_GROUP must not be None in SUBSCRIBE_REQUEST");
+    ParseError("START_GROUP must not be None in SUBSCRIBE");
     return 0;
   }
   if (!ReadLocation(reader, subscribe_request.start_object)) {
     return 0;
   }
   if (!subscribe_request.start_object.has_value()) {
-    ParseError("START_OBJECT must not be None in SUBSCRIBE_REQUEST");
+    ParseError("START_OBJECT must not be None in SUBSCRIBE");
     return 0;
   }
-  if (!ReadLocation(reader, subscribe_request.end_group)) {
-    return 0;
-  }
-  if (!ReadLocation(reader, subscribe_request.end_object)) {
+  if (!ReadLocation(reader, subscribe_request.end_group) ||
+      !ReadLocation(reader, subscribe_request.end_object)) {
     return 0;
   }
   if (subscribe_request.end_group.has_value() !=
       subscribe_request.end_object.has_value()) {
     ParseError(
-        "SUBSCRIBE_REQUEST end_group and end_object must be both None "
+        "SUBSCRIBE end_group and end_object must be both None "
         "or both non_None");
     return 0;
   }
+#ifdef MOQT_AUTH_INFO
   uint64_t num_params;
   if (!reader.ReadVarInt62(&num_params)) {
     return 0;
@@ -364,23 +409,16 @@
         break;
     }
   }
-  visitor_.OnSubscribeRequestMessage(subscribe_request);
+#endif
+  visitor_.OnSubscribeMessage(subscribe_request);
   return reader.PreviouslyReadPayload().length();
 }
 
 size_t MoqtParser::ProcessSubscribeOk(quic::QuicDataReader& reader) {
   MoqtSubscribeOk subscribe_ok;
-  if (!reader.ReadStringPieceVarInt62(&subscribe_ok.track_namespace)) {
-    return 0;
-  }
-  if (!reader.ReadStringPieceVarInt62(&subscribe_ok.track_name)) {
-    return 0;
-  }
-  if (!reader.ReadVarInt62(&subscribe_ok.track_id)) {
-    return 0;
-  }
   uint64_t milliseconds;
-  if (!reader.ReadVarInt62(&milliseconds)) {
+  if (!reader.ReadVarInt62(&subscribe_ok.subscribe_id) ||
+      !reader.ReadVarInt62(&milliseconds)) {
     return 0;
   }
   subscribe_ok.expires = quic::QuicTimeDelta::FromMilliseconds(milliseconds);
@@ -390,30 +428,21 @@
 
 size_t MoqtParser::ProcessSubscribeError(quic::QuicDataReader& reader) {
   MoqtSubscribeError subscribe_error;
-  if (!reader.ReadStringPieceVarInt62(&subscribe_error.track_namespace)) {
-    return 0;
-  }
-  if (!reader.ReadStringPieceVarInt62(&subscribe_error.track_name)) {
-    return 0;
-  }
   uint64_t error_code;
-  if (!reader.ReadVarInt62(&error_code)) {
+  if (!reader.ReadVarInt62(&subscribe_error.subscribe_id) ||
+      !reader.ReadVarInt62(&error_code) ||
+      !reader.ReadStringPieceVarInt62(&subscribe_error.reason_phrase) ||
+      !reader.ReadVarInt62(&subscribe_error.track_alias)) {
     return 0;
   }
   subscribe_error.error_code = static_cast<SubscribeErrorCode>(error_code);
-  if (!reader.ReadStringPieceVarInt62(&subscribe_error.reason_phrase)) {
-    return 0;
-  }
   visitor_.OnSubscribeErrorMessage(subscribe_error);
   return reader.PreviouslyReadPayload().length();
 }
 
 size_t MoqtParser::ProcessUnsubscribe(quic::QuicDataReader& reader) {
   MoqtUnsubscribe unsubscribe;
-  if (!reader.ReadStringPieceVarInt62(&unsubscribe.track_namespace)) {
-    return 0;
-  }
-  if (!reader.ReadStringPieceVarInt62(&unsubscribe.track_name)) {
+  if (!reader.ReadVarInt62(&unsubscribe.subscribe_id)) {
     return 0;
   }
   visitor_.OnUnsubscribeMessage(unsubscribe);
@@ -422,16 +451,9 @@
 
 size_t MoqtParser::ProcessSubscribeFin(quic::QuicDataReader& reader) {
   MoqtSubscribeFin subscribe_fin;
-  if (!reader.ReadStringPieceVarInt62(&subscribe_fin.track_namespace)) {
-    return 0;
-  }
-  if (!reader.ReadStringPieceVarInt62(&subscribe_fin.track_name)) {
-    return 0;
-  }
-  if (!reader.ReadVarInt62(&subscribe_fin.final_group)) {
-    return 0;
-  }
-  if (!reader.ReadVarInt62(&subscribe_fin.final_object)) {
+  if (!reader.ReadVarInt62(&subscribe_fin.subscribe_id) ||
+      !reader.ReadVarInt62(&subscribe_fin.final_group) ||
+      !reader.ReadVarInt62(&subscribe_fin.final_object)) {
     return 0;
   }
   visitor_.OnSubscribeFinMessage(subscribe_fin);
@@ -440,22 +462,11 @@
 
 size_t MoqtParser::ProcessSubscribeRst(quic::QuicDataReader& reader) {
   MoqtSubscribeRst subscribe_rst;
-  if (!reader.ReadStringPieceVarInt62(&subscribe_rst.track_namespace)) {
-    return 0;
-  }
-  if (!reader.ReadStringPieceVarInt62(&subscribe_rst.track_name)) {
-    return 0;
-  }
-  if (!reader.ReadVarInt62(&subscribe_rst.error_code)) {
-    return 0;
-  }
-  if (!reader.ReadStringPieceVarInt62(&subscribe_rst.reason_phrase)) {
-    return 0;
-  }
-  if (!reader.ReadVarInt62(&subscribe_rst.final_group)) {
-    return 0;
-  }
-  if (!reader.ReadVarInt62(&subscribe_rst.final_object)) {
+  if (!reader.ReadVarInt62(&subscribe_rst.subscribe_id) ||
+      !reader.ReadVarInt62(&subscribe_rst.error_code) ||
+      !reader.ReadStringPieceVarInt62(&subscribe_rst.reason_phrase) ||
+      !reader.ReadVarInt62(&subscribe_rst.final_group) ||
+      !reader.ReadVarInt62(&subscribe_rst.final_object)) {
     return 0;
   }
   visitor_.OnSubscribeRstMessage(subscribe_rst);
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index 2674885..3bb7f47 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -34,8 +34,7 @@
   // parser retains ownership of the memory.
   virtual void OnClientSetupMessage(const MoqtClientSetup& message) = 0;
   virtual void OnServerSetupMessage(const MoqtServerSetup& message) = 0;
-  virtual void OnSubscribeRequestMessage(
-      const MoqtSubscribeRequest& message) = 0;
+  virtual void OnSubscribeMessage(const MoqtSubscribe& message) = 0;
   virtual void OnSubscribeOkMessage(const MoqtSubscribeOk& message) = 0;
   virtual void OnSubscribeErrorMessage(const MoqtSubscribeError& message) = 0;
   virtual void OnUnsubscribeMessage(const MoqtUnsubscribe& message) = 0;
@@ -63,6 +62,8 @@
   // All bytes can be freed. Calls OnParsingError() when there is a parsing
   // error.
   // Any calls after sending |fin| = true will be ignored.
+  // TODO(martinduke): Figure out what has to happen if the message arrives via
+  // datagram rather than a stream.
   void ProcessData(absl::string_view data, bool fin);
 
  private:
@@ -75,10 +76,11 @@
   // structs, and call the relevant visitor function for further action. Returns
   // the number of bytes consumed if the message is complete; returns 0
   // otherwise.
-  size_t ProcessObject(quic::QuicDataReader& reader, bool has_length, bool fin);
+  size_t ProcessObject(quic::QuicDataReader& reader, MoqtMessageType type,
+                       bool fin);
   size_t ProcessClientSetup(quic::QuicDataReader& reader);
   size_t ProcessServerSetup(quic::QuicDataReader& reader);
-  size_t ProcessSubscribeRequest(quic::QuicDataReader& reader);
+  size_t ProcessSubscribe(quic::QuicDataReader& reader);
   size_t ProcessSubscribeOk(quic::QuicDataReader& reader);
   size_t ProcessSubscribeError(quic::QuicDataReader& reader);
   size_t ProcessUnsubscribe(quic::QuicDataReader& reader);
@@ -108,6 +110,21 @@
   // string_view is not exactly the right length.
   bool StringViewToVarInt(absl::string_view& sv, uint64_t& vi);
 
+  // Simplify understanding of state.
+  // Returns true if the stream has delivered all object metadata common to all
+  // objects on that stream.
+  bool ObjectStreamInitialized() const { return object_metadata_.has_value(); }
+  // Returns true if the stream has delivered all metadata but not all payload
+  // for the most recent object.
+  bool ObjectPayloadInProgress() const {
+    return (object_metadata_.has_value() &&
+            (object_metadata_->forwarding_preference ==
+                 MoqtForwardingPreference::kObject ||
+             object_metadata_->forwarding_preference ==
+                 MoqtForwardingPreference::kDatagram ||
+             payload_length_remaining_ > 0));
+  }
+
   MoqtParserVisitor& visitor_;
   bool uses_web_transport_;
   bool no_more_data_ = false;  // Fatal error or fin. No more parsing.
@@ -116,8 +133,17 @@
   std::string buffered_message_;
 
   // Metadata for an object which is delivered in parts.
+  // If object_metadata_ is nullopt, nothing has been processed on the stream.
+  // If object_metadata_ exists but payload_length is nullopt or
+  // payload_length_remaining_ is nonzero, the object payload is in mid-
+  // delivery.
+  // If object_metadata_ exists and payload_length_remaining_ is zero, an object
+  // has been completely delivered and the next object header on the stream has
+  // not been delivered.
+  // Use ObjectStreamInitialized() and ObjectPayloadInProgress() to keep the
+  // state straight.
   std::optional<MoqtObject> object_metadata_ = std::nullopt;
-  size_t payload_length_remaining_;
+  size_t payload_length_remaining_ = 0;
 
   bool processing_ = false;  // True if currently in ProcessData(), to prevent
                              // re-entrancy.
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index d5d8f45..c0f1e9b 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -22,17 +22,22 @@
 
 namespace {
 
-bool IsObjectMessage(MoqtMessageType type) {
-  return (type == MoqtMessageType::kObjectWithPayloadLength ||
-          type == MoqtMessageType::kObjectWithoutPayloadLength);
+inline bool IsObjectMessage(MoqtMessageType type) {
+  return (type == MoqtMessageType::kObjectStream ||
+          type == MoqtMessageType::kObjectPreferDatagram ||
+          type == MoqtMessageType::kStreamHeaderTrack ||
+          type == MoqtMessageType::kStreamHeaderGroup);
+}
+
+inline bool IsObjectWithoutPayloadLength(MoqtMessageType type) {
+  return (type == MoqtMessageType::kObjectStream ||
+          type == MoqtMessageType::kObjectPreferDatagram);
 }
 
 std::vector<MoqtMessageType> message_types = {
-    MoqtMessageType::kObjectWithPayloadLength,
-    MoqtMessageType::kObjectWithoutPayloadLength,
-    MoqtMessageType::kClientSetup,
-    MoqtMessageType::kServerSetup,
-    MoqtMessageType::kSubscribeRequest,
+    MoqtMessageType::kObjectStream,
+    MoqtMessageType::kObjectPreferDatagram,
+    MoqtMessageType::kSubscribe,
     MoqtMessageType::kSubscribeOk,
     MoqtMessageType::kSubscribeError,
     MoqtMessageType::kUnsubscribe,
@@ -42,6 +47,10 @@
     MoqtMessageType::kAnnounceOk,
     MoqtMessageType::kAnnounceError,
     MoqtMessageType::kUnannounce,
+    MoqtMessageType::kClientSetup,
+    MoqtMessageType::kServerSetup,
+    MoqtMessageType::kStreamHeaderTrack,
+    MoqtMessageType::kStreamHeaderGroup,
     MoqtMessageType::kGoAway,
 };
 
@@ -110,72 +119,54 @@
     MoqtServerSetup server_setup = message;
     last_message_ = TestMessageBase::MessageStructuredData(server_setup);
   }
-  void OnSubscribeRequestMessage(const MoqtSubscribeRequest& message) override {
+  void OnSubscribeMessage(const MoqtSubscribe& message) override {
     end_of_message_ = true;
     messages_received_++;
-    MoqtSubscribeRequest subscribe_request = message;
+    MoqtSubscribe subscribe_request = message;
     string0_ = std::string(subscribe_request.track_namespace);
     subscribe_request.track_namespace = absl::string_view(string0_);
     string1_ = std::string(subscribe_request.track_name);
     subscribe_request.track_name = absl::string_view(string1_);
+#ifdef MOQT_AUTH_INFO
     if (subscribe_request.authorization_info.has_value()) {
       string2_ = std::string(*subscribe_request.authorization_info);
       subscribe_request.authorization_info = absl::string_view(string2_);
     }
+#endif
     last_message_ = TestMessageBase::MessageStructuredData(subscribe_request);
   }
   void OnSubscribeOkMessage(const MoqtSubscribeOk& message) override {
     end_of_message_ = true;
     messages_received_++;
     MoqtSubscribeOk subscribe_ok = message;
-    string0_ = std::string(subscribe_ok.track_namespace);
-    subscribe_ok.track_namespace = absl::string_view(string0_);
-    string1_ = std::string(subscribe_ok.track_name);
-    subscribe_ok.track_name = absl::string_view(string1_);
     last_message_ = TestMessageBase::MessageStructuredData(subscribe_ok);
   }
   void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override {
     end_of_message_ = true;
     messages_received_++;
     MoqtSubscribeError subscribe_error = message;
-    string0_ = std::string(subscribe_error.track_namespace);
-    subscribe_error.track_namespace = absl::string_view(string0_);
-    string1_ = std::string(subscribe_error.track_name);
-    subscribe_error.track_name = absl::string_view(string1_);
-    string1_ = std::string(subscribe_error.reason_phrase);
-    subscribe_error.reason_phrase = absl::string_view(string1_);
+    string0_ = std::string(subscribe_error.reason_phrase);
+    subscribe_error.reason_phrase = absl::string_view(string0_);
     last_message_ = TestMessageBase::MessageStructuredData(subscribe_error);
   }
   void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override {
     end_of_message_ = true;
     messages_received_++;
     MoqtUnsubscribe unsubscribe = message;
-    string0_ = std::string(unsubscribe.track_namespace);
-    unsubscribe.track_namespace = absl::string_view(string0_);
-    string1_ = std::string(unsubscribe.track_name);
-    unsubscribe.track_name = absl::string_view(string1_);
     last_message_ = TestMessageBase::MessageStructuredData(unsubscribe);
   }
   void OnSubscribeFinMessage(const MoqtSubscribeFin& message) override {
     end_of_message_ = true;
     messages_received_++;
     MoqtSubscribeFin subscribe_fin = message;
-    string0_ = std::string(subscribe_fin.track_namespace);
-    subscribe_fin.track_namespace = absl::string_view(string0_);
-    string1_ = std::string(subscribe_fin.track_name);
-    subscribe_fin.track_name = absl::string_view(string1_);
     last_message_ = TestMessageBase::MessageStructuredData(subscribe_fin);
   }
   void OnSubscribeRstMessage(const MoqtSubscribeRst& message) override {
     end_of_message_ = true;
     messages_received_++;
     MoqtSubscribeRst subscribe_rst = message;
-    string0_ = std::string(subscribe_rst.track_namespace);
-    subscribe_rst.track_namespace = absl::string_view(string0_);
-    string1_ = std::string(subscribe_rst.track_name);
-    subscribe_rst.track_name = absl::string_view(string1_);
-    string2_ = std::string(subscribe_rst.reason_phrase);
-    subscribe_rst.reason_phrase = absl::string_view(string2_);
+    string0_ = std::string(subscribe_rst.reason_phrase);
+    subscribe_rst.reason_phrase = absl::string_view(string0_);
     last_message_ = TestMessageBase::MessageStructuredData(subscribe_rst);
   }
   void OnAnnounceMessage(const MoqtAnnounce& message) override {
@@ -252,40 +243,7 @@
         parser_(GetParam().uses_web_transport, visitor_) {}
 
   std::unique_ptr<TestMessageBase> MakeMessage(MoqtMessageType message_type) {
-    switch (message_type) {
-      case MoqtMessageType::kObjectWithPayloadLength:
-        return std::make_unique<ObjectMessageWithLength>();
-      case MoqtMessageType::kObjectWithoutPayloadLength:
-        return std::make_unique<ObjectMessageWithoutLength>();
-      case MoqtMessageType::kClientSetup:
-        return std::make_unique<ClientSetupMessage>(webtrans_);
-      case MoqtMessageType::kServerSetup:
-        return std::make_unique<ClientSetupMessage>(webtrans_);
-      case MoqtMessageType::kSubscribeRequest:
-        return std::make_unique<SubscribeRequestMessage>();
-      case MoqtMessageType::kSubscribeOk:
-        return std::make_unique<SubscribeOkMessage>();
-      case MoqtMessageType::kSubscribeError:
-        return std::make_unique<SubscribeErrorMessage>();
-      case MoqtMessageType::kUnsubscribe:
-        return std::make_unique<UnsubscribeMessage>();
-      case MoqtMessageType::kSubscribeFin:
-        return std::make_unique<SubscribeFinMessage>();
-      case MoqtMessageType::kSubscribeRst:
-        return std::make_unique<SubscribeRstMessage>();
-      case MoqtMessageType::kAnnounce:
-        return std::make_unique<AnnounceMessage>();
-      case moqt::MoqtMessageType::kAnnounceOk:
-        return std::make_unique<AnnounceOkMessage>();
-      case moqt::MoqtMessageType::kAnnounceError:
-        return std::make_unique<AnnounceErrorMessage>();
-      case moqt::MoqtMessageType::kUnannounce:
-        return std::make_unique<UnannounceMessage>();
-      case moqt::MoqtMessageType::kGoAway:
-        return std::make_unique<GoAwayMessage>();
-      default:
-        return nullptr;
-    }
+    return CreateTestMessage(message_type, webtrans_);
   }
 
   MoqtParserTestVisitor visitor_;
@@ -331,6 +289,10 @@
   // so splitting the message in half will prevent the first half from being
   // processed.
   size_t first_data_size = message->total_message_size() / 2;
+  if (message_type_ == MoqtMessageType::kStreamHeaderTrack) {
+    // The boundary happens to fall right after the stream header, so move it.
+    ++first_data_size;
+  }
   parser_.ProcessData(message->PacketSample().substr(0, first_data_size),
                       false);
   EXPECT_EQ(visitor_.messages_received_, 0);
@@ -359,7 +321,7 @@
   }
   EXPECT_EQ(visitor_.messages_received_,
             (IsObjectMessage(message_type_) ? (kObjectPayloadSize + 1) : 1));
-  if (message_type_ == MoqtMessageType::kObjectWithoutPayloadLength) {
+  if (IsObjectWithoutPayloadLength(message_type_)) {
     EXPECT_FALSE(visitor_.end_of_message_);
     parser_.ProcessData(absl::string_view(), true);  // Needs the FIN
     EXPECT_EQ(visitor_.messages_received_, kObjectPayloadSize + 2);
@@ -382,7 +344,7 @@
   }
   EXPECT_EQ(visitor_.messages_received_,
             (IsObjectMessage(message_type_) ? (kObjectPayloadSize + 1) : 1));
-  if (message_type_ == MoqtMessageType::kObjectWithoutPayloadLength) {
+  if (IsObjectWithoutPayloadLength(message_type_)) {
     EXPECT_FALSE(visitor_.end_of_message_);
     parser_.ProcessData(absl::string_view(), true);  // Needs the FIN
     EXPECT_EQ(visitor_.messages_received_, kObjectPayloadSize + 2);
@@ -394,9 +356,12 @@
 
 TEST_P(MoqtParserTest, EarlyFin) {
   std::unique_ptr<TestMessageBase> message = MakeMessage(message_type_);
-  parser_.ProcessData(
-      message->PacketSample().substr(0, message->total_message_size() / 2),
-      true);
+  size_t first_data_size = message->total_message_size() / 2;
+  if (message_type_ == MoqtMessageType::kStreamHeaderTrack) {
+    // The boundary happens to fall right after the stream header, so move it.
+    ++first_data_size;
+  }
+  parser_.ProcessData(message->PacketSample().substr(0, first_data_size), true);
   EXPECT_EQ(visitor_.messages_received_, 0);
   EXPECT_TRUE(visitor_.parsing_error_.has_value());
   EXPECT_EQ(*visitor_.parsing_error_, "FIN after incomplete message");
@@ -404,9 +369,13 @@
 
 TEST_P(MoqtParserTest, SeparateEarlyFin) {
   std::unique_ptr<TestMessageBase> message = MakeMessage(message_type_);
-  parser_.ProcessData(
-      message->PacketSample().substr(0, message->total_message_size() / 2),
-      false);
+  size_t first_data_size = message->total_message_size() / 2;
+  if (message_type_ == MoqtMessageType::kStreamHeaderTrack) {
+    // The boundary happens to fall right after the stream header, so move it.
+    ++first_data_size;
+  }
+  parser_.ProcessData(message->PacketSample().substr(0, first_data_size),
+                      false);
   parser_.ProcessData(absl::string_view(), true);
   EXPECT_EQ(visitor_.messages_received_, 0);
   EXPECT_TRUE(visitor_.parsing_error_.has_value());
@@ -426,11 +395,32 @@
   static constexpr bool kRawQuic = false;
 };
 
-TEST_F(MoqtMessageSpecificTest, ObjectNoLengthSeparateFin) {
+TEST_F(MoqtMessageSpecificTest, ObjectStreamSeparateFin) {
   // OBJECT can return on an unknown-length message even without receiving a
   // FIN.
   MoqtParser parser(kRawQuic, visitor_);
-  auto message = std::make_unique<ObjectMessageWithoutLength>();
+  auto message = std::make_unique<ObjectStreamMessage>();
+  parser.ProcessData(message->PacketSample(), false);
+  EXPECT_EQ(visitor_.messages_received_, 1);
+  EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
+  EXPECT_TRUE(visitor_.object_payload_.has_value());
+  EXPECT_EQ(*(visitor_.object_payload_), "foo");
+  EXPECT_FALSE(visitor_.end_of_message_);
+
+  parser.ProcessData(absl::string_view(), true);  // send the FIN
+  EXPECT_EQ(visitor_.messages_received_, 2);
+  EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
+  EXPECT_TRUE(visitor_.object_payload_.has_value());
+  EXPECT_EQ(*(visitor_.object_payload_), "");
+  EXPECT_TRUE(visitor_.end_of_message_);
+  EXPECT_FALSE(visitor_.parsing_error_.has_value());
+}
+
+TEST_F(MoqtMessageSpecificTest, ObjectPreferDatagramSeparateFin) {
+  // OBJECT can return on an unknown-length message even without receiving a
+  // FIN.
+  MoqtParser parser(kRawQuic, visitor_);
+  auto message = std::make_unique<ObjectPreferDatagramMessage>();
   parser.ProcessData(message->PacketSample(), false);
   EXPECT_EQ(visitor_.messages_received_, 1);
   EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
@@ -451,7 +441,7 @@
 // message.
 TEST_F(MoqtMessageSpecificTest, ThreePartObject) {
   MoqtParser parser(kRawQuic, visitor_);
-  auto message = std::make_unique<ObjectMessageWithoutLength>();
+  auto message = std::make_unique<ObjectStreamMessage>();
   parser.ProcessData(message->PacketSample(), false);
   EXPECT_EQ(visitor_.messages_received_, 1);
   EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
@@ -480,7 +470,7 @@
 // Send the part of header, rest of header + payload, plus payload.
 TEST_F(MoqtMessageSpecificTest, ThreePartObjectFirstIncomplete) {
   MoqtParser parser(kRawQuic, visitor_);
-  auto message = std::make_unique<ObjectMessageWithoutLength>();
+  auto message = std::make_unique<ObjectStreamMessage>();
 
   // first part
   parser.ProcessData(message->PacketSample().substr(0, 4), false);
@@ -495,7 +485,7 @@
   EXPECT_TRUE(message->EqualFieldValues(*visitor_.last_message_));
   EXPECT_FALSE(visitor_.end_of_message_);
   EXPECT_TRUE(visitor_.object_payload_.has_value());
-  EXPECT_EQ(visitor_.object_payload_->length(), 95);
+  EXPECT_EQ(visitor_.object_payload_->length(), 94);
 
   // third part includes FIN
   parser.ProcessData("bar", true);
@@ -507,6 +497,50 @@
   EXPECT_FALSE(visitor_.parsing_error_.has_value());
 }
 
+TEST_F(MoqtMessageSpecificTest, StreamHeaderGroupFollowOn) {
+  MoqtParser parser(kRawQuic, visitor_);
+  // first part
+  auto message1 = std::make_unique<StreamHeaderGroupMessage>();
+  parser.ProcessData(message1->PacketSample(), false);
+  EXPECT_EQ(visitor_.messages_received_, 1);
+  EXPECT_TRUE(message1->EqualFieldValues(*visitor_.last_message_));
+  EXPECT_TRUE(visitor_.end_of_message_);
+  EXPECT_TRUE(visitor_.object_payload_.has_value());
+  EXPECT_EQ(*(visitor_.object_payload_), "foo");
+  EXPECT_FALSE(visitor_.parsing_error_.has_value());
+  // second part
+  auto message2 = std::make_unique<StreamMiddlerGroupMessage>();
+  parser.ProcessData(message2->PacketSample(), false);
+  EXPECT_EQ(visitor_.messages_received_, 2);
+  EXPECT_TRUE(message2->EqualFieldValues(*visitor_.last_message_));
+  EXPECT_TRUE(visitor_.end_of_message_);
+  EXPECT_TRUE(visitor_.object_payload_.has_value());
+  EXPECT_EQ(*(visitor_.object_payload_), "bar");
+  EXPECT_FALSE(visitor_.parsing_error_.has_value());
+}
+
+TEST_F(MoqtMessageSpecificTest, StreamHeaderTrackFollowOn) {
+  MoqtParser parser(kRawQuic, visitor_);
+  // first part
+  auto message1 = std::make_unique<StreamHeaderTrackMessage>();
+  parser.ProcessData(message1->PacketSample(), false);
+  EXPECT_EQ(visitor_.messages_received_, 1);
+  EXPECT_TRUE(message1->EqualFieldValues(*visitor_.last_message_));
+  EXPECT_TRUE(visitor_.end_of_message_);
+  EXPECT_TRUE(visitor_.object_payload_.has_value());
+  EXPECT_EQ(*(visitor_.object_payload_), "foo");
+  EXPECT_FALSE(visitor_.parsing_error_.has_value());
+  // second part
+  auto message2 = std::make_unique<StreamMiddlerTrackMessage>();
+  parser.ProcessData(message2->PacketSample(), false);
+  EXPECT_EQ(visitor_.messages_received_, 2);
+  EXPECT_TRUE(message2->EqualFieldValues(*visitor_.last_message_));
+  EXPECT_TRUE(visitor_.end_of_message_);
+  EXPECT_TRUE(visitor_.object_payload_.has_value());
+  EXPECT_EQ(*(visitor_.object_payload_), "bar");
+  EXPECT_FALSE(visitor_.parsing_error_.has_value());
+}
+
 TEST_F(MoqtMessageSpecificTest, SetupRoleAppearsTwice) {
   MoqtParser parser(kRawQuic, visitor_);
   char setup[] = {
@@ -619,11 +653,12 @@
   EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
 }
 
-TEST_F(MoqtMessageSpecificTest, SubscribeRequestAuthorizationInfoTwice) {
+#ifdef MOQT_AUTH_INFO
+TEST_F(MoqtMessageSpecificTest, SubscribeAuthorizationInfoTwice) {
   MoqtParser parser(kWebTrans, visitor_);
-  char subscribe_request[] = {
-      0x03, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
-      0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
+  char subscribe[] = {
+      0x03, 0x01, 0x02, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
+      0x04, 0x61, 0x62, 0x63, 0x64,              // track_name = "abcd"
       0x02, 0x04,                    // start_group = 4 (relative previous)
       0x01, 0x01,                    // start_object = 1 (absolute)
       0x00,                          // end_group = none
@@ -632,14 +667,14 @@
       0x02, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar"
       0x02, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar"
   };
-  parser.ProcessData(
-      absl::string_view(subscribe_request, sizeof(subscribe_request)), false);
+  parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
   EXPECT_EQ(visitor_.messages_received_, 0);
   EXPECT_TRUE(visitor_.parsing_error_.has_value());
   EXPECT_EQ(*visitor_.parsing_error_,
             "AUTHORIZATION_INFO parameter appears twice in SUBSCRIBE_REQUEST");
   EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
 }
+#endif
 
 TEST_F(MoqtMessageSpecificTest, AnnounceAuthorizationInfoTwice) {
   MoqtParser parser(kWebTrans, visitor_);
@@ -659,11 +694,11 @@
 
 TEST_F(MoqtMessageSpecificTest, FinMidPayload) {
   MoqtParser parser(kRawQuic, visitor_);
-  auto message = std::make_unique<ObjectMessageWithLength>();
+  auto message = std::make_unique<StreamHeaderGroupMessage>();
   parser.ProcessData(
       message->PacketSample().substr(0, message->total_message_size() - 1),
       true);
-  EXPECT_EQ(visitor_.messages_received_, 1);
+  EXPECT_EQ(visitor_.messages_received_, 0);
   EXPECT_TRUE(visitor_.parsing_error_.has_value());
   EXPECT_EQ(*visitor_.parsing_error_, "Received FIN mid-payload");
   EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
@@ -671,7 +706,7 @@
 
 TEST_F(MoqtMessageSpecificTest, PartialPayloadThenFin) {
   MoqtParser parser(kRawQuic, visitor_);
-  auto message = std::make_unique<ObjectMessageWithLength>();
+  auto message = std::make_unique<StreamHeaderTrackMessage>();
   parser.ProcessData(
       message->PacketSample().substr(0, message->total_message_size() - 1),
       false);
@@ -724,62 +759,68 @@
 
 TEST_F(MoqtMessageSpecificTest, StartGroupIsNone) {
   MoqtParser parser(kRawQuic, visitor_);
-  char subscribe_request[] = {
-      0x03, 0x03, 0x66, 0x6f, 0x6f,  // track_name = "foo"
+  char subscribe[] = {
+      0x03, 0x01, 0x02,              // id and alias
+      0x03, 0x66, 0x6f, 0x6f,        // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
       0x00,                          // start_group = none
       0x01, 0x01,                    // start_object = 1 (absolute)
       0x00,                          // end_group = none
       0x00,                          // end_object = none
+#ifdef MOQT_AUTH_INFO
       0x01,                          // 1 parameter
       0x02, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar"
+#endif
   };
-  parser.ProcessData(
-      absl::string_view(subscribe_request, sizeof(subscribe_request)), false);
+  parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
   EXPECT_EQ(visitor_.messages_received_, 0);
   EXPECT_TRUE(visitor_.parsing_error_.has_value());
   EXPECT_EQ(*visitor_.parsing_error_,
-            "START_GROUP must not be None in SUBSCRIBE_REQUEST");
+            "START_GROUP must not be None in SUBSCRIBE");
 }
 
 TEST_F(MoqtMessageSpecificTest, StartObjectIsNone) {
   MoqtParser parser(kRawQuic, visitor_);
-  char subscribe_request[] = {
-      0x03, 0x03, 0x66, 0x6f, 0x6f,  // track_name = "foo"
+  char subscribe[] = {
+      0x03, 0x01, 0x02,              // id and alias
+      0x03, 0x66, 0x6f, 0x6f,        // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
       0x02, 0x04,                    // start_group = 4 (relative previous)
       0x00,                          // start_object = none
       0x00,                          // end_group = none
       0x00,                          // end_object = none
+#ifdef MOQT_AUTH_INFO
       0x01,                          // 1 parameter
       0x02, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar"
+#endif
   };
-  parser.ProcessData(
-      absl::string_view(subscribe_request, sizeof(subscribe_request)), false);
+  parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
   EXPECT_EQ(visitor_.messages_received_, 0);
   EXPECT_TRUE(visitor_.parsing_error_.has_value());
   EXPECT_EQ(*visitor_.parsing_error_,
-            "START_OBJECT must not be None in SUBSCRIBE_REQUEST");
+            "START_OBJECT must not be None in SUBSCRIBE");
 }
 
 TEST_F(MoqtMessageSpecificTest, EndGroupIsNoneEndObjectIsNoNone) {
   MoqtParser parser(kRawQuic, visitor_);
-  char subscribe_request[] = {
-      0x03, 0x03, 0x66, 0x6f, 0x6f,  // track_name = "foo"
+  char subscribe[] = {
+      0x03, 0x01, 0x02,              // id and alias
+      0x03, 0x66, 0x6f, 0x6f,        // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
       0x02, 0x04,                    // start_group = 4 (relative previous)
       0x01, 0x01,                    // start_object = 1 (absolute)
       0x00,                          // end_group = none
       0x01, 0x01,                    // end_object = 1 (absolute)
+#ifdef MOQT_AUTH_INFO
       0x01,                          // 1 parameter
       0x02, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar"
+#endif
   };
-  parser.ProcessData(
-      absl::string_view(subscribe_request, sizeof(subscribe_request)), false);
+  parser.ProcessData(absl::string_view(subscribe, sizeof(subscribe)), false);
   EXPECT_EQ(visitor_.messages_received_, 0);
   EXPECT_TRUE(visitor_.parsing_error_.has_value());
   EXPECT_EQ(*visitor_.parsing_error_,
-            "SUBSCRIBE_REQUEST end_group and end_object must be both None "
+            "SUBSCRIBE end_group and end_object must be both None "
             "or both non_None");
 }
 
@@ -793,59 +834,11 @@
   for (MoqtMessageType type : message_types) {
     // Each iteration, process from the halfway point of one message to the
     // halfway point of the next.
-    if (type == MoqtMessageType::kObjectWithoutPayloadLength) {
-      continue;  // Cannot be followed with another message.
+    if (IsObjectMessage(type)) {
+      continue;  // Objects cannot share a stream with other meessages.
     }
-    std::unique_ptr<TestMessageBase> message;
-    switch (type) {
-      case MoqtMessageType::kObjectWithPayloadLength:
-        message = std::make_unique<ObjectMessageWithLength>();
-        break;
-      case MoqtMessageType::kObjectWithoutPayloadLength:
-        continue;  // Cannot be followed with another message;
-      case MoqtMessageType::kClientSetup:
-        message = std::make_unique<ClientSetupMessage>(kRawQuic);
-        break;
-      case MoqtMessageType::kServerSetup:
-        message = std::make_unique<ClientSetupMessage>(kRawQuic);
-        break;
-      case MoqtMessageType::kSubscribeRequest:
-        message = std::make_unique<SubscribeRequestMessage>();
-        break;
-      case MoqtMessageType::kSubscribeOk:
-        message = std::make_unique<SubscribeOkMessage>();
-        break;
-      case MoqtMessageType::kSubscribeError:
-        message = std::make_unique<SubscribeErrorMessage>();
-        break;
-      case MoqtMessageType::kUnsubscribe:
-        message = std::make_unique<UnsubscribeMessage>();
-        break;
-      case MoqtMessageType::kSubscribeFin:
-        message = std::make_unique<SubscribeFinMessage>();
-        break;
-      case MoqtMessageType::kSubscribeRst:
-        message = std::make_unique<SubscribeRstMessage>();
-        break;
-      case MoqtMessageType::kAnnounce:
-        message = std::make_unique<AnnounceMessage>();
-        break;
-      case moqt::MoqtMessageType::kAnnounceOk:
-        message = std::make_unique<AnnounceOkMessage>();
-        break;
-      case moqt::MoqtMessageType::kAnnounceError:
-        message = std::make_unique<AnnounceErrorMessage>();
-        break;
-      case moqt::MoqtMessageType::kUnannounce:
-        message = std::make_unique<UnannounceMessage>();
-        break;
-      case moqt::MoqtMessageType::kGoAway:
-        message = std::make_unique<GoAwayMessage>();
-        break;
-      default:
-        message = nullptr;
-        break;
-    }
+    std::unique_ptr<TestMessageBase> message =
+        CreateTestMessage(type, kRawQuic);
     memcpy(buffer + write, message->PacketSample().data(),
            message->total_message_size());
     size_t new_read = write + message->total_message_size() / 2;
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 7adb7ee..f78d3f5 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -12,6 +12,7 @@
 #include <vector>
 
 #include "absl/algorithm/container.h"
+#include "absl/status/status.h"
 #include "absl/strings/str_cat.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/quic_types.h"
@@ -30,8 +31,6 @@
 
 using ::quic::Perspective;
 
-constexpr int kMaxBufferedObjects = 1000;
-
 void MoqtSession::OnSessionReady() {
   QUICHE_DLOG(INFO) << ENDPOINT << "Underlying session ready";
   if (parameters_.perspective == Perspective::IS_SERVER) {
@@ -108,8 +107,7 @@
 
 void MoqtSession::AddLocalTrack(const FullTrackName& full_track_name,
                                 LocalTrack::Visitor* visitor) {
-  local_tracks_.try_emplace(full_track_name, full_track_name,
-                            next_track_alias_++, visitor);
+  local_tracks_.try_emplace(full_track_name, full_track_name, visitor);
 }
 
 // TODO: Create state that allows ANNOUNCE_OK/ERROR on spurious namespaces to
@@ -144,7 +142,7 @@
                                     uint64_t start_group, uint64_t start_object,
                                     RemoteTrack::Visitor* visitor,
                                     absl::string_view auth_info) {
-  MoqtSubscribeRequest message;
+  MoqtSubscribe message;
   message.track_namespace = track_namespace;
   message.track_name = name;
   message.start_group = MoqtSubscribeLocation(true, start_group);
@@ -171,7 +169,7 @@
     QUIC_DLOG(ERROR) << "Subscription end is before beginning";
     return false;
   }
-  MoqtSubscribeRequest message;
+  MoqtSubscribe message;
   message.track_namespace = track_namespace;
   message.track_name = name;
   message.start_group = MoqtSubscribeLocation(true, start_group);
@@ -189,7 +187,7 @@
                                     int64_t start_object,
                                     RemoteTrack::Visitor* visitor,
                                     absl::string_view auth_info) {
-  MoqtSubscribeRequest message;
+  MoqtSubscribe message;
   message.track_namespace = track_namespace;
   message.track_name = name;
   message.start_group = MoqtSubscribeLocation(false, start_group);
@@ -206,7 +204,7 @@
                                         absl::string_view name,
                                         RemoteTrack::Visitor* visitor,
                                         absl::string_view auth_info) {
-  MoqtSubscribeRequest message;
+  MoqtSubscribe message;
   message.track_namespace = track_namespace;
   message.track_name = name;
   // First object of current group.
@@ -220,22 +218,31 @@
   return Subscribe(message, visitor);
 }
 
-bool MoqtSession::Subscribe(const MoqtSubscribeRequest& message,
+bool MoqtSession::Subscribe(MoqtSubscribe& message,
                             RemoteTrack::Visitor* visitor) {
   // TODO(martinduke): support authorization info
-  bool success =
-      session_->GetStreamById(*control_stream_)
-          ->Write(framer_.SerializeSubscribeRequest(message).AsStringView());
-  if (!success) {
-    Error(MoqtError::kGenericError,
-          "Failed to write SUBSCRIBE_REQUEST message");
-    return false;
-  }
-  QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE_REQUEST message for "
-                  << message.track_namespace << ":" << message.track_name;
+  message.subscribe_id = next_subscribe_id_++;
   FullTrackName ftn(std::string(message.track_namespace),
                     std::string(message.track_name));
-  remote_tracks_.try_emplace(ftn, ftn, visitor);
+  auto it = remote_track_aliases_.find(ftn);
+  if (it != remote_track_aliases_.end()) {
+    message.track_alias = it->second;
+    if (message.track_alias >= next_remote_track_alias_) {
+      next_remote_track_alias_ = message.track_alias + 1;
+    }
+  } else {
+    message.track_alias = next_remote_track_alias_++;
+  }
+  bool success =
+      session_->GetStreamById(*control_stream_)
+          ->Write(framer_.SerializeSubscribe(message).AsStringView());
+  if (!success) {
+    Error(MoqtError::kGenericError, "Failed to write SUBSCRIBE message");
+    return false;
+  }
+  QUIC_DLOG(INFO) << ENDPOINT << "Sent SUBSCRIBE message for "
+                  << message.track_namespace << ":" << message.track_name;
+  active_subscribes_.try_emplace(message.subscribe_id, message, visitor);
   return true;
 }
 
@@ -252,55 +259,87 @@
   return new_stream->GetStreamId();
 }
 
-// increment object_sequence or group_sequence depending on |start_new_group|
-void MoqtSession::PublishObjectToStream(webtransport::StreamId stream_id,
-                                        FullTrackName full_track_name,
-                                        bool start_new_group,
-                                        absl::string_view payload) {
-  // TODO: check that the peer is subscribed to the next sequence.
-  webtransport::Stream* stream = session_->GetStreamById(stream_id);
-  if (stream == nullptr) {
-    QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT to nonexistent stream";
-    return;
+bool MoqtSession::PublishObject(FullTrackName& full_track_name,
+                                uint64_t group_id, uint64_t object_id,
+                                uint64_t object_send_order,
+                                MoqtForwardingPreference forwarding_preference,
+                                absl::string_view payload,
+                                std::optional<uint64_t> payload_length,
+                                bool end_of_stream) {
+  if (payload_length.has_value() && *payload_length < payload.length()) {
+    QUICHE_DLOG(ERROR) << ENDPOINT << "Payload too short";
+    return false;
   }
   auto track_it = local_tracks_.find(full_track_name);
   if (track_it == local_tracks_.end()) {
     QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT for nonexistent track";
-    return;
+    return false;
+  }
+  LocalTrack& track = track_it->second;
+  track.SentSequence(FullSequence(group_id, object_id));
+  std::vector<SubscribeWindow*> subscriptions =
+      track.ShouldSend({group_id, object_id});
+  if (subscriptions.empty()) {
+    return true;
   }
   MoqtObject object;
-  LocalTrack& track = track_it->second;
-  object.track_id = track.track_alias();
-  FullSequence& next_sequence = track.next_sequence_mutable();
-  object.group_sequence = next_sequence.group;
-  if (start_new_group) {
-    ++object.group_sequence;
-    object.object_sequence = 0;
-  } else {
-    object.object_sequence = next_sequence.object;
-  }
-  next_sequence.group = object.group_sequence;
-  next_sequence.object = object.object_sequence + 1;
-  if (!track.ShouldSend(object.group_sequence, object.object_sequence)) {
-    QUICHE_LOG(INFO) << ENDPOINT << "Not sending object "
+  QUICHE_DCHECK(track.track_alias().has_value());
+  object.track_alias = *track.track_alias();
+  object.group_id = group_id;
+  object.object_id = object_id;
+  object.object_send_order = object_send_order;
+  object.forwarding_preference = forwarding_preference;
+  object.payload_length = payload_length;
+  int failures = 0;
+  quiche::StreamWriteOptions write_options;
+  write_options.set_send_fin(end_of_stream);
+  for (auto subscription : subscriptions) {
+    // TODO: kPreferDatagram should bypass stream stuff. For now, send it on the
+    // stream.
+    bool new_stream = false;
+    std::optional<webtransport::StreamId> stream_id =
+        subscription->GetStreamForSequence({group_id, object_id},
+                                           forwarding_preference);
+    if (!stream_id.has_value()) {
+      new_stream = true;
+      stream_id = OpenUnidirectionalStream();
+      if (!stream_id.has_value()) {
+        QUICHE_DLOG(ERROR) << ENDPOINT
+                           << "Sending OBJECT to nonexistent stream";
+        ++failures;
+        continue;
+      }
+      if (!end_of_stream) {
+        subscription->AddStream(forwarding_preference, *stream_id, group_id,
+                                object_id);
+      }
+    }
+    webtransport::Stream* stream = session_->GetStreamById(*stream_id);
+    if (stream == nullptr) {
+      QUICHE_DLOG(ERROR) << ENDPOINT << "Sending OBJECT to nonexistent stream "
+                         << *stream_id;
+      ++failures;
+      continue;
+    }
+    object.subscribe_id = subscription->subscribe_id();
+    if (quiche::WriteIntoStream(
+            *stream,
+            framer_.SerializeObject(object, payload, new_stream).AsStringView(),
+            write_options) != absl::OkStatus()) {
+      QUICHE_DLOG(ERROR) << ENDPOINT << "Failed to write OBJECT message";
+      ++failures;
+      continue;
+    }
+    QUICHE_LOG(INFO) << ENDPOINT << "Sending object "
                      << full_track_name.track_namespace << ":"
                      << full_track_name.track_name << " with sequence "
-                     << object.group_sequence << ":" << object.object_sequence
-                     << " because peer is not subscribed";
-    return;
+                     << object.group_id << ":" << object.object_id
+                     << " on stream " << *stream_id;
+    if (end_of_stream && !new_stream) {
+      subscription->RemoveStream(forwarding_preference, group_id, object_id);
+    }
   }
-  object.object_send_order = 0;
-  object.payload_length = payload.size();
-  bool success =
-      stream->Write(framer_.SerializeObject(object, payload).AsStringView());
-  if (!success) {
-    QUICHE_DLOG(ERROR) << ENDPOINT << "Failed to write OBJECT message";
-    return;
-  }
-  QUICHE_LOG(INFO) << ENDPOINT << "Sending object "
-                   << full_track_name.track_namespace << ":"
-                   << full_track_name.track_name << " with sequence "
-                   << object.group_sequence << ":" << object.object_sequence;
+  return (failures == 0);
 }
 
 void MoqtSession::Stream::OnCanRead() {
@@ -338,15 +377,17 @@
                     "Received OBJECT message on control stream");
     return;
   }
-  QUICHE_DLOG(INFO) << ENDPOINT << "Received OBJECT message on stream "
-                    << stream_->GetStreamId() << " for track alias "
-                    << message.track_id << " with sequence "
-                    << message.group_sequence << ":" << message.object_sequence
-                    << " length " << payload.size() << " explicit length "
-                    << (message.payload_length.has_value()
-                            ? (int)*message.payload_length
-                            : -1)
-                    << (end_of_message ? "F" : "");
+  QUICHE_DLOG(INFO)
+      << ENDPOINT << "Received OBJECT message on stream "
+      << stream_->GetStreamId() << " for subscribe_id " << message.subscribe_id
+      << " for track alias " << message.track_alias << " with sequence "
+      << message.group_id << ":" << message.object_id << " send_order "
+      << message.object_send_order << " forwarding_preference "
+      << MoqtForwardingPreferenceToString(message.forwarding_preference)
+      << " length " << payload.size() << " explicit length "
+      << (message.payload_length.has_value() ? (int)*message.payload_length
+                                             : -1)
+      << (end_of_message ? "F" : "");
   if (!session_->parameters_.deliver_partial_objects) {
     if (!end_of_message) {  // Buffer partial object.
       absl::StrAppend(&partial_object_, payload);
@@ -357,33 +398,17 @@
       payload = absl::string_view(partial_object_);
     }
   }
-  auto it = session_->tracks_by_alias_.find(message.track_id);
-  if (it == session_->tracks_by_alias_.end()) {
-    // No SUBSCRIBE_OK received with this alias, buffer it.
-    auto it2 = session_->object_queue_.find(message.track_id);
-    std::vector<BufferedObject>* queue;
-    if (it2 == session_->object_queue_.end()) {
-      queue = &session_->object_queue_[message.track_id];
-    } else {
-      queue = &it2->second;
-    }
-    if (session_->num_buffered_objects_ >= kMaxBufferedObjects) {
-      session_->num_buffered_objects_++;
-      session_->Error(MoqtError::kGenericError, "Too many buffered objects");
-      return;
-    }
-    queue->push_back(BufferedObject(stream_->GetStreamId(), message, payload,
-                                    end_of_message));
-    QUIC_DLOG(INFO) << ENDPOINT << "Buffering OBJECT for track alias "
-                    << message.track_id;
+  auto it = session_->remote_tracks_.find(message.track_alias);
+  if (it == session_->remote_tracks_.end()) {
+    // No SUBSCRIBE_OK received with this alias, return.
     return;
   }
-  RemoteTrack* subscription = it->second;
-  if (subscription->visitor() != nullptr) {
-    subscription->visitor()->OnObjectFragment(
-        subscription->full_track_name(), stream_->GetStreamId(),
-        message.group_sequence, message.object_sequence,
-        message.object_send_order, payload, end_of_message);
+  RemoteTrack& subscription = it->second;
+  if (subscription.visitor() != nullptr) {
+    subscription.visitor()->OnObjectFragment(
+        subscription.full_track_name(), message.group_id, message.object_id,
+        message.object_send_order, message.forwarding_preference, payload,
+        end_of_message);
   }
   partial_object_.clear();
 }
@@ -456,14 +481,15 @@
   std::move(session_->session_established_callback_)();
 }
 
-void MoqtSession::Stream::SendSubscribeError(
-    const MoqtSubscribeRequest& message, SubscribeErrorCode error_code,
-    absl::string_view reason_phrase) {
+void MoqtSession::Stream::SendSubscribeError(const MoqtSubscribe& message,
+                                             SubscribeErrorCode error_code,
+                                             absl::string_view reason_phrase,
+                                             uint64_t track_alias) {
   MoqtSubscribeError subscribe_error;
-  subscribe_error.track_namespace = message.track_namespace;
-  subscribe_error.track_name = message.track_name;
+  subscribe_error.subscribe_id = message.subscribe_id;
   subscribe_error.error_code = error_code;
   subscribe_error.reason_phrase = reason_phrase;
+  subscribe_error.track_alias = track_alias;
   bool success =
       stream_->Write(session_->framer_.SerializeSubscribeError(subscribe_error)
                          .AsStringView());
@@ -473,13 +499,12 @@
   }
 }
 
-void MoqtSession::Stream::OnSubscribeRequestMessage(
-    const MoqtSubscribeRequest& message) {
+void MoqtSession::Stream::OnSubscribeMessage(const MoqtSubscribe& message) {
   std::string reason_phrase = "";
   if (!CheckIfIsControlStream()) {
     return;
   }
-  QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE_REQUEST for "
+  QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE for "
                   << message.track_namespace << ":" << message.track_name;
   auto it = session_->local_tracks_.find(FullTrackName(
       std::string(message.track_namespace), std::string(message.track_name)));
@@ -488,32 +513,50 @@
                     << message.track_namespace << ":" << message.track_name
                     << " does not exist";
     SendSubscribeError(message, SubscribeErrorCode::kGenericError,
-                       "Track does not exist");
+                       "Track does not exist", message.track_alias);
     return;
   }
   LocalTrack& track = it->second;
+  if ((track.track_alias().has_value() &&
+       message.track_alias != *track.track_alias()) ||
+      session_->used_track_aliases_.contains(message.track_alias)) {
+    // Propose a different track_alias.
+    SendSubscribeError(message, SubscribeErrorCode::kRetryTrackAlias,
+                       "Track alias already exists",
+                       session_->next_local_track_alias_++);
+    return;
+  } else {  // Use client-provided alias.
+    track.set_track_alias(message.track_alias);
+    if (message.track_alias >= session_->next_local_track_alias_) {
+      session_->next_local_track_alias_ = message.track_alias + 1;
+    }
+    session_->used_track_aliases_.insert(message.track_alias);
+  }
   std::optional<FullSequence> start = session_->LocationToAbsoluteNumber(
       track, message.start_group, message.start_object);
   QUICHE_DCHECK(start.has_value());  // Parser enforces this.
   std::optional<FullSequence> end = session_->LocationToAbsoluteNumber(
       track, message.end_group, message.end_object);
   if (start < track.next_sequence() && track.visitor() != nullptr) {
-    SubscribeWindow window = end.has_value()
-                                 ? SubscribeWindow(start->group, start->object,
-                                                   end->group, end->object)
-                                 : SubscribeWindow(start->group, start->object);
+    // TODO: Rework this. It's not good that the session notifies the
+    // application -- presumably triggering the send of a bunch of objects --
+    // and only then sends the Subscribe OK.
+    SubscribeWindow window =
+        end.has_value()
+            ? SubscribeWindow(message.subscribe_id, start->group, start->object,
+                              end->group, end->object)
+            : SubscribeWindow(message.subscribe_id, start->group,
+                              start->object);
     std::optional<absl::string_view> past_objects_available =
-        track.visitor()->OnSubscribeRequestForPast(window);
-    if (!past_objects_available.has_value()) {
+        track.visitor()->OnSubscribeForPast(window);
+    if (past_objects_available.has_value()) {
       SendSubscribeError(message, SubscribeErrorCode::kGenericError,
-                         "Object does not exist");
+                         "Object does not exist", message.track_alias);
       return;
     }
   }
   MoqtSubscribeOk subscribe_ok;
-  subscribe_ok.track_namespace = message.track_namespace;
-  subscribe_ok.track_name = message.track_name;
-  subscribe_ok.track_id = track.track_alias();
+  subscribe_ok.subscribe_id = message.subscribe_id;
   bool success = stream_->Write(
       session_->framer_.SerializeSubscribeOk(subscribe_ok).AsStringView());
   if (!success) {
@@ -524,62 +567,39 @@
   QUIC_DLOG(INFO) << ENDPOINT << "Created subscription for "
                   << message.track_namespace << ":" << message.track_name;
   if (!end.has_value()) {
-    track.AddWindow(SubscribeWindow(start->group, start->object));
+    track.AddWindow(
+        SubscribeWindow(message.subscribe_id, start->group, start->object));
     return;
   }
-  track.AddWindow(
-      SubscribeWindow(start->group, start->object, end->group, end->object));
+  track.AddWindow(SubscribeWindow(message.subscribe_id, start->group,
+                                  start->object, end->group, end->object));
 }
 
 void MoqtSession::Stream::OnSubscribeOkMessage(const MoqtSubscribeOk& message) {
   if (!CheckIfIsControlStream()) {
     return;
   }
-  if (session_->tracks_by_alias_.contains(message.track_id)) {
-    session_->Error(MoqtError::kDuplicateTrackAlias,
-                    "Received duplicate track_alias");
-    return;
-  }
-  auto it = session_->remote_tracks_.find(FullTrackName(
-      std::string(message.track_namespace), std::string(message.track_name)));
-  if (it == session_->remote_tracks_.end()) {
+  auto it = session_->active_subscribes_.find(message.subscribe_id);
+  if (it == session_->active_subscribes_.end()) {
     session_->Error(MoqtError::kProtocolViolation,
                     "Received SUBSCRIBE_OK for nonexistent subscribe");
     return;
   }
-  // Note that if there are multiple SUBSCRIBE_OK for the same track,
-  // RemoteTrack.track_alias() will be the last alias received, but
-  // tracks_by_alias_ will have an entry for every track_alias received.
-  // TODO: revise this data structure to make it easier to clean up
-  // RemoteTracks, unless draft changes make it irrelevant.
+  MoqtSubscribe& subscribe = it->second.message;
   QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for "
-                  << message.track_namespace << ":" << message.track_name
-                  << ", track_alias = " << message.track_id;
-  RemoteTrack& track = it->second;
-  track.set_track_alias(message.track_id);
-  session_->tracks_by_alias_[message.track_id] = &track;
+                  << "subscribe_id = " << message.subscribe_id
+                  << subscribe.track_namespace << ":" << subscribe.track_name;
+  // Copy the Remote Track from session_->active_subscribes_ to
+  // session_->remote_tracks_.
+  FullTrackName ftn(subscribe.track_namespace, subscribe.track_name);
+  RemoteTrack::Visitor* visitor = it->second.visitor;
+  session_->remote_tracks_.try_emplace(subscribe.track_alias, ftn,
+                                       subscribe.track_alias, visitor);
   // TODO: handle expires.
-  if (track.visitor() != nullptr) {
-    track.visitor()->OnReply(track.full_track_name(), std::nullopt);
+  if (visitor != nullptr) {
+    visitor->OnReply(ftn, std::nullopt);
   }
-  // Clear the buffer for this track alias.
-  auto it2 = session_->object_queue_.find(message.track_id);
-  if (it2 == session_->object_queue_.end() || track.visitor() == nullptr) {
-    // Nothing is buffered, or the app hasn't registered a visitor anyway.
-    return;
-  }
-  QUIC_DLOG(INFO) << ENDPOINT << "Processing buffered OBJECTs for track_alias "
-                  << message.track_id;
-  std::vector<BufferedObject>& queue = it2->second;
-  for (BufferedObject& to_deliver : queue) {
-    track.visitor()->OnObjectFragment(
-        track.full_track_name(), to_deliver.stream_id,
-        to_deliver.message.group_sequence, to_deliver.message.object_sequence,
-        to_deliver.message.object_send_order, to_deliver.payload,
-        to_deliver.eom);
-    session_->num_buffered_objects_--;
-  }
-  session_->object_queue_.erase(it2);
+  session_->active_subscribes_.erase(it);
 }
 
 void MoqtSession::Stream::OnSubscribeErrorMessage(
@@ -587,21 +607,28 @@
   if (!CheckIfIsControlStream()) {
     return;
   }
-  auto it = session_->remote_tracks_.find(FullTrackName(
-      std::string(message.track_namespace), std::string(message.track_name)));
-  if (it == session_->remote_tracks_.end()) {
+  auto it = session_->active_subscribes_.find(message.subscribe_id);
+  if (it == session_->active_subscribes_.end()) {
     session_->Error(MoqtError::kProtocolViolation,
                     "Received SUBSCRIBE_ERROR for nonexistent subscribe");
     return;
   }
+  MoqtSubscribe& subscribe = it->second.message;
   QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_ERROR for "
-                  << message.track_namespace << ":" << message.track_name
-                  << ", error = " << static_cast<int>(message.error_code)
+                  << "subscribe_id = " << message.subscribe_id << " ("
+                  << subscribe.track_namespace << ":" << subscribe.track_name
+                  << ")" << ", error = " << static_cast<int>(message.error_code)
                   << " (" << message.reason_phrase << ")";
-  if (it->second.visitor() != nullptr) {
-    it->second.visitor()->OnReply(it->second.full_track_name(),
-                                  message.reason_phrase);
+  RemoteTrack::Visitor* visitor = it->second.visitor;
+  FullTrackName ftn(subscribe.track_namespace, subscribe.track_name);
+  if (message.error_code == SubscribeErrorCode::kRetryTrackAlias) {
+    // Automatically resubscribe with new alias.
+    session_->remote_track_aliases_[ftn] = message.track_alias;
+    session_->Subscribe(subscribe, visitor);
+  } else if (visitor != nullptr) {
+    visitor->OnReply(ftn, message.reason_phrase);
   }
+  session_->active_subscribes_.erase(it);
 }
 
 void MoqtSession::Stream::OnAnnounceMessage(const MoqtAnnounce& message) {
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 01f6fcf..9b77a11 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -9,10 +9,9 @@
 #include <optional>
 #include <string>
 #include <utility>
-#include <vector>
 
 #include "absl/container/flat_hash_map.h"
-#include "absl/container/node_hash_map.h"
+#include "absl/container/flat_hash_set.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/moqt/moqt_framer.h"
@@ -79,8 +78,8 @@
 
   // Add to the list of tracks that can be subscribed to. Call this before
   // Announce() so that subscriptions can be processed correctly. If |visitor|
-  // is nullptr, then incoming SUBSCRIBE_REQUEST for objects in the path will
-  // receive SUBSCRIBE_OK, but never actually get the objects.
+  // is nullptr, then incoming SUBSCRIBE for objects in the path will receive
+  // SUBSCRIBE_OK, but never actually get the objects.
   void AddLocalTrack(const FullTrackName& full_track_name,
                      LocalTrack::Visitor* visitor);
   // Send an ANNOUNCE message for |track_namespace|, and call
@@ -90,9 +89,9 @@
                 MoqtAnnounceCallback announce_callback);
   bool HasSubscribers(const FullTrackName& full_track_name) const;
 
-  // Returns true if SUBSCRIBE_REQUEST was sent. If there is already a
-  // subscription to the track, the message will still be sent. However, the
-  // visitor will be ignored.
+  // Returns true if SUBSCRIBE was sent. If there is already a subscription to
+  // the track, the message will still be sent. However, the visitor will be
+  // ignored.
   bool SubscribeAbsolute(absl::string_view track_namespace,
                          absl::string_view name, uint64_t start_group,
                          uint64_t start_object, RemoteTrack::Visitor* visitor,
@@ -111,15 +110,20 @@
                              RemoteTrack::Visitor* visitor,
                              absl::string_view auth_info = "");
 
-  // Returns the stream ID if successful, nullopt if not.
-  // TODO: Add a callback if stream creation is delayed.
-  std::optional<webtransport::StreamId> OpenUnidirectionalStream();
-  // Will automatically assign a new sequence number. If |start_new_group|,
-  // increment group_sequence and set object_sequence to 0. Otherwise,
-  // increment object_sequence.
-  void PublishObjectToStream(webtransport::StreamId stream_id,
-                             FullTrackName full_track_name,
-                             bool start_new_group, absl::string_view payload);
+  // Returns false if it could not open a stream when necessary, or if the
+  // track does not exist (there was no call to AddLocalTrack). Will still
+  // return false is some streams succeed.
+  // Also returns false if |payload_length| exists but is shorter than
+  // |payload|.
+  // |payload.length() >= |payload_length|, because the application can deliver
+  // partial objects.
+  bool PublishObject(FullTrackName& full_track_name, uint64_t group_id,
+                     uint64_t object_id, uint64_t object_send_order,
+                     MoqtForwardingPreference forwarding_preference,
+                     absl::string_view payload,
+                     std::optional<uint64_t> payload_length,
+                     bool end_of_stream);
+  // TODO: Add an API to FIN the stream for a particular track/group/object.
 
  private:
   friend class test::MoqtSessionPeer;
@@ -145,12 +149,12 @@
     void OnWriteSideInDataRecvdState() override {}
 
     // MoqtParserVisitor implementation.
+    // TODO: Handle a stream FIN.
     void OnObjectMessage(const MoqtObject& message, absl::string_view payload,
                          bool end_of_message) override;
     void OnClientSetupMessage(const MoqtClientSetup& message) override;
     void OnServerSetupMessage(const MoqtServerSetup& message) override;
-    void OnSubscribeRequestMessage(
-        const MoqtSubscribeRequest& message) override;
+    void OnSubscribeMessage(const MoqtSubscribe& message) override;
     void OnSubscribeOkMessage(const MoqtSubscribeOk& message) override;
     void OnSubscribeErrorMessage(const MoqtSubscribeError& message) override;
     void OnUnsubscribeMessage(const MoqtUnsubscribe& /*message*/) override {}
@@ -172,9 +176,10 @@
 
    private:
     friend class test::MoqtSessionPeer;
-    void SendSubscribeError(const MoqtSubscribeRequest& message,
+    void SendSubscribeError(const MoqtSubscribe& message,
                             SubscribeErrorCode error_code,
-                            absl::string_view reason_phrase);
+                            absl::string_view reason_phrase,
+                            uint64_t track_alias);
     bool CheckIfIsControlStream();
 
     MoqtSession* session_;
@@ -186,30 +191,16 @@
     std::string partial_object_;
   };
 
-  // If parameters_.deliver_partial_objects is false, then the session buffers
-  // these objects until they arrive in their entirety. This stores the
-  // relevant information to later deliver this object via OnObject().
-  struct BufferedObject {
-    uint32_t stream_id;
-    MoqtObject message;
-    std::string payload;
-    bool eom;
-    BufferedObject(uint32_t id, const MoqtObject& header,
-                   absl::string_view body, bool end_of_message)
-        : stream_id(id),
-          message(header),
-          payload(std::string(body)),
-          eom(end_of_message) {}
-  };
-
-  // Returns false if the SUBSCRIBE_REQUEST isn't sent.
-  bool Subscribe(const MoqtSubscribeRequest& message,
-                 RemoteTrack::Visitor* visitor);
+  // Returns false if the SUBSCRIBE isn't sent.
+  bool Subscribe(MoqtSubscribe& message, RemoteTrack::Visitor* visitor);
   // converts two MoqtLocations into absolute sequences.
   std::optional<FullSequence> LocationToAbsoluteNumber(
       const LocalTrack& track,
       const std::optional<MoqtSubscribeLocation>& group,
       const std::optional<MoqtSubscribeLocation>& object);
+  // Returns the stream ID if successful, nullopt if not.
+  // TODO: Add a callback if stream creation is delayed.
+  std::optional<webtransport::StreamId> OpenUnidirectionalStream();
 
   webtransport::Session* session_;
   MoqtSessionParameters parameters_;
@@ -221,18 +212,27 @@
   std::optional<webtransport::StreamId> control_stream_;
   std::string error_;
 
-  // All the tracks the session is subscribed to. Multiple subscribes to the
-  // same track are recorded in a single subscription.
-  absl::node_hash_map<FullTrackName, RemoteTrack> remote_tracks_;
+  // All the tracks the session is subscribed to, indexed by track_alias.
+  // Multiple subscribes to the same track are recorded in a single
+  // subscription.
+  absl::flat_hash_map<uint64_t, RemoteTrack> remote_tracks_;
+  // Look up aliases for remote tracks by name
+  absl::flat_hash_map<FullTrackName, uint64_t> remote_track_aliases_;
+  uint64_t next_remote_track_alias_ = 0;
+
   // All the tracks the peer can subscribe to.
   absl::flat_hash_map<FullTrackName, LocalTrack> local_tracks_;
+  // This is only used to check for track_alias collisions.
+  absl::flat_hash_set<uint64_t> used_track_aliases_;
+  uint64_t next_local_track_alias_ = 0;
 
-  // Remote tracks indexed by TrackId. Must be active.
-  absl::flat_hash_map<uint64_t, RemoteTrack*> tracks_by_alias_;
-  uint64_t next_track_alias_ = 0;
-  // Buffer for OBJECTs that arrive with an unknown track alias.
-  absl::flat_hash_map<uint64_t, std::vector<BufferedObject>> object_queue_;
-  int num_buffered_objects_ = 0;
+  // Indexed by subscribe_id.
+  struct ActiveSubscribe {
+    MoqtSubscribe message;
+    RemoteTrack::Visitor* visitor;
+  };
+  absl::flat_hash_map<uint64_t, ActiveSubscribe> active_subscribes_;
+  uint64_t next_subscribe_id_ = 0;
 
   // Indexed by track namespace.
   absl::flat_hash_map<std::string, MoqtAnnounceCallback>
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index eb231b2..69021e5 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -5,6 +5,7 @@
 #include "quiche/quic/moqt/moqt_session.h"
 
 #include <cstdint>
+#include <cstring>
 #include <memory>
 #include <optional>
 #include <string>
@@ -41,7 +42,7 @@
 constexpr webtransport::StreamId kOutgoingUniStreamId = 14;
 
 constexpr MoqtSessionParameters default_parameters = {
-    /*version=*/MoqtVersion::kDraft01,
+    /*version=*/MoqtVersion::kDraft02,
     /*perspective=*/quic::Perspective::IS_CLIENT,
     /*using_webtrans=*/true,
     /*path=*/std::string(),
@@ -90,25 +91,30 @@
     return (MoqtSession::Stream*)visitor;
   }
 
-  static void CreateRemoteTrack(MoqtSession* session, FullTrackName& name,
-                                RemoteTrack::Visitor* visitor) {
-    session->remote_tracks_.try_emplace(name, name, visitor);
+  static void CreateRemoteTrack(MoqtSession* session, const FullTrackName& name,
+                                RemoteTrack::Visitor* visitor,
+                                uint64_t track_alias) {
+    session->remote_tracks_.try_emplace(track_alias, name, track_alias,
+                                        visitor);
+    session->remote_track_aliases_.try_emplace(name, track_alias);
   }
 
-  static void CreateRemoteTrackWithAlias(MoqtSession* session,
-                                         FullTrackName& name,
-                                         RemoteTrack::Visitor* visitor,
-                                         uint64_t track_alias) {
-    auto it = session->remote_tracks_.try_emplace(name, name, visitor);
-    RemoteTrack& track = it.first->second;
+  static void AddSubscription(MoqtSession* session, FullTrackName& name,
+                              uint64_t subscribe_id, uint64_t track_alias,
+                              uint64_t start_group, uint64_t start_object) {
+    auto it = session->local_tracks_.find(name);
+    ASSERT_NE(it, session->local_tracks_.end());
+    LocalTrack& track = it->second;
     track.set_track_alias(track_alias);
-    session->tracks_by_alias_.emplace(std::make_pair(track_alias, &track));
+    track.AddWindow(SubscribeWindow(subscribe_id, start_group, start_object));
+    session->used_track_aliases_.emplace(track_alias);
   }
 
-  static LocalTrack& GetLocalTrack(MoqtSession* session, FullTrackName& name) {
+  static FullSequence next_sequence(MoqtSession* session, FullTrackName& name) {
     auto it = session->local_tracks_.find(name);
     EXPECT_NE(it, session->local_tracks_.end());
-    return it->second;
+    LocalTrack& track = it->second;
+    return track.next_sequence();
   }
 };
 
@@ -117,6 +123,9 @@
   MoqtSessionTest()
       : session_(&mock_session_, default_parameters,
                  session_callbacks_.AsSessionCallbacks()) {}
+  ~MoqtSessionTest() {
+    EXPECT_CALL(session_callbacks_.session_deleted_callback, Call());
+  }
 
   MockSessionCallbacks session_callbacks_;
   StrictMock<webtransport::test::MockSession> mock_session_;
@@ -157,7 +166,7 @@
           &session_, visitor.get());
   // Handle the server setup
   MoqtServerSetup setup = {
-      MoqtVersion::kDraft01,
+      MoqtVersion::kDraft02,
       MoqtRole::kBoth,
   };
   EXPECT_CALL(session_callbacks_.session_established_callback, Call()).Times(1);
@@ -166,7 +175,7 @@
 
 TEST_F(MoqtSessionTest, OnClientSetup) {
   MoqtSessionParameters server_parameters = {
-      /*version=*/MoqtVersion::kDraft01,
+      /*version=*/MoqtVersion::kDraft02,
       /*perspective=*/quic::Perspective::IS_SERVER,
       /*using_webtrans=*/true,
       /*path=*/"",
@@ -178,7 +187,7 @@
   std::unique_ptr<MoqtParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&server_session, &mock_stream);
   MoqtClientSetup setup = {
-      /*supported_versions*/ {MoqtVersion::kDraft01},
+      /*supported_versions=*/{MoqtVersion::kDraft02},
       /*role=*/MoqtRole::kBoth,
       /*path=*/std::nullopt,
   };
@@ -249,14 +258,18 @@
 }
 
 TEST_F(MoqtSessionTest, AddLocalTrack) {
-  MoqtSubscribeRequest request = {
+  MoqtSubscribe request = {
+      /*subscribe_id=*/1,
+      /*track_alias=*/2,
       /*track_namespace=*/"foo",
       /*track_name=*/"bar",
       /*start_group=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
       /*start_object=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
       /*end_group=*/std::nullopt,
       /*end_object=*/std::nullopt,
+#ifdef MOQT_AUTH_INFO
       /*authorization_info=*/std::nullopt,
+#endif
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> stream_input =
@@ -271,7 +284,7 @@
                   MoqtMessageType::kSubscribeError);
         return absl::OkStatus();
       });
-  stream_input->OnSubscribeRequestMessage(request);
+  stream_input->OnSubscribeMessage(request);
   EXPECT_TRUE(correct_message);
 
   // Add the track. Now Subscribe should succeed.
@@ -285,7 +298,7 @@
         EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kSubscribeOk);
         return absl::OkStatus();
       });
-  stream_input->OnSubscribeRequestMessage(request);
+  stream_input->OnSubscribeMessage(request);
   EXPECT_TRUE(correct_message);
 }
 
@@ -365,14 +378,18 @@
   EXPECT_FALSE(session_.HasSubscribers(ftn));
 
   // Peer subscribes.
-  MoqtSubscribeRequest request = {
+  MoqtSubscribe request = {
+      /*subscribe_id=*/1,
+      /*track_alias=*/2,
       /*track_namespace=*/"foo",
       /*track_name=*/"bar",
       /*start_group=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
       /*start_object=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
       /*end_group=*/std::nullopt,
       /*end_object=*/std::nullopt,
+#ifdef MOQT_AUTH_INFO
       /*authorization_info=*/std::nullopt,
+#endif
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> stream_input =
@@ -385,11 +402,50 @@
         EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kSubscribeOk);
         return absl::OkStatus();
       });
-  stream_input->OnSubscribeRequestMessage(request);
+  stream_input->OnSubscribeMessage(request);
   EXPECT_TRUE(correct_message);
   EXPECT_TRUE(session_.HasSubscribers(ftn));
 }
 
+TEST_F(MoqtSessionTest, SubscribeForPast) {
+  MockLocalTrackVisitor local_track_visitor;
+  FullTrackName ftn("foo", "bar");
+  session_.AddLocalTrack(ftn, &local_track_visitor);
+
+  // Send Sequence (2, 0) so that next_sequence is set correctly.
+  session_.PublishObject(ftn, 2, 0, 0, MoqtForwardingPreference::kObject, "foo",
+                         std::nullopt, true);
+  // Peer subscribes to (0, 0)
+  MoqtSubscribe request = {
+      /*subscribe_id=*/1,
+      /*track_alias=*/2,
+      /*track_namespace=*/"foo",
+      /*track_name=*/"bar",
+      /*start_group=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
+      /*start_object=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
+      /*end_group=*/std::nullopt,
+      /*end_object=*/std::nullopt,
+#ifdef MOQT_AUTH_INFO
+      /*authorization_info=*/std::nullopt,
+#endif
+  };
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+  bool correct_message = true;
+  EXPECT_CALL(local_track_visitor, OnSubscribeForPast(_))
+      .WillOnce(Return(std::nullopt));
+  EXPECT_CALL(mock_stream, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        correct_message = true;
+        EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kSubscribeOk);
+        return absl::OkStatus();
+      });
+  stream_input->OnSubscribeMessage(request);
+  EXPECT_TRUE(correct_message);
+}
+
 TEST_F(MoqtSessionTest, SubscribeWithOk) {
   StrictMock<webtransport::test::MockStream> mock_stream;
   std::unique_ptr<MoqtParserVisitor> stream_input =
@@ -401,16 +457,13 @@
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         correct_message = true;
-        EXPECT_EQ(*ExtractMessageType(data[0]),
-                  MoqtMessageType::kSubscribeRequest);
+        EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kSubscribe);
         return absl::OkStatus();
       });
   session_.SubscribeCurrentGroup("foo", "bar", &remote_track_visitor, "");
 
   MoqtSubscribeOk ok = {
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
-      /*track_id=*/0,
+      /*subscribe_id=*/0,
       /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
   };
   correct_message = false;
@@ -436,17 +489,16 @@
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         correct_message = true;
-        EXPECT_EQ(*ExtractMessageType(data[0]),
-                  MoqtMessageType::kSubscribeRequest);
+        EXPECT_EQ(*ExtractMessageType(data[0]), MoqtMessageType::kSubscribe);
         return absl::OkStatus();
       });
   session_.SubscribeCurrentGroup("foo", "bar", &remote_track_visitor, "");
 
   MoqtSubscribeError error = {
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
+      /*subscribe_id=*/0,
       /*error_code=*/SubscribeErrorCode::kInvalidRange,
       /*reason_phrase=*/"deadbeef",
+      /*track_alias=*/2,
   };
   correct_message = false;
   EXPECT_CALL(remote_track_visitor, OnReply(_, _))
@@ -483,12 +535,14 @@
   MockRemoteTrackVisitor visitor_;
   FullTrackName ftn("foo", "bar");
   std::string payload = "deadbeef";
-  MoqtSessionPeer::CreateRemoteTrackWithAlias(&session_, ftn, &visitor_, 0);
+  MoqtSessionPeer::CreateRemoteTrack(&session_, ftn, &visitor_, 2);
   MoqtObject object = {
-      /*track_id=*/0,
+      /*subscribe_id=*/1,
+      /*track_alias=*/2,
       /*group_sequence=*/0,
       /*object_sequence=*/0,
       /*object_send_order=*/0,
+      /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
       /*payload_length=*/8,
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
@@ -505,12 +559,14 @@
   MockRemoteTrackVisitor visitor_;
   FullTrackName ftn("foo", "bar");
   std::string payload = "deadbeef";
-  MoqtSessionPeer::CreateRemoteTrackWithAlias(&session_, ftn, &visitor_, 0);
+  MoqtSessionPeer::CreateRemoteTrack(&session_, ftn, &visitor_, 2);
   MoqtObject object = {
-      /*track_id=*/0,
+      /*subscribe_id=*/1,
+      /*track_alias=*/2,
       /*group_sequence=*/0,
       /*object_sequence=*/0,
       /*object_send_order=*/0,
+      /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
       /*payload_length=*/16,
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
@@ -526,7 +582,7 @@
 
 TEST_F(MoqtSessionTest, IncomingPartialObjectNoBuffer) {
   MoqtSessionParameters parameters = {
-      /*version=*/MoqtVersion::kDraft01,
+      /*version=*/MoqtVersion::kDraft02,
       /*perspective=*/quic::Perspective::IS_CLIENT,
       /*using_webtrans=*/true,
       /*path=*/"",
@@ -537,12 +593,14 @@
   MockRemoteTrackVisitor visitor_;
   FullTrackName ftn("foo", "bar");
   std::string payload = "deadbeef";
-  MoqtSessionPeer::CreateRemoteTrackWithAlias(&session, ftn, &visitor_, 0);
+  MoqtSessionPeer::CreateRemoteTrack(&session, ftn, &visitor_, 2);
   MoqtObject object = {
-      /*track_id=*/0,
+      /*subscribe_id=*/1,
+      /*track_alias=*/2,
       /*group_sequence=*/0,
       /*object_sequence=*/0,
       /*object_send_order=*/0,
+      /*forwarding_preference=*/MoqtForwardingPreference::kGroup,
       /*payload_length=*/16,
   };
   StrictMock<webtransport::test::MockStream> mock_stream;
@@ -556,45 +614,20 @@
   object_stream->OnObjectMessage(object, payload, true);  // complete the object
 }
 
-TEST_F(MoqtSessionTest, IncomingObjectUnknownTrackId) {
-  MockRemoteTrackVisitor visitor_;
-  FullTrackName ftn("foo", "bar");
-  std::string payload = "deadbeef";
-  MoqtSessionPeer::CreateRemoteTrack(&session_, ftn, &visitor_);
-  MoqtObject object = {
-      /*track_id=*/0,
-      /*group_sequence=*/0,
-      /*object_sequence=*/0,
-      /*object_send_order=*/0,
-      /*payload_length=*/8,
-  };
-  StrictMock<webtransport::test::MockStream> mock_stream;
-  std::unique_ptr<MoqtParserVisitor> object_stream =
-      MoqtSessionPeer::CreateUniStream(&session_, &mock_stream);
-
-  EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _)).Times(0);
-  EXPECT_CALL(mock_stream, GetStreamId())
-      .WillRepeatedly(Return(kIncomingUniStreamId));
-  object_stream->OnObjectMessage(object, payload, true);
-  // Packet should be buffered.
-
-  // SUBSCRIBE_OK arrives
-  MoqtSubscribeOk ok = {
-      /*track_namespace=*/ftn.track_namespace,
-      /*track_name=*/ftn.track_name,
-      /*track_id=*/0,
-      /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
-  };
-  StrictMock<webtransport::test::MockStream> mock_control_stream;
-  std::unique_ptr<MoqtParserVisitor> control_stream =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_control_stream);
-  EXPECT_CALL(visitor_, OnReply(_, _)).Times(1);
-  EXPECT_CALL(visitor_, OnObjectFragment(_, _, _, _, _, _, _)).Times(1);
-  control_stream->OnSubscribeOkMessage(ok);
-}
-
 TEST_F(MoqtSessionTest, CreateUniStreamAndSend) {
   StrictMock<webtransport::test::MockStream> mock_stream;
+  FullTrackName ftn("foo", "bar");
+  MockLocalTrackVisitor track_visitor;
+  session_.AddLocalTrack(ftn, &track_visitor);
+  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+
+  // No subscription; this is a no-op except to update next_sequence.
+  EXPECT_CALL(mock_stream, Writev(_, _)).Times(0);
+  session_.PublishObject(ftn, 4, 1, 0, MoqtForwardingPreference::kObject,
+                         "deadbeef", std::nullopt, true);
+  EXPECT_EQ(MoqtSessionPeer::next_sequence(&session_, ftn), FullSequence(4, 2));
+
+  // Publish in window.
   EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
       .WillOnce(Return(true));
   EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
@@ -602,79 +635,96 @@
   EXPECT_CALL(mock_stream, SetVisitor(_)).Times(1);
   EXPECT_CALL(mock_stream, GetStreamId())
       .WillRepeatedly(Return(kOutgoingUniStreamId));
-  std::optional<webtransport::StreamId> stream =
-      session_.OpenUnidirectionalStream();
-  EXPECT_TRUE(stream.has_value());
-  EXPECT_EQ(stream.value(), kOutgoingUniStreamId);
-
   // Send on the stream
   EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
       .WillOnce(Return(&mock_stream));
+  bool correct_message = false;
+  // Verify first six message fields are sent correctly
+  uint8_t kExpectedMessage[] = {0x00, 0x00, 0x02, 0x05, 0x00, 0x00};
+  EXPECT_CALL(mock_stream, Writev(_, _))
+      .WillOnce([&](absl::Span<const absl::string_view> data,
+                    const quiche::StreamWriteOptions& options) {
+        correct_message = (0 == memcmp(data.data()->data(), kExpectedMessage,
+                                       sizeof(kExpectedMessage)));
+        return absl::OkStatus();
+      });
+  session_.PublishObject(ftn, 5, 0, 0, MoqtForwardingPreference::kObject,
+                         "deadbeef", std::nullopt, true);
+  EXPECT_TRUE(correct_message);
+}
+
+// TODO: Test operation with multiple streams.
+
+// Error cases
+
+TEST_F(MoqtSessionTest, CannotOpenUniStream) {
+  StrictMock<webtransport::test::MockStream> mock_stream;
   FullTrackName ftn("foo", "bar");
   MockLocalTrackVisitor track_visitor;
   session_.AddLocalTrack(ftn, &track_visitor);
-  LocalTrack& track = MoqtSessionPeer::GetLocalTrack(&session_, ftn);
-  FullSequence& next_seq = track.next_sequence_mutable();
-  next_seq.group = 4;
-  next_seq.object = 1;
-  track.AddWindow(SubscribeWindow(5, 0));
-  // No subscription; this is a no-op except for incrementing the sequence
-  // number.
-  EXPECT_CALL(mock_stream, Writev(_, _)).Times(0);
-  session_.PublishObjectToStream(kOutgoingUniStreamId,
-                                 FullTrackName("foo", "bar"),
-                                 /*start_new_group=*/false, "deadbeef");
-  EXPECT_EQ(next_seq, FullSequence(4, 2));
-  bool correct_message = false;
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
+  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+  ;
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillOnce(Return(false));
+  EXPECT_FALSE(session_.PublishObject(ftn, 5, 0, 0,
+                                      MoqtForwardingPreference::kObject,
+                                      "deadbeef", std::nullopt, true));
+}
+
+TEST_F(MoqtSessionTest, GetStreamByIdFails) {
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  FullTrackName ftn("foo", "bar");
+  MockLocalTrackVisitor track_visitor;
+  session_.AddLocalTrack(ftn, &track_visitor);
+  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillOnce(Return(true));
+  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
       .WillOnce(Return(&mock_stream));
+  EXPECT_CALL(mock_stream, SetVisitor(_)).Times(1);
+  EXPECT_CALL(mock_stream, GetStreamId())
+      .WillRepeatedly(Return(kOutgoingUniStreamId));
+  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
+      .WillOnce(Return(nullptr));
+  EXPECT_FALSE(session_.PublishObject(ftn, 5, 0, 0,
+                                      MoqtForwardingPreference::kObject,
+                                      "deadbeef", std::nullopt, true));
+}
+
+TEST_F(MoqtSessionTest, SubscribeProposesBadTrackAlias) {
+  MockLocalTrackVisitor local_track_visitor;
+  FullTrackName ftn("foo", "bar");
+  session_.AddLocalTrack(ftn, &local_track_visitor);
+  MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0);
+
+  // Peer subscribes.
+  MoqtSubscribe request = {
+      /*subscribe_id=*/1,
+      /*track_alias=*/3,  // Doesn't match 2.
+      /*track_namespace=*/"foo",
+      /*track_name=*/"bar",
+      /*start_group=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
+      /*start_object=*/MoqtSubscribeLocation(true, static_cast<uint64_t>(0)),
+      /*end_group=*/std::nullopt,
+      /*end_object=*/std::nullopt,
+#ifdef MOQT_AUTH_INFO
+      /*authorization_info=*/std::nullopt,
+#endif
+  };
+  StrictMock<webtransport::test::MockStream> mock_stream;
+  std::unique_ptr<MoqtParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+  bool correct_message = true;
   EXPECT_CALL(mock_stream, Writev(_, _))
       .WillOnce([&](absl::Span<const absl::string_view> data,
                     const quiche::StreamWriteOptions& options) {
         correct_message = true;
         EXPECT_EQ(*ExtractMessageType(data[0]),
-                  MoqtMessageType::kObjectWithPayloadLength);
+                  MoqtMessageType::kSubscribeError);
         return absl::OkStatus();
       });
-  session_.PublishObjectToStream(kOutgoingUniStreamId,
-                                 FullTrackName("foo", "bar"),
-                                 /*start_new_group=*/true, "deadbeef");
+  stream_input->OnSubscribeMessage(request);
   EXPECT_TRUE(correct_message);
-  EXPECT_EQ(next_seq, FullSequence(5, 1));
-}
-
-// Error cases
-
-TEST_F(MoqtSessionTest, CannotOpenUniStream) {
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(false));
-  std::optional<webtransport::StreamId> stream =
-      session_.OpenUnidirectionalStream();
-  EXPECT_FALSE(stream.has_value());
-
-  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
-      .WillOnce(Return(true));
-  EXPECT_CALL(mock_session_, OpenOutgoingUnidirectionalStream())
-      .WillOnce(Return(nullptr));
-  stream = session_.OpenUnidirectionalStream();
-  EXPECT_FALSE(stream.has_value());
-}
-
-TEST_F(MoqtSessionTest, CannotPublishToStream) {
-  EXPECT_CALL(mock_session_, GetStreamById(kOutgoingUniStreamId))
-      .WillOnce(Return(nullptr));
-  FullTrackName ftn("foo", "bar");
-  MockLocalTrackVisitor track_visitor;
-  session_.AddLocalTrack(ftn, &track_visitor);
-  LocalTrack& track = MoqtSessionPeer::GetLocalTrack(&session_, ftn);
-  FullSequence& next_seq = track.next_sequence_mutable();
-  next_seq.group = 4;
-  next_seq.object = 1;
-  session_.PublishObjectToStream(kOutgoingUniStreamId, ftn,
-                                 /*start_new_group=*/false, "deadbeef");
-  // Object not sent; no change in sequence number.
-  EXPECT_EQ(next_seq.group, 4);
-  EXPECT_EQ(next_seq.object, 1);
 }
 
 TEST_F(MoqtSessionTest, OneBidirectionalStreamClient) {
@@ -718,7 +768,7 @@
 
 TEST_F(MoqtSessionTest, OneBidirectionalStreamServer) {
   MoqtSessionParameters server_parameters = {
-      /*version=*/MoqtVersion::kDraft01,
+      /*version=*/MoqtVersion::kDraft02,
       /*perspective=*/quic::Perspective::IS_SERVER,
       /*using_webtrans=*/true,
       /*path=*/"",
@@ -730,7 +780,7 @@
   std::unique_ptr<MoqtParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&server_session, &mock_stream);
   MoqtClientSetup setup = {
-      /*supported_versions*/ {MoqtVersion::kDraft01},
+      /*supported_versions*/ {MoqtVersion::kDraft02},
       /*role=*/MoqtRole::kBoth,
       /*path=*/std::nullopt,
   };
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.cc b/quiche/quic/moqt/moqt_subscribe_windows.cc
new file mode 100644
index 0000000..335d2d8
--- /dev/null
+++ b/quiche/quic/moqt/moqt_subscribe_windows.cc
@@ -0,0 +1,107 @@
+// Copyright 2024 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "quiche/quic/moqt/moqt_subscribe_windows.h"
+
+#include <cstdint>
+#include <optional>
+#include <vector>
+
+#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/platform/api/quic_bug_tracker.h"
+#include "quiche/web_transport/web_transport.h"
+
+namespace moqt {
+
+bool SubscribeWindow::InWindow(const FullSequence& seq) const {
+  if (seq < start_) {
+    return false;
+  }
+  return (!end_.has_value() || seq <= *end_);
+}
+
+std::optional<webtransport::StreamId> SubscribeWindow::GetStreamForSequence(
+    FullSequence sequence,
+    MoqtForwardingPreference forwarding_preference) const {
+  if (forwarding_preference == MoqtForwardingPreference::kTrack) {
+    return track_stream_;
+  }
+  auto group_it = group_streams_.find(sequence.group);
+  if (group_it == group_streams_.end()) {
+    return std::nullopt;
+  }
+  if (forwarding_preference == MoqtForwardingPreference::kGroup) {
+    return group_it->second.group_stream;
+  }
+  auto object_it = group_it->second.object_streams.find(sequence.object);
+  if (object_it == group_it->second.object_streams.end()) {
+    return std::nullopt;
+  }
+  return object_it->second;
+}
+
+void SubscribeWindow::AddStream(MoqtForwardingPreference forwarding_preference,
+                                uint64_t group_id, uint64_t object_id,
+                                webtransport::StreamId stream_id) {
+  if (forwarding_preference == MoqtForwardingPreference::kTrack) {
+    QUIC_BUG_IF(quic_bug_moqt_draft_02_01, track_stream_.has_value())
+        << "Track stream already assigned";
+    track_stream_ = stream_id;
+    return;
+  }
+  if (forwarding_preference == MoqtForwardingPreference::kGroup) {
+    QUIC_BUG_IF(quic_bug_moqt_draft_02_02,
+                group_streams_[group_id].group_stream.has_value())
+        << "Group stream already assigned";
+    group_streams_[group_id].group_stream = stream_id;
+    return;
+  }
+  // ObjectStream or ObjectPreferDatagram
+  QUIC_BUG_IF(quic_bug_moqt_draft_02_03,
+              group_streams_[group_id].object_streams.contains(object_id))
+      << "Object stream already assigned";
+  group_streams_[group_id].object_streams[object_id] = stream_id;
+}
+
+void SubscribeWindow::RemoveStream(
+    MoqtForwardingPreference forwarding_preference, uint64_t group_id,
+    uint64_t object_id) {
+  if (forwarding_preference == moqt::MoqtForwardingPreference::kTrack) {
+    track_stream_ = std::nullopt;
+    return;
+  }
+  auto group_it = group_streams_.find(group_id);
+  if (group_it == group_streams_.end()) {
+    return;
+  }
+  GroupStreams& group = group_it->second;
+  if (forwarding_preference == moqt::MoqtForwardingPreference::kGroup) {
+    group.group_stream = std::nullopt;
+    if (group.object_streams.empty()) {
+      group_streams_.erase(group_id);
+    }
+    return;
+  }
+  // ObjectStream or ObjectPreferDatagram
+  if (group.object_streams.contains(object_id)) {
+    group_streams_[group_id].object_streams.erase(object_id);
+    if (!group.group_stream.has_value() &&
+        group_streams_[group_id].object_streams.empty()) {
+      group_streams_.erase(group_id);
+    }
+  }
+}
+
+std::vector<SubscribeWindow*> MoqtSubscribeWindows::SequenceIsSubscribed(
+    FullSequence sequence) {
+  std::vector<SubscribeWindow*> retval;
+  for (auto it = windows.begin(); it != windows.end(); it++) {
+    if (it->InWindow(sequence)) {
+      retval.push_back(&(*it));
+    }
+  }
+  return retval;
+}
+
+}  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h
index 4bf5f76..f8def02 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows.h
+++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -8,51 +8,72 @@
 #include <cstdint>
 #include <list>
 #include <optional>
+#include <vector>
 
+#include "absl/container/flat_hash_map.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
 
-struct SubscribeWindow {
-  FullSequence start;
-  std::optional<FullSequence> end;
+// Classes to track subscriptions to local tracks: the sequence numbers
+// subscribed, the streams involved, and the subscribe IDs.
+class QUICHE_EXPORT SubscribeWindow {
+ public:
   // Creates a half-open window.
-  SubscribeWindow(uint64_t start_group, uint64_t start_object) {
-    start = {start_group, start_object};
-    end = std::nullopt;
-  }
+  SubscribeWindow(uint64_t subscribe_id, uint64_t start_group,
+                  uint64_t start_object)
+      : subscribe_id_(subscribe_id), start_({start_group, start_object}) {}
+
   // Creates a closed window.
-  SubscribeWindow(uint64_t start_group, uint64_t start_object,
-                  uint64_t end_group, uint64_t end_object) {
-    start = {start_group, start_object};
-    end = {end_group, end_object};
-  }
-  bool InWindow(const FullSequence& seq) const {
-    if (seq < start) {
-      return false;
-    }
-    if (!end.has_value() || seq < *end) {
-      return true;
-    }
-    return false;
-  }
+  SubscribeWindow(uint64_t subscribe_id, uint64_t start_group,
+                  uint64_t start_object, uint64_t end_group,
+                  uint64_t end_object)
+      : subscribe_id_(subscribe_id),
+        start_({start_group, start_object}),
+        end_(FullSequence(end_group, end_object)) {}
+
+  uint64_t subscribe_id() const { return subscribe_id_; }
+
+  bool InWindow(const FullSequence& seq) const;
+
+  // Returns the stream to send |sequence| on, if already opened.
+  std::optional<webtransport::StreamId> GetStreamForSequence(
+      FullSequence sequence,
+      MoqtForwardingPreference forwarding_preference) const;
+
+  // Records what stream is being used for a track, group, or object depending
+  // on |forwarding_preference|. Triggers QUIC_BUG if already assigned.
+  void AddStream(MoqtForwardingPreference forwarding_preference,
+                 uint64_t group_id, uint64_t object_id,
+                 webtransport::StreamId stream_id);
+
+  void RemoveStream(MoqtForwardingPreference forwarding_preference,
+                    uint64_t group_id, uint64_t object_id);
+
+ private:
+  struct GroupStreams {
+    std::optional<webtransport::StreamId> group_stream;
+    absl::flat_hash_map<uint64_t, webtransport::StreamId> object_streams;
+  };
+  const uint64_t subscribe_id_;
+  const FullSequence start_;
+  const std::optional<FullSequence> end_ = std::nullopt;
+  // Open streams for this subscription
+  std::optional<webtransport::StreamId> track_stream_;
+  absl::flat_hash_map<uint64_t, GroupStreams> group_streams_;
 };
 
 // Class to keep track of the sequence number blocks to which a peer is
 // subscribed.
-class MoqtSubscribeWindows {
+class QUICHE_EXPORT MoqtSubscribeWindows {
  public:
   MoqtSubscribeWindows() {}
 
-  bool SequenceIsSubscribed(uint64_t group, uint64_t object) const {
-    FullSequence seq(group, object);
-    for (auto it : windows) {
-      if (it.InWindow(seq)) {
-        return true;
-      }
-    }
-    return false;
-  }
+  // Returns a vector of subscribe IDs that apply to the object. They will be in
+  // reverse order of the AddWindow calls.
+  std::vector<SubscribeWindow*> SequenceIsSubscribed(FullSequence sequence);
 
   // |window| has already been converted into absolute sequence numbers. An
   // optimization could consolidate overlapping subscribe windows.
diff --git a/quiche/quic/moqt/moqt_subscribe_windows_test.cc b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
index b1b1bd0..02e7007 100644
--- a/quiche/quic/moqt/moqt_subscribe_windows_test.cc
+++ b/quiche/quic/moqt/moqt_subscribe_windows_test.cc
@@ -4,35 +4,166 @@
 
 #include "quiche/quic/moqt/moqt_subscribe_windows.h"
 
+#include <optional>
+
+#include "quiche/quic/moqt/moqt_messages.h"
 #include "quiche/quic/platform/api/quic_test.h"
+#include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/web_transport/web_transport.h"
 
 namespace moqt {
 
 namespace test {
 
-class MoqtSubscribeWindowsTest : public quic::test::QuicTest {
+class QUICHE_EXPORT SubscribeWindowTest : public quic::test::QuicTest {
+ public:
+  SubscribeWindowTest()
+      : window_(/*subscribe_id=*/2, /*start_group=*/4,
+                /*start_object=*/0, /*end_group=*/5,
+                /*end_object=*/1) {}
+
+  SubscribeWindow window_;
+};
+
+TEST_F(SubscribeWindowTest, Queries) {
+  EXPECT_EQ(window_.subscribe_id(), 2);
+  EXPECT_TRUE(window_.InWindow(FullSequence(4, 0)));
+  EXPECT_TRUE(window_.InWindow(FullSequence(5, 1)));
+  EXPECT_FALSE(window_.InWindow(FullSequence(5, 2)));
+  EXPECT_FALSE(window_.InWindow(FullSequence(6, 0)));
+  EXPECT_FALSE(window_.InWindow(FullSequence(3, 12)));
+}
+
+TEST_F(SubscribeWindowTest, AddRemoveStream) {
+  window_.AddStream(MoqtForwardingPreference::kTrack, 4, 0, 2);
+  window_.AddStream(MoqtForwardingPreference::kGroup, 5, 0, 6);
+  window_.AddStream(MoqtForwardingPreference::kObject, 5, 1, 10);
+  window_.AddStream(MoqtForwardingPreference::kDatagram, 5, 2, 14);
+  // This is a no-op; the stream does not exist.
+  window_.RemoveStream(MoqtForwardingPreference::kGroup, 6, 0);
+
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(4, 0),
+                                         MoqtForwardingPreference::kTrack),
+            2);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(4, 0),
+                                         MoqtForwardingPreference::kGroup),
+            std::nullopt);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(4, 0),
+                                         MoqtForwardingPreference::kObject),
+            std::nullopt);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(4, 0),
+                                         MoqtForwardingPreference::kDatagram),
+            std::nullopt);
+
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 0),
+                                         MoqtForwardingPreference::kTrack),
+            2);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 0),
+                                         MoqtForwardingPreference::kGroup),
+            6);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 0),
+                                         MoqtForwardingPreference::kObject),
+            std::nullopt);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 0),
+                                         MoqtForwardingPreference::kDatagram),
+            std::nullopt);
+
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
+                                         MoqtForwardingPreference::kTrack),
+            2);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
+                                         MoqtForwardingPreference::kGroup),
+            6);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
+                                         MoqtForwardingPreference::kObject),
+            10);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
+                                         MoqtForwardingPreference::kDatagram),
+            10);
+
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
+                                         MoqtForwardingPreference::kTrack),
+            2);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
+                                         MoqtForwardingPreference::kGroup),
+            6);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
+                                         MoqtForwardingPreference::kObject),
+            14);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
+                                         MoqtForwardingPreference::kDatagram),
+            14);
+
+  window_.RemoveStream(MoqtForwardingPreference::kTrack, 4, 0);
+  window_.RemoveStream(MoqtForwardingPreference::kObject, 5, 1);
+  // kObject and kDatagram are interchangeable
+  window_.RemoveStream(MoqtForwardingPreference::kObject, 5, 2);
+  // The two commands above should not have deleted the group stream.
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
+                                         MoqtForwardingPreference::kGroup),
+            6);
+  window_.RemoveStream(MoqtForwardingPreference::kGroup, 5, 0);
+
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(4, 0),
+                                         MoqtForwardingPreference::kTrack),
+            std::nullopt);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 0),
+                                         MoqtForwardingPreference::kGroup),
+            std::nullopt);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
+                                         MoqtForwardingPreference::kObject),
+            std::nullopt);
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 2),
+                                         MoqtForwardingPreference::kDatagram),
+            std::nullopt);
+}
+
+TEST_F(SubscribeWindowTest, RemoveGroupBeforeObjects) {
+  window_.AddStream(MoqtForwardingPreference::kGroup, 5, 0, 6);
+  window_.AddStream(MoqtForwardingPreference::kObject, 5, 1, 10);
+  window_.AddStream(MoqtForwardingPreference::kDatagram, 5, 2, 14);
+  window_.RemoveStream(MoqtForwardingPreference::kGroup, 5, 0);
+  // Object stream is not deleted when the root group stream is.
+  EXPECT_EQ(window_.GetStreamForSequence(FullSequence(5, 1),
+                                         MoqtForwardingPreference::kObject),
+            10);
+  EXPECT_FALSE(window_
+                   .GetStreamForSequence(FullSequence(5, 0),
+                                         MoqtForwardingPreference::kGroup)
+                   .has_value());
+}
+
+class QUICHE_EXPORT MoqtSubscribeWindowsTest : public quic::test::QuicTest {
  public:
   MoqtSubscribeWindows windows_;
 };
 
 TEST_F(MoqtSubscribeWindowsTest, IsEmpty) {
   EXPECT_TRUE(windows_.IsEmpty());
-  windows_.AddWindow(SubscribeWindow(1, 3));
+  windows_.AddWindow(SubscribeWindow(0, 1, 3));
   EXPECT_FALSE(windows_.IsEmpty());
 }
 
 TEST_F(MoqtSubscribeWindowsTest, IsSubscribed) {
   EXPECT_TRUE(windows_.IsEmpty());
   // The first two windows overlap; the third is open-ended.
-  windows_.AddWindow(SubscribeWindow(1, 0, 3, 9));
-  windows_.AddWindow(SubscribeWindow(2, 4, 4, 3));
-  windows_.AddWindow(SubscribeWindow(10, 0));
+  windows_.AddWindow(SubscribeWindow(0, 1, 0, 3, 9));
+  windows_.AddWindow(SubscribeWindow(1, 2, 4, 4, 3));
+  windows_.AddWindow(SubscribeWindow(2, 10, 0));
   EXPECT_FALSE(windows_.IsEmpty());
-  EXPECT_FALSE(windows_.SequenceIsSubscribed(0, 8));
-  EXPECT_TRUE(windows_.SequenceIsSubscribed(1, 0));
-  EXPECT_FALSE(windows_.SequenceIsSubscribed(4, 4));
-  EXPECT_FALSE(windows_.SequenceIsSubscribed(8, 3));
-  EXPECT_TRUE(windows_.SequenceIsSubscribed(100, 7));
+  EXPECT_TRUE(windows_.SequenceIsSubscribed(FullSequence(0, 8)).empty());
+  auto hits = windows_.SequenceIsSubscribed(FullSequence(1, 0));
+  EXPECT_EQ(hits.size(), 1);
+  EXPECT_EQ(hits[0]->subscribe_id(), 0);
+  EXPECT_TRUE(windows_.SequenceIsSubscribed(FullSequence(4, 4)).empty());
+  EXPECT_TRUE(windows_.SequenceIsSubscribed(FullSequence(8, 3)).empty());
+  hits = windows_.SequenceIsSubscribed(FullSequence(100, 7));
+  EXPECT_EQ(hits.size(), 1);
+  EXPECT_EQ(hits[0]->subscribe_id(), 2);
+  hits = windows_.SequenceIsSubscribed(FullSequence(3, 0));
+  EXPECT_EQ(hits.size(), 2);
+  EXPECT_EQ(hits[0]->subscribe_id(), 1);
+  EXPECT_EQ(hits[1]->subscribe_id(), 0);
 }
 
 }  // namespace test
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index 4dad086..4d04afc 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -7,6 +7,7 @@
 
 #include <cstdint>
 #include <optional>
+#include <vector>
 
 #include "absl/strings/string_view.h"
 #include "quiche/quic/moqt/moqt_messages.h"
@@ -27,32 +28,31 @@
     // the session will send SUBSCRIBE_OK. If the return has a value, the value
     // is the error message (the session will send SUBSCRIBE_ERROR). Via this
     // API, the application decides if a partially fulfillable
-    // SUBSCRIBE_REQUEST results in an error or not.
-    virtual std::optional<absl::string_view> OnSubscribeRequestForPast(
+    // SUBSCRIBE results in an error or not.
+    virtual std::optional<absl::string_view> OnSubscribeForPast(
         const SubscribeWindow& window) = 0;
   };
   // |visitor| must not be nullptr.
-  LocalTrack(const FullTrackName& full_track_name, uint64_t track_alias,
-             Visitor* visitor)
-      : full_track_name_(full_track_name),
-        track_alias_(track_alias),
-        visitor_(visitor) {}
+  LocalTrack(const FullTrackName& full_track_name, Visitor* visitor)
+      : full_track_name_(full_track_name), visitor_(visitor) {}
   // Creates a LocalTrack that does not start at sequence (0,0)
-  LocalTrack(const FullTrackName& full_track_name, uint64_t track_alias,
-             Visitor* visitor, FullSequence next_sequence)
+  LocalTrack(const FullTrackName& full_track_name, Visitor* visitor,
+             FullSequence next_sequence)
       : full_track_name_(full_track_name),
-        track_alias_(track_alias),
         next_sequence_(next_sequence),
         visitor_(visitor) {}
 
   const FullTrackName& full_track_name() const { return full_track_name_; }
 
-  uint64_t track_alias() const { return track_alias_; }
+  std::optional<uint64_t> track_alias() const { return track_alias_; }
+  void set_track_alias(uint64_t track_alias) { track_alias_ = track_alias; }
 
   Visitor* visitor() { return visitor_; }
 
-  bool ShouldSend(uint64_t group, uint64_t object) const {
-    return windows_.SequenceIsSubscribed(group, object);
+  // Returns the subscribe windows that want the object defined by (|group|,
+  // |object|).
+  std::vector<SubscribeWindow*> ShouldSend(FullSequence sequence) {
+    return windows_.SequenceIsSubscribed(sequence);
   }
 
   void AddWindow(SubscribeWindow window) { windows_.AddWindow(window); }
@@ -61,7 +61,12 @@
   // by one.
   const FullSequence& next_sequence() const { return next_sequence_; }
 
-  FullSequence& next_sequence_mutable() { return next_sequence_; }
+  // Updates next_sequence_ if |sequence| is larger.
+  void SentSequence(FullSequence sequence) {
+    if (next_sequence_ <= sequence) {
+      next_sequence_ = {sequence.group, sequence.object + 1};
+    }
+  }
 
   bool HasSubscriber() const { return !windows_.IsEmpty(); }
 
@@ -69,9 +74,12 @@
   // This only needs to track subscriptions to current and future objects;
   // requests for objects in the past are forwarded to the application.
   const FullTrackName full_track_name_;
-  const uint64_t track_alias_;
+  // Let the first SUBSCRIBE determine the track alias.
+  std::optional<uint64_t> track_alias_;
   // The sequence numbers from this track to which the peer is subscribed.
   MoqtSubscribeWindows windows_;
+  // By recording the highest observed sequence number, MoQT can interpret
+  // relative sequence numbers in SUBSCRIBEs.
   FullSequence next_sequence_ = {0, 0};
   Visitor* visitor_;
 };
@@ -82,34 +90,36 @@
   class Visitor {
    public:
     virtual ~Visitor() = default;
-    // Called when the session receives a response to the SUBSCRIBE_REQUEST.
+    // Called when the session receives a response to the SUBSCRIBE, unless it's
+    // a SUBSCRIBE_ERROR with a new track_alias. In that case, the session will
+    // automatically retry.
     virtual void OnReply(
         const FullTrackName& full_track_name,
         std::optional<absl::string_view> error_reason_phrase) = 0;
-    virtual void OnObjectFragment(const FullTrackName& full_track_name,
-                                  uint32_t stream_id, uint64_t group_sequence,
-                                  uint64_t object_sequence,
-                                  uint64_t object_send_order,
-                                  absl::string_view object,
-                                  bool end_of_message) = 0;
+    virtual void OnObjectFragment(
+        const FullTrackName& full_track_name, uint64_t group_sequence,
+        uint64_t object_sequence, uint64_t object_send_order,
+        MoqtForwardingPreference forwarding_preference,
+        absl::string_view object, bool end_of_message) = 0;
     // TODO(martinduke): Add final sequence numbers
   };
-  RemoteTrack(const FullTrackName& full_track_name, Visitor* visitor)
-      : full_track_name_(full_track_name), visitor_(visitor) {}
+  RemoteTrack(const FullTrackName& full_track_name, uint64_t track_alias,
+              Visitor* visitor)
+      : full_track_name_(full_track_name),
+        track_alias_(track_alias),
+        visitor_(visitor) {}
 
   const FullTrackName& full_track_name() { return full_track_name_; }
 
-  std::optional<uint64_t> track_alias() const { return track_alias_; }
+  uint64_t track_alias() const { return track_alias_; }
 
   Visitor* visitor() { return visitor_; }
 
-  void set_track_alias(uint64_t track_alias) { track_alias_ = track_alias; }
-
  private:
   // TODO: There is no accounting for the number of outstanding subscribes,
   // because we can't match track names to individual subscribes.
-  FullTrackName full_track_name_;
-  std::optional<uint64_t> track_alias_;
+  const FullTrackName full_track_name_;
+  const uint64_t track_alias_;
   Visitor* visitor_;
 };
 
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index a4d3e2a..4d85b6b 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -4,7 +4,6 @@
 
 #include "quiche/quic/moqt/moqt_track.h"
 
-#include <cstdint>
 #include <optional>
 
 #include "absl/strings/string_view.h"
@@ -20,49 +19,53 @@
 class LocalTrackTest : public quic::test::QuicTest {
  public:
   LocalTrackTest()
-      : track_(FullTrackName("foo", "bar"), /*track_alias=*/5, &visitor_,
-               FullSequence(4, 1)) {}
+      : track_(FullTrackName("foo", "bar"), &visitor_, FullSequence(4, 1)) {}
   LocalTrack track_;
   MockLocalTrackVisitor visitor_;
 };
 
 TEST_F(LocalTrackTest, Queries) {
   EXPECT_EQ(track_.full_track_name(), FullTrackName("foo", "bar"));
-  EXPECT_EQ(track_.track_alias(), 5);
+  EXPECT_EQ(track_.track_alias(), std::nullopt);
   EXPECT_EQ(track_.visitor(), &visitor_);
   EXPECT_EQ(track_.next_sequence(), FullSequence(4, 1));
-  FullSequence& mutable_next = track_.next_sequence_mutable();
-  mutable_next.object++;
+  track_.SentSequence(FullSequence(4, 0));
+  EXPECT_EQ(track_.next_sequence(), FullSequence(4, 1));  // no change
+  track_.SentSequence(FullSequence(4, 1));
   EXPECT_EQ(track_.next_sequence(), FullSequence(4, 2));
   EXPECT_FALSE(track_.HasSubscriber());
 }
 
-TEST_F(LocalTrackTest, AfterSubscribe) {
-  track_.AddWindow(SubscribeWindow(4, 1));
+TEST_F(LocalTrackTest, SetTrackAlias) {
+  EXPECT_EQ(track_.track_alias(), std::nullopt);
+  track_.set_track_alias(6);
+  EXPECT_EQ(track_.track_alias(), 6);
+}
+
+TEST_F(LocalTrackTest, ShouldSend) {
+  track_.AddWindow(SubscribeWindow(0, 4, 1));
   EXPECT_TRUE(track_.HasSubscriber());
-  EXPECT_FALSE(track_.ShouldSend(3, 12));
-  EXPECT_FALSE(track_.ShouldSend(4, 0));
-  EXPECT_TRUE(track_.ShouldSend(4, 1));
-  EXPECT_TRUE(track_.ShouldSend(12, 0));
+  EXPECT_TRUE(track_.ShouldSend(FullSequence(3, 12)).empty());
+  EXPECT_TRUE(track_.ShouldSend(FullSequence(4, 0)).empty());
+  EXPECT_EQ(track_.ShouldSend(FullSequence(4, 1)).size(), 1);
+  EXPECT_EQ(track_.ShouldSend(FullSequence(12, 0)).size(), 1);
 }
 
 class RemoteTrackTest : public quic::test::QuicTest {
  public:
-  RemoteTrackTest() : track_(FullTrackName("foo", "bar"), &visitor_) {}
+  RemoteTrackTest()
+      : track_(FullTrackName("foo", "bar"), /*track_alias=*/5, &visitor_) {}
   RemoteTrack track_;
   MockRemoteTrackVisitor visitor_;
 };
 
 TEST_F(RemoteTrackTest, Queries) {
   EXPECT_EQ(track_.full_track_name(), FullTrackName("foo", "bar"));
-  EXPECT_EQ(track_.track_alias(), std::nullopt);
+  EXPECT_EQ(track_.track_alias(), 5);
   EXPECT_EQ(track_.visitor(), &visitor_);
 }
 
-TEST_F(RemoteTrackTest, SetAlias) {
-  track_.set_track_alias(5);
-  EXPECT_EQ(track_.track_alias(), 5);
-}
+// TODO: Write test for GetStreamForSequence.
 
 }  // namespace test
 
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 09ebb64..dae15e0 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -8,6 +8,7 @@
 #include <cstddef>
 #include <cstdint>
 #include <cstring>
+#include <memory>
 #include <optional>
 #include <vector>
 
@@ -34,10 +35,10 @@
   MoqtMessageType message_type() const { return message_type_; }
 
   typedef absl::variant<MoqtClientSetup, MoqtServerSetup, MoqtObject,
-                        MoqtSubscribeRequest, MoqtSubscribeOk,
-                        MoqtSubscribeError, MoqtUnsubscribe, MoqtSubscribeFin,
-                        MoqtSubscribeRst, MoqtAnnounce, MoqtAnnounceOk,
-                        MoqtAnnounceError, MoqtUnannounce, MoqtGoAway>
+                        MoqtSubscribe, MoqtSubscribeOk, MoqtSubscribeError,
+                        MoqtUnsubscribe, MoqtSubscribeFin, MoqtSubscribeRst,
+                        MoqtAnnounce, MoqtAnnounceOk, MoqtAnnounceError,
+                        MoqtUnannounce, MoqtGoAway>
       MessageStructuredData;
 
   // The total actual size of the message.
@@ -112,19 +113,25 @@
 // Base class for the two subtypes of Object Message.
 class QUICHE_NO_EXPORT ObjectMessage : public TestMessageBase {
  public:
-  ObjectMessage(MoqtMessageType type) : TestMessageBase(type) {}
+  ObjectMessage(MoqtMessageType type) : TestMessageBase(type) {
+    object_.forwarding_preference = GetForwardingPreference(type);
+  }
 
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtObject>(values);
-    if (cast.track_id != object_.track_id) {
+    if (cast.subscribe_id != object_.subscribe_id) {
       QUIC_LOG(INFO) << "OBJECT Track ID mismatch";
       return false;
     }
-    if (cast.group_sequence != object_.group_sequence) {
+    if (cast.track_alias != object_.track_alias) {
+      QUIC_LOG(INFO) << "OBJECT Track ID mismatch";
+      return false;
+    }
+    if (cast.group_id != object_.group_id) {
       QUIC_LOG(INFO) << "OBJECT Group Sequence mismatch";
       return false;
     }
-    if (cast.object_sequence != object_.object_sequence) {
+    if (cast.object_id != object_.object_id) {
       QUIC_LOG(INFO) << "OBJECT Object Sequence mismatch";
       return false;
     }
@@ -132,88 +139,155 @@
       QUIC_LOG(INFO) << "OBJECT Object Send Order mismatch";
       return false;
     }
+    if (cast.forwarding_preference != object_.forwarding_preference) {
+      QUIC_LOG(INFO) << "OBJECT Object Send Order mismatch";
+      return false;
+    }
+    if (cast.payload_length != object_.payload_length) {
+      QUIC_LOG(INFO) << "OBJECT Payload Length mismatch";
+      return false;
+    }
     return true;
   }
 
-  void ExpandVarints() override {
-    ExpandVarintsImpl("vvvvvv");  // first six fields are varints
-  }
-
   MessageStructuredData structured_data() const override {
     return TestMessageBase::MessageStructuredData(object_);
   }
 
  protected:
   MoqtObject object_ = {
-      /*track_id=*/4,
-      /*group_sequence=*/5,
-      /*object_sequence=*/6,
+      /*subscribe_id=*/3,
+      /*track_alias=*/4,
+      /*group_id*/ 5,
+      /*object_id=*/6,
       /*object_send_order=*/7,
+      /*forwarding_preference=*/MoqtForwardingPreference::kTrack,
       /*payload_length=*/std::nullopt,
   };
 };
 
-class QUICHE_NO_EXPORT ObjectMessageWithLength : public ObjectMessage {
+class QUICHE_NO_EXPORT ObjectStreamMessage : public ObjectMessage {
  public:
-  ObjectMessageWithLength()
-      : ObjectMessage(MoqtMessageType::kObjectWithPayloadLength) {
+  ObjectStreamMessage() : ObjectMessage(MoqtMessageType::kObjectStream) {
     SetWireImage(raw_packet_, sizeof(raw_packet_));
-    object_.payload_length = payload_length_;
-  }
-
-  bool EqualFieldValues(MessageStructuredData& values) const override {
-    auto cast = std::get<MoqtObject>(values);
-    if (cast.payload_length != payload_length_) {
-      QUIC_LOG(INFO) << "OBJECT Payload Length mismatch";
-      return false;
-    }
-    return ObjectMessage::EqualFieldValues(values);
+    object_.forwarding_preference = MoqtForwardingPreference::kObject;
   }
 
   void ExpandVarints() override {
     ExpandVarintsImpl("vvvvvv");  // first six fields are varints
   }
 
-  MessageStructuredData structured_data() const override {
-    return TestMessageBase::MessageStructuredData(object_);
+ private:
+  uint8_t raw_packet_[9] = {
+      0x00, 0x03, 0x04, 0x05, 0x06, 0x07,  // varints
+      0x66, 0x6f, 0x6f,                    // payload = "foo"
+  };
+};
+
+class QUICHE_NO_EXPORT ObjectPreferDatagramMessage : public ObjectMessage {
+ public:
+  ObjectPreferDatagramMessage()
+      : ObjectMessage(MoqtMessageType::kObjectPreferDatagram) {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+    object_.forwarding_preference = MoqtForwardingPreference::kDatagram;
+  }
+
+  void ExpandVarints() override {
+    ExpandVarintsImpl("vvvvvv");  // first six fields are varints
   }
 
  private:
   uint8_t raw_packet_[9] = {
-      0x00, 0x04, 0x05, 0x06, 0x07,  // varints
-      0x03, 0x66, 0x6f, 0x6f,        // payload = "foo"
+      0x01, 0x03, 0x04, 0x05, 0x06, 0x07,  // varints
+      0x66, 0x6f, 0x6f,                    // payload = "foo"
   };
-  std::optional<uint64_t> payload_length_ = 3;
 };
 
-class QUICHE_NO_EXPORT ObjectMessageWithoutLength : public ObjectMessage {
+// Concatentation of the base header and the object-specific header. Follow-on
+// object headers are handled in a different class.
+class QUICHE_NO_EXPORT StreamHeaderTrackMessage : public ObjectMessage {
  public:
-  ObjectMessageWithoutLength()
-      : ObjectMessage(MoqtMessageType::kObjectWithoutPayloadLength) {
+  StreamHeaderTrackMessage()
+      : ObjectMessage(MoqtMessageType::kStreamHeaderTrack) {
     SetWireImage(raw_packet_, sizeof(raw_packet_));
-  }
-
-  bool EqualFieldValues(MessageStructuredData& values) const override {
-    auto cast = std::get<MoqtObject>(values);
-    if (cast.payload_length != std::nullopt) {
-      QUIC_LOG(INFO) << "OBJECT Payload Length mismatch";
-      return false;
-    }
-    return ObjectMessage::EqualFieldValues(values);
+    object_.forwarding_preference = MoqtForwardingPreference::kTrack;
+    object_.payload_length = 3;
   }
 
   void ExpandVarints() override {
-    ExpandVarintsImpl("vvvvv");  // first six fields are varints
-  }
-
-  MessageStructuredData structured_data() const override {
-    return TestMessageBase::MessageStructuredData(object_);
+    ExpandVarintsImpl("--vvvvvv");  // six one-byte varints
   }
 
  private:
-  uint8_t raw_packet_[8] = {
-      0x02, 0x04, 0x05, 0x06, 0x07,  // varints
-      0x66, 0x6f, 0x6f,              // payload = "foo"
+  // Some tests check that a FIN sent at the halfway point of a message results
+  // in an error. Without the unnecessary expanded varint 0x0405, the halfway
+  // point falls at the end of the Stream Header, which is legal. Expand the
+  // varint so that the FIN would be illegal.
+  uint8_t raw_packet_[11] = {
+      0x40, 0x50,              // two byte type field
+      0x03, 0x04, 0x07,        // varints
+      0x05, 0x06,              // object middler
+      0x03, 0x66, 0x6f, 0x6f,  // payload = "foo"
+  };
+};
+
+// Used only for tests that process multiple objects on one stream.
+class QUICHE_NO_EXPORT StreamMiddlerTrackMessage : public ObjectMessage {
+ public:
+  StreamMiddlerTrackMessage()
+      : ObjectMessage(MoqtMessageType::kStreamHeaderTrack) {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+    object_.forwarding_preference = MoqtForwardingPreference::kTrack;
+    object_.payload_length = 3;
+    object_.group_id = 9;
+    object_.object_id = 10;
+  }
+
+  void ExpandVarints() override { ExpandVarintsImpl("vvv"); }
+
+ private:
+  uint8_t raw_packet_[6] = {
+      0x09, 0x0a, 0x03, 0x62, 0x61, 0x72,  // object middler; payload = "bar"
+  };
+};
+
+class QUICHE_NO_EXPORT StreamHeaderGroupMessage : public ObjectMessage {
+ public:
+  StreamHeaderGroupMessage()
+      : ObjectMessage(MoqtMessageType::kStreamHeaderGroup) {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+    object_.forwarding_preference = MoqtForwardingPreference::kGroup;
+    object_.payload_length = 3;
+  }
+
+  void ExpandVarints() override {
+    ExpandVarintsImpl("--vvvvvv");  // six one-byte varints
+  }
+
+ private:
+  uint8_t raw_packet_[11] = {
+      0x40, 0x51,                    // two-byte type field
+      0x03, 0x04, 0x05, 0x07,        // varints
+      0x06, 0x03, 0x66, 0x6f, 0x6f,  // object middler; payload = "foo"
+  };
+};
+
+// Used only for tests that process multiple objects on one stream.
+class QUICHE_NO_EXPORT StreamMiddlerGroupMessage : public ObjectMessage {
+ public:
+  StreamMiddlerGroupMessage()
+      : ObjectMessage(MoqtMessageType::kStreamHeaderGroup) {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+    object_.forwarding_preference = MoqtForwardingPreference::kGroup;
+    object_.payload_length = 3;
+    object_.object_id = 9;
+  }
+
+  void ExpandVarints() override { ExpandVarintsImpl("vvv"); }
+
+ private:
+  uint8_t raw_packet_[5] = {
+      0x09, 0x03, 0x62, 0x61, 0x72,  // object middler; payload = "bar"
   };
 };
 
@@ -325,72 +399,115 @@
   };
 };
 
-class QUICHE_NO_EXPORT SubscribeRequestMessage : public TestMessageBase {
+class QUICHE_NO_EXPORT SubscribeMessage : public TestMessageBase {
  public:
-  SubscribeRequestMessage()
-      : TestMessageBase(MoqtMessageType::kSubscribeRequest) {
+  SubscribeMessage() : TestMessageBase(MoqtMessageType::kSubscribe) {
     SetWireImage(raw_packet_, sizeof(raw_packet_));
   }
 
   bool EqualFieldValues(MessageStructuredData& values) const override {
-    auto cast = std::get<MoqtSubscribeRequest>(values);
+    auto cast = std::get<MoqtSubscribe>(values);
+    if (cast.subscribe_id != subscribe_request_.subscribe_id) {
+      QUIC_LOG(INFO) << "SUBSCRIBE subscribe ID mismatch";
+      return false;
+    }
+    if (cast.track_alias != subscribe_request_.track_alias) {
+      QUIC_LOG(INFO) << "SUBSCRIBE track alias mismatch";
+      return false;
+    }
     if (cast.track_namespace != subscribe_request_.track_namespace) {
-      QUIC_LOG(INFO) << "SUBSCRIBE REQUEST track namespace mismatch";
+      QUIC_LOG(INFO) << "SUBSCRIBE track namespace mismatch";
       return false;
     }
     if (cast.track_name != subscribe_request_.track_name) {
-      QUIC_LOG(INFO) << "SUBSCRIBE REQUEST track name mismatch";
+      QUIC_LOG(INFO) << "SUBSCRIBE track name mismatch";
       return false;
     }
     if (cast.start_group != subscribe_request_.start_group) {
-      QUIC_LOG(INFO) << "SUBSCRIBE REQUEST start group mismatch";
+      QUIC_LOG(INFO) << "SUBSCRIBE start group mismatch";
       return false;
     }
     if (cast.start_object != subscribe_request_.start_object) {
-      QUIC_LOG(INFO) << "SUBSCRIBE REQUEST start object mismatch";
+      QUIC_LOG(INFO) << "SUBSCRIBE start object mismatch";
       return false;
     }
     if (cast.end_group != subscribe_request_.end_group) {
-      QUIC_LOG(INFO) << "SUBSCRIBE REQUEST end group mismatch";
+      QUIC_LOG(INFO) << "SUBSCRIBE end group mismatch";
       return false;
     }
     if (cast.end_object != subscribe_request_.end_object) {
-      QUIC_LOG(INFO) << "SUBSCRIBE REQUEST end object mismatch";
+      QUIC_LOG(INFO) << "SUBSCRIBE end object mismatch";
       return false;
     }
+#ifdef MOQT_AUTH_INFO
     if (cast.authorization_info != subscribe_request_.authorization_info) {
-      QUIC_LOG(INFO) << "SUBSCRIBE REQUEST authorization info mismatch";
+      QUIC_LOG(INFO) << "SUBSCRIBE authorization info mismatch";
       return false;
     }
+#endif
     return true;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("vv---v----vvvvvvvvv---"); }
+  void ExpandVarints() override {
+#ifdef MOQT_AUTH_INFO
+    ExpandVarintsImpl("vvvv---v----vvvvvvvvv");
+#else
+    ExpandVarintsImpl("vvvv---v----vvvvvv");
+#endif
+  }
 
   MessageStructuredData structured_data() const override {
     return TestMessageBase::MessageStructuredData(subscribe_request_);
   }
 
  private:
-  uint8_t raw_packet_[22] = {
-      0x03, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
-      0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
-      0x02, 0x04,                    // start_group = 4 (relative previous)
-      0x01, 0x01,                    // start_object = 1 (absolute)
-      0x00,                          // end_group = none
-      0x00,                          // end_object = none
-      0x01,                          // 1 parameter
-      0x02, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar"
+#ifdef MOQT_AUTH_INFO
+  uint8_t raw_packet_[24] = {
+#else
+  uint8_t raw_packet_[18] = {
+#endif
+    0x03,
+    0x01,
+    0x02,  // id and alias
+    0x03,
+    0x66,
+    0x6f,
+    0x6f,  // track_namespace = "foo"
+    0x04,
+    0x61,
+    0x62,
+    0x63,
+    0x64,  // track_name = "abcd"
+    0x02,
+    0x04,  // start_group = 4 (relative previous)
+    0x01,
+    0x01,  // start_object = 1 (absolute)
+    0x00,  // end_group = none
+    0x00,  // end_object = none
+           // TODO(martinduke): figure out what to do about the missing num
+           // parameters field.
+#ifdef MOQT_AUTH_INFO
+    0x01,  // 1 parameter
+    0x02,
+    0x03,
+    0x62,
+    0x61,
+    0x72,  // authorization_info = "bar"
+#endif
   };
 
-  MoqtSubscribeRequest subscribe_request_ = {
+  MoqtSubscribe subscribe_request_ = {
+      /*subscribe_id=*/1,
+      /*track_alias=*/2,
       /*track_namespace=*/"foo",
       /*track_name=*/"abcd",
       /*start_group=*/MoqtSubscribeLocation(false, (int64_t)(-4)),
       /*start_object=*/MoqtSubscribeLocation(true, (uint64_t)1),
       /*end_group=*/std::nullopt,
       /*end_object=*/std::nullopt,
+#ifdef MOQT_AUTH_INFO
       /*authorization_info=*/"bar",
+#endif
   };
 };
 
@@ -402,16 +519,8 @@
 
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtSubscribeOk>(values);
-    if (cast.track_namespace != subscribe_ok_.track_namespace) {
-      QUIC_LOG(INFO) << "SUBSCRIBE OK track namespace mismatch";
-      return false;
-    }
-    if (cast.track_name != subscribe_ok_.track_name) {
-      QUIC_LOG(INFO) << "SUBSCRIBE OK track name mismatch";
-      return false;
-    }
-    if (cast.track_id != subscribe_ok_.track_id) {
-      QUIC_LOG(INFO) << "SUBSCRIBE OK track ID mismatch";
+    if (cast.subscribe_id != subscribe_ok_.subscribe_id) {
+      QUIC_LOG(INFO) << "SUBSCRIBE OK subscribe ID mismatch";
       return false;
     }
     if (cast.expires != subscribe_ok_.expires) {
@@ -421,25 +530,20 @@
     return true;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("vv---v---vv"); }
+  void ExpandVarints() override { ExpandVarintsImpl("vvv"); }
 
   MessageStructuredData structured_data() const override {
     return TestMessageBase::MessageStructuredData(subscribe_ok_);
   }
 
  private:
-  uint8_t raw_packet_[11] = {
-      0x04, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
-      0x03, 0x62, 0x61, 0x72,        // track_namespace = "bar"
-      0x01,                          // track_id = 1
-      0x02,                          // expires = 2
+  uint8_t raw_packet_[3] = {
+      0x04, 0x01, 0x03,  // subscribe_id = 1, expires = 3
   };
 
   MoqtSubscribeOk subscribe_ok_ = {
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
-      /*track_id=*/1,
-      /*expires=*/quic::QuicTimeDelta::FromMilliseconds(2),
+      /*subscribe_id=*/1,
+      /*expires=*/quic::QuicTimeDelta::FromMilliseconds(3),
   };
 };
 
@@ -451,12 +555,8 @@
 
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtSubscribeError>(values);
-    if (cast.track_namespace != subscribe_error_.track_namespace) {
-      QUIC_LOG(INFO) << "SUBSCRIBE ERROR track namespace mismatch";
-      return false;
-    }
-    if (cast.track_name != subscribe_error_.track_name) {
-      QUIC_LOG(INFO) << "SUBSCRIBE ERROR track name mismatch";
+    if (cast.subscribe_id != subscribe_error_.subscribe_id) {
+      QUIC_LOG(INFO) << "SUBSCRIBE ERROR subscribe_id mismatch";
       return false;
     }
     if (cast.error_code != subscribe_error_.error_code) {
@@ -467,28 +567,32 @@
       QUIC_LOG(INFO) << "SUBSCRIBE ERROR reason phrase mismatch";
       return false;
     }
+    if (cast.track_alias != subscribe_error_.track_alias) {
+      QUIC_LOG(INFO) << "SUBSCRIBE ERROR track alias mismatch";
+      return false;
+    }
     return true;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("vv---v---vv---"); }
+  void ExpandVarints() override { ExpandVarintsImpl("vvvv---v"); }
 
   MessageStructuredData structured_data() const override {
     return TestMessageBase::MessageStructuredData(subscribe_error_);
   }
 
  private:
-  uint8_t raw_packet_[14] = {
-      0x05, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
-      0x03, 0x62, 0x61, 0x72,        // track_namespace = "bar"
-      0x01,                          // error_code = 1
-      0x03, 0x62, 0x61, 0x72,        // reason_phrase = "bar"
+  uint8_t raw_packet_[8] = {
+      0x05, 0x02,              // subscribe_id = 2
+      0x01,                    // error_code = 2
+      0x03, 0x62, 0x61, 0x72,  // reason_phrase = "bar"
+      0x04,                    // track_alias = 4
   };
 
   MoqtSubscribeError subscribe_error_ = {
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
+      /*subscribe_id=*/2,
       /*subscribe=*/SubscribeErrorCode::kInvalidRange,
       /*reason_phrase=*/"bar",
+      /*track_alias=*/4,
   };
 };
 
@@ -500,32 +604,26 @@
 
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtUnsubscribe>(values);
-    if (cast.track_namespace != unsubscribe_.track_namespace) {
-      QUIC_LOG(INFO) << "UNSUBSCRIBE track name mismatch";
-      return false;
-    }
-    if (cast.track_name != unsubscribe_.track_name) {
-      QUIC_LOG(INFO) << "UNSUBSCRIBE track name mismatch";
+    if (cast.subscribe_id != unsubscribe_.subscribe_id) {
+      QUIC_LOG(INFO) << "UNSUBSCRIBE subscribe ID mismatch";
       return false;
     }
     return true;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("vv---v---"); }
+  void ExpandVarints() override { ExpandVarintsImpl("vv"); }
 
   MessageStructuredData structured_data() const override {
     return TestMessageBase::MessageStructuredData(unsubscribe_);
   }
 
  private:
-  uint8_t raw_packet_[9] = {
-      0x0a, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
-      0x03, 0x62, 0x61, 0x72,        // track_namespace = "bar"
+  uint8_t raw_packet_[2] = {
+      0x0a, 0x03,  // subscribe_id = 3
   };
 
   MoqtUnsubscribe unsubscribe_ = {
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
+      /*subscribe_id=*/3,
   };
 };
 
@@ -537,12 +635,8 @@
 
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtSubscribeFin>(values);
-    if (cast.track_namespace != subscribe_fin_.track_namespace) {
-      QUIC_LOG(INFO) << "SUBSCRIBE_FIN track name mismatch";
-      return false;
-    }
-    if (cast.track_name != subscribe_fin_.track_name) {
-      QUIC_LOG(INFO) << "SUBSCRIBE_FIN track name mismatch";
+    if (cast.subscribe_id != subscribe_fin_.subscribe_id) {
+      QUIC_LOG(INFO) << "SUBSCRIBE_FIN subscribe ID mismatch";
       return false;
     }
     if (cast.final_group != subscribe_fin_.final_group) {
@@ -556,23 +650,21 @@
     return true;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("vv---v---vv"); }
+  void ExpandVarints() override { ExpandVarintsImpl("vvvv"); }
 
   MessageStructuredData structured_data() const override {
     return TestMessageBase::MessageStructuredData(subscribe_fin_);
   }
 
  private:
-  uint8_t raw_packet_[11] = {
-      0x0b, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
-      0x03, 0x62, 0x61, 0x72,        // track_namespace = "bar"
-      0x08,                          // final_group = 8
-      0x0c,                          // final_object = 12
+  uint8_t raw_packet_[4] = {
+      0x0b, 0x03,  // subscribe_id = 3
+      0x08,        // final_group = 8
+      0x0c,        // final_object = 12
   };
 
   MoqtSubscribeFin subscribe_fin_ = {
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
+      /*subscribe_id=*/3,
       /*final_group=*/8,
       /*final_object=*/12,
   };
@@ -586,12 +678,8 @@
 
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtSubscribeRst>(values);
-    if (cast.track_namespace != subscribe_rst_.track_namespace) {
-      QUIC_LOG(INFO) << "SUBSCRIBE_RST track name mismatch";
-      return false;
-    }
-    if (cast.track_name != subscribe_rst_.track_name) {
-      QUIC_LOG(INFO) << "SUBSCRIBE_RST track name mismatch";
+    if (cast.subscribe_id != subscribe_rst_.subscribe_id) {
+      QUIC_LOG(INFO) << "SUBSCRIBE_RST subscribe ID mismatch";
       return false;
     }
     if (cast.error_code != subscribe_rst_.error_code) {
@@ -613,25 +701,23 @@
     return true;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("vv---v---vv--vv"); }
+  void ExpandVarints() override { ExpandVarintsImpl("vvvv--vv"); }
 
   MessageStructuredData structured_data() const override {
     return TestMessageBase::MessageStructuredData(subscribe_rst_);
   }
 
  private:
-  uint8_t raw_packet_[15] = {
-      0x0c, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
-      0x03, 0x62, 0x61, 0x72,        // track_namespace = "bar"
-      0x03,                          // error_code = 3
-      0x02, 0x68, 0x69,              // reason_phrase = "hi"
-      0x08,                          // final_group = 8
-      0x0c,                          // final_object = 12
+  uint8_t raw_packet_[8] = {
+      0x0c, 0x02,        // subscribe_id = 2
+      0x03,              // error_code = 3
+      0x02, 0x68, 0x69,  // reason_phrase = "hi"
+      0x08,              // final_group = 8
+      0x0c,              // final_object = 12
   };
 
   MoqtSubscribeRst subscribe_rst_ = {
-      /*track_namespace=*/"foo",
-      /*track_name=*/"bar",
+      /*subscribe_id=*/2,
       /*error_code=*/3,
       /*reason_phrase=*/"hi",
       /*final_group=*/8,
@@ -813,6 +899,49 @@
   };
 };
 
+// Factory function for test messages.
+static inline std::unique_ptr<TestMessageBase> CreateTestMessage(
+    MoqtMessageType message_type, bool is_webtrans) {
+  switch (message_type) {
+    case MoqtMessageType::kObjectStream:
+      return std::make_unique<ObjectStreamMessage>();
+    case MoqtMessageType::kObjectPreferDatagram:
+      return std::make_unique<ObjectPreferDatagramMessage>();
+    case MoqtMessageType::kSubscribe:
+      return std::make_unique<SubscribeMessage>();
+    case MoqtMessageType::kSubscribeOk:
+      return std::make_unique<SubscribeOkMessage>();
+    case MoqtMessageType::kSubscribeError:
+      return std::make_unique<SubscribeErrorMessage>();
+    case MoqtMessageType::kUnsubscribe:
+      return std::make_unique<UnsubscribeMessage>();
+    case MoqtMessageType::kSubscribeFin:
+      return std::make_unique<SubscribeFinMessage>();
+    case MoqtMessageType::kSubscribeRst:
+      return std::make_unique<SubscribeRstMessage>();
+    case MoqtMessageType::kAnnounce:
+      return std::make_unique<AnnounceMessage>();
+    case MoqtMessageType::kAnnounceOk:
+      return std::make_unique<AnnounceOkMessage>();
+    case MoqtMessageType::kAnnounceError:
+      return std::make_unique<AnnounceErrorMessage>();
+    case MoqtMessageType::kUnannounce:
+      return std::make_unique<UnannounceMessage>();
+    case MoqtMessageType::kGoAway:
+      return std::make_unique<GoAwayMessage>();
+    case MoqtMessageType::kClientSetup:
+      return std::make_unique<ClientSetupMessage>(is_webtrans);
+    case MoqtMessageType::kServerSetup:
+      return std::make_unique<ServerSetupMessage>();
+    case MoqtMessageType::kStreamHeaderTrack:
+      return std::make_unique<StreamHeaderTrackMessage>();
+    case MoqtMessageType::kStreamHeaderGroup:
+      return std::make_unique<StreamHeaderGroupMessage>();
+    default:
+      return nullptr;
+  }
+}
+
 }  // namespace moqt::test
 
 #endif  // QUICHE_QUIC_MOQT_TEST_TOOLS_MOQT_TEST_MESSAGE_H_
diff --git a/quiche/quic/moqt/tools/chat_client_bin.cc b/quiche/quic/moqt/tools/chat_client_bin.cc
index f2af2f7..c5cb070 100644
--- a/quiche/quic/moqt/tools/chat_client_bin.cc
+++ b/quiche/quic/moqt/tools/chat_client_bin.cc
@@ -107,12 +107,11 @@
       }
     }
 
-    void OnObjectFragment(const moqt::FullTrackName& full_track_name,
-                          uint32_t /*stream_id*/, uint64_t group_sequence,
-                          uint64_t object_sequence,
-                          uint64_t /*object_send_order*/,
-                          absl::string_view object,
-                          bool end_of_message) override {
+    void OnObjectFragment(
+        const moqt::FullTrackName& full_track_name, uint64_t group_sequence,
+        uint64_t object_sequence, uint64_t /*object_send_order*/,
+        moqt::MoqtForwardingPreference /*forwarding_preference*/,
+        absl::string_view object, bool end_of_message) override {
       if (!end_of_message) {
         std::cerr << "Error: received partial message despite requesting "
                      "buffering\n";
diff --git a/quiche/quic/moqt/tools/moqt_client.cc b/quiche/quic/moqt/tools/moqt_client.cc
index 07c088e..754b68d 100644
--- a/quiche/quic/moqt/tools/moqt_client.cc
+++ b/quiche/quic/moqt/tools/moqt_client.cc
@@ -89,7 +89,7 @@
   }
 
   MoqtSessionParameters parameters;
-  parameters.version = MoqtVersion::kDraft01;
+  parameters.version = MoqtVersion::kDraft02;
   parameters.perspective = quic::Perspective::IS_CLIENT,
   parameters.using_webtrans = true;
   parameters.path = "";
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h
index 9248114..246e841 100644
--- a/quiche/quic/moqt/tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -31,7 +31,7 @@
 
 class MockLocalTrackVisitor : public LocalTrack::Visitor {
  public:
-  MOCK_METHOD(std::optional<absl::string_view>, OnSubscribeRequestForPast,
+  MOCK_METHOD(std::optional<absl::string_view>, OnSubscribeForPast,
               (const SubscribeWindow& window), (override));
 };
 
@@ -42,10 +42,10 @@
                std::optional<absl::string_view> error_reason_phrase),
               (override));
   MOCK_METHOD(void, OnObjectFragment,
-              (const FullTrackName& full_track_name, uint32_t stream_id,
-               uint64_t group_sequence, uint64_t object_sequence,
-               uint64_t object_send_order, absl::string_view object,
-               bool end_of_message),
+              (const FullTrackName& full_track_name, uint64_t group_sequence,
+               uint64_t object_sequence, uint64_t object_send_order,
+               MoqtForwardingPreference forwarding_preference,
+               absl::string_view object, bool end_of_message),
               (override));
 };
 
diff --git a/quiche/quic/moqt/tools/moqt_server.cc b/quiche/quic/moqt/tools/moqt_server.cc
index e811c6d..1c0f51b 100644
--- a/quiche/quic/moqt/tools/moqt_server.cc
+++ b/quiche/quic/moqt/tools/moqt_server.cc
@@ -33,7 +33,7 @@
     parameters.perspective = quic::Perspective::IS_SERVER;
     parameters.path = path;
     parameters.using_webtrans = true;
-    parameters.version = MoqtVersion::kDraft01;
+    parameters.version = MoqtVersion::kDraft02;
     parameters.deliver_partial_objects = false;
     return std::make_unique<MoqtSession>(session, parameters,
                                          *std::move(callbacks));