Make request_id even/odd for client/server.

Implement new MoQT SUBSCRIBE format.

PiperOrigin-RevId: 756496625
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index d2f57fc..21588c9 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -193,6 +193,8 @@
   return WireUint8(0xff);
 }
 
+WireUint8 WireBoolean(bool value) { return WireUint8(value ? 0x01 : 0x00); }
+
 uint64_t SignedVarintSerializedForm(int64_t value) {
   if (value < 0) {
     return ((-value) << 1) | 0x01;
@@ -416,11 +418,6 @@
 
 quiche::QuicheBuffer MoqtFramer::SerializeSubscribe(
     const MoqtSubscribe& message) {
-  MoqtFilterType filter_type = GetFilterType(message);
-  if (filter_type == MoqtFilterType::kNone) {
-    QUICHE_BUG(MoqtFramer_invalid_subscribe) << "Invalid object range";
-    return quiche::QuicheBuffer();
-  }
   KeyValuePairList parameters;
   VersionSpecificParametersToKeyValuePairList(message.parameters, parameters);
   if (!ValidateVersionSpecificParameters(parameters,
@@ -429,33 +426,44 @@
         << "Serializing invalid MoQT parameters";
     return quiche::QuicheBuffer();
   }
-  switch (filter_type) {
+  switch (message.filter_type) {
+    case MoqtFilterType::kNextGroupStart:
     case MoqtFilterType::kLatestObject:
       return SerializeControlMessage(
-          MoqtMessageType::kSubscribe, WireVarInt62(message.subscribe_id),
+          MoqtMessageType::kSubscribe, WireVarInt62(message.request_id),
           WireVarInt62(message.track_alias),
           WireFullTrackName(message.full_track_name, true),
           WireUint8(message.subscriber_priority),
-          WireDeliveryOrder(message.group_order), WireVarInt62(filter_type),
-          WireKeyValuePairList(parameters));
+          WireDeliveryOrder(message.group_order), WireBoolean(message.forward),
+          WireVarInt62(message.filter_type), WireKeyValuePairList(parameters));
     case MoqtFilterType::kAbsoluteStart:
+      if (!message.start.has_value()) {
+        return quiche::QuicheBuffer();
+      };
       return SerializeControlMessage(
-          MoqtMessageType::kSubscribe, WireVarInt62(message.subscribe_id),
+          MoqtMessageType::kSubscribe, WireVarInt62(message.request_id),
           WireVarInt62(message.track_alias),
           WireFullTrackName(message.full_track_name, true),
           WireUint8(message.subscriber_priority),
-          WireDeliveryOrder(message.group_order), WireVarInt62(filter_type),
-          WireVarInt62(message.start->group),
+          WireDeliveryOrder(message.group_order), WireBoolean(message.forward),
+          WireVarInt62(message.filter_type), WireVarInt62(message.start->group),
           WireVarInt62(message.start->object),
           WireKeyValuePairList(parameters));
     case MoqtFilterType::kAbsoluteRange:
+      if (!message.start.has_value() || !message.end_group.has_value()) {
+        return quiche::QuicheBuffer();
+      }
+      if (*message.end_group < message.start->group) {
+        QUICHE_BUG(MoqtFramer_invalid_end_group) << "Invalid object range";
+        return quiche::QuicheBuffer();
+      }
       return SerializeControlMessage(
-          MoqtMessageType::kSubscribe, WireVarInt62(message.subscribe_id),
+          MoqtMessageType::kSubscribe, WireVarInt62(message.request_id),
           WireVarInt62(message.track_alias),
           WireFullTrackName(message.full_track_name, true),
           WireUint8(message.subscriber_priority),
-          WireDeliveryOrder(message.group_order), WireVarInt62(filter_type),
-          WireVarInt62(message.start->group),
+          WireDeliveryOrder(message.group_order), WireBoolean(message.forward),
+          WireVarInt62(message.filter_type), WireVarInt62(message.start->group),
           WireVarInt62(message.start->object), WireVarInt62(*message.end_group),
           WireKeyValuePairList(parameters));
     default:
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index e772bc8..16939e3 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -402,35 +402,24 @@
 }
 
 TEST_F(MoqtFramerSimpleTest, AllSubscribeInputs) {
-  for (std::optional<Location> start :
-       {std::optional<Location>(),
-        std::optional<Location>(std::in_place, 4, 0)}) {
-    for (std::optional<uint64_t> end_group :
-         {std::optional<uint64_t>(), std::optional<uint64_t>(7)}) {
-      MoqtSubscribe subscribe = {
-          /*subscribe_id=*/3,
-          /*track_alias=*/4,
-          /*full_track_name=*/FullTrackName({"foo", "abcd"}),
-          /*subscriber_priority=*/0x20,
-          /*group_order=*/std::nullopt,
-          start,
-          end_group,
-          VersionSpecificParameters(AuthTokenType::kOutOfBand, "bar"),
-      };
-      quiche::QuicheBuffer buffer;
-      MoqtFilterType expected_filter_type = GetFilterType(subscribe);
-      if (expected_filter_type == MoqtFilterType::kNone) {
-        EXPECT_QUIC_BUG(buffer = framer_.SerializeSubscribe(subscribe),
-                        "Invalid object range");
-        EXPECT_EQ(buffer.size(), 0);
-        continue;
-      }
-      buffer = framer_.SerializeSubscribe(subscribe);
-      // Go to the filter type.
-      const uint8_t* read = BufferAtOffset(buffer, 17);
-      EXPECT_EQ(static_cast<MoqtFilterType>(*read), expected_filter_type);
-      EXPECT_GT(buffer.size(), 0);
-    }
+  for (auto filter :
+       {MoqtFilterType::kNextGroupStart, MoqtFilterType::kLatestObject,
+        MoqtFilterType::kAbsoluteStart, MoqtFilterType::kAbsoluteRange}) {
+    MoqtSubscribe subscribe = {
+        /*subscribe_id=*/3,
+        /*track_alias=*/4,
+        /*full_track_name=*/FullTrackName({"foo", "abcd"}),
+        /*subscriber_priority=*/0x20,
+        /*group_order=*/std::nullopt,
+        /*forward=*/true,
+        /*filter_type=*/filter,
+        /*start=*/std::make_optional<Location>(4, 1),
+        /*end_group=*/std::make_optional<uint64_t>(6ULL),
+        VersionSpecificParameters(AuthTokenType::kOutOfBand, "bar"),
+    };
+    quiche::QuicheBuffer buffer;
+    buffer = framer_.SerializeSubscribe(subscribe);
+    EXPECT_GT(buffer.size(), 0);
   }
 }
 
@@ -441,13 +430,15 @@
       /*full_track_name=*/FullTrackName({"foo", "abcd"}),
       /*subscriber_priority=*/0x20,
       /*group_order=*/std::nullopt,
-      /*start=*/Location(4, 3),
-      /*end_group=*/3,
+      /*forward=*/true,
+      /*filter_type=*/MoqtFilterType::kAbsoluteRange,
+      /*start=*/std::make_optional<Location>(4, 3),
+      /*end_group=*/std::make_optional<uint64_t>(3ULL),
       VersionSpecificParameters(AuthTokenType::kOutOfBand, "bar"),
   };
   quiche::QuicheBuffer buffer;
-  EXPECT_QUIC_BUG(buffer = framer_.SerializeSubscribe(subscribe),
-                  "Invalid object range");
+  EXPECT_QUICHE_BUG(buffer = framer_.SerializeSubscribe(subscribe),
+                    "Invalid object range");
   EXPECT_EQ(buffer.size(), 0);
 }
 
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index 904954c..7e6f480 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -111,22 +111,6 @@
   }
 }
 
-MoqtFilterType GetFilterType(const MoqtSubscribe& message) {
-  if (message.start.has_value()) {
-    if (message.end_group.has_value()) {
-      if (*message.end_group < message.start->group) {
-        return MoqtFilterType::kNone;
-      }
-      return MoqtFilterType::kAbsoluteRange;
-    }
-    return MoqtFilterType::kAbsoluteStart;
-  }
-  if (message.end_group.has_value()) {
-    return MoqtFilterType::kNone;  // End group without start is invalid.
-  }
-  return MoqtFilterType::kLatestObject;
-}
-
 MoqtError ValidateSetupParameters(const KeyValuePairList& parameters,
                                   bool webtrans,
                                   quic::Perspective perspective) {
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index ac22147..9ba07f8 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -518,33 +518,25 @@
 
 enum class QUICHE_EXPORT MoqtFilterType : uint64_t {
   kNone = 0x0,
+  kNextGroupStart = 0x1,
   kLatestObject = 0x2,
   kAbsoluteStart = 0x3,
   kAbsoluteRange = 0x4,
 };
 
 struct QUICHE_EXPORT MoqtSubscribe {
-  uint64_t subscribe_id;
+  uint64_t request_id;
   uint64_t track_alias;
   FullTrackName full_track_name;
   MoqtPriority subscriber_priority;
   std::optional<MoqtDeliveryOrder> group_order;
-
-  // The combinations of these that have values indicate the filter type.
-  // (none): KLatestObject
-  // start: kAbsoluteStart
-  // start, end_group: kAbsoluteRange (request whole last group)
-  // All other combinations are invalid.
+  bool forward;
+  MoqtFilterType filter_type;
   std::optional<Location> start;
   std::optional<uint64_t> end_group;
-  // If the mode is kNone, the these are std::nullopt.
   VersionSpecificParameters parameters;
 };
 
-// Deduce the filter type from the combination of group and object IDs. Returns
-// kNone if the state of the subscribe is invalid.
-MoqtFilterType GetFilterType(const MoqtSubscribe& message);
-
 struct QUICHE_EXPORT MoqtSubscribeOk {
   uint64_t subscribe_id;
   // The message uses ms, but expires is in us.
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 5dd2508..e9a093b 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -402,23 +402,30 @@
 size_t MoqtControlParser::ProcessSubscribe(quic::QuicDataReader& reader) {
   MoqtSubscribe subscribe;
   uint64_t filter, group, object;
-  uint8_t group_order;
+  uint8_t group_order, forward;
   absl::string_view track_name;
-  if (!reader.ReadVarInt62(&subscribe.subscribe_id) ||
+  if (!reader.ReadVarInt62(&subscribe.request_id) ||
       !reader.ReadVarInt62(&subscribe.track_alias) ||
       !ReadTrackNamespace(reader, subscribe.full_track_name) ||
       !reader.ReadStringPieceVarInt62(&track_name) ||
       !reader.ReadUInt8(&subscribe.subscriber_priority) ||
-      !reader.ReadUInt8(&group_order) || !reader.ReadVarInt62(&filter)) {
+      !reader.ReadUInt8(&group_order) || !reader.ReadUInt8(&forward) ||
+      !reader.ReadVarInt62(&filter)) {
     return 0;
   }
   subscribe.full_track_name.AddElement(track_name);
   if (!ParseDeliveryOrder(group_order, subscribe.group_order)) {
-    ParseError("Invalid group order value in SUBSCRIBE message");
+    ParseError("Invalid group order value in SUBSCRIBE");
     return 0;
   }
-  MoqtFilterType filter_type = static_cast<MoqtFilterType>(filter);
-  switch (filter_type) {
+  if (forward > 1) {
+    ParseError("Invalid forward value in SUBSCRIBE");
+    return 0;
+  }
+  subscribe.forward = (forward == 1);
+  subscribe.filter_type = static_cast<MoqtFilterType>(filter);
+  switch (subscribe.filter_type) {
+    case MoqtFilterType::kNextGroupStart:
     case MoqtFilterType::kLatestObject:
       break;
     case MoqtFilterType::kAbsoluteStart:
@@ -427,7 +434,7 @@
         return 0;
       }
       subscribe.start = Location(group, object);
-      if (filter_type == MoqtFilterType::kAbsoluteStart) {
+      if (subscribe.filter_type == MoqtFilterType::kAbsoluteStart) {
         break;
       }
       if (!reader.ReadVarInt62(&group)) {
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index 7e4ba28..4779c89 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -648,8 +648,8 @@
       0x20, 0x00, 0x0d, 0x02, 0x01, 0x02,  // versions = 1, 2
       0x03,                                // 4 params
       0x01, 0x03, 0x66, 0x6f, 0x6f,        // path = "foo"
-      0x02, 0x32,                          // max_subscribe_id = 50
-      0x02, 0x32,                          // max_subscribe_id = 50
+      0x02, 0x32,                          // max_request_id = 50
+      0x02, 0x32,                          // max_request_id = 50
   };
   stream.Receive(absl::string_view(setup, sizeof(setup)), false);
   parser.ReadAndDispatchMessages();
@@ -663,10 +663,10 @@
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kWebTrans, &stream, visitor_);
   char subscribe[] = {
-      0x03, 0x00, 0x1a, 0x01, 0x02,
+      0x03, 0x00, 0x1b, 0x01, 0x02,
       0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
-      0x20, 0x02,                    // priority = 0x20 descending
+      0x20, 0x02, 0x01,              // priority, order, forward
       0x02,                          // filter_type = kLatestObject
       0x02,                          // two params
       0x1f, 0x03, 0x62, 0x61, 0x72,  // 0x1f = "bar"
@@ -681,10 +681,10 @@
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kRawQuic, &stream, visitor_);
   char subscribe[] = {
-      0x03, 0x00, 0x16, 0x01, 0x02,
+      0x03, 0x00, 0x17, 0x01, 0x02,
       0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
-      0x20, 0x02,                    // priority = 0x20 descending
+      0x20, 0x02, 0x01,              // priority, order, forward
       0x02,                          // filter_type = kLatestObject
       0x02,                          // two params
       0x02, 0x67, 0x10,              // delivery_timeout = 10000
@@ -701,10 +701,10 @@
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kRawQuic, &stream, visitor_);
   char subscribe[] = {
-      0x03, 0x00, 0x16, 0x01, 0x02,
+      0x03, 0x00, 0x17, 0x01, 0x02,
       0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
-      0x20, 0x02,                    // priority = 0x20 descending
+      0x20, 0x02, 0x01,              // priority, order, forward
       0x02,                          // filter_type = kLatestObject
       0x02,                          // two params
       0x04, 0x67, 0x10,              // max_cache_duration = 10000
@@ -721,10 +721,10 @@
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kRawQuic, &stream, visitor_);
   char subscribe[] = {
-      0x03, 0x00, 0x14, 0x01, 0x02,
+      0x03, 0x00, 0x15, 0x01, 0x02,
       0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
-      0x20, 0x02,                    // priority = 0x20 descending
+      0x20, 0x02, 0x01,              // priority, order, forward
       0x02,                          // filter_type = kLatestObject
       0x01,                          // one param
       0x01, 0x02, 0x00, 0x00,        // authorization_token = DELETE 0;
@@ -740,10 +740,10 @@
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kRawQuic, &stream, visitor_);
   char subscribe[] = {
-      0x03, 0x00, 0x18, 0x01, 0x02, 0x01, 0x03, 0x66, 0x6f,
-      0x6f,                          // track_namespace = "foo"
+      0x03, 0x00, 0x19, 0x01, 0x02, 0x01, 0x03, 0x66,
+      0x6f, 0x6f,                    // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
-      0x20, 0x02,                    // priority = 0x20 descending
+      0x20, 0x02, 0x01,              // priority, order, forward
       0x02,                          // filter_type = kLatestObject
       0x01,                          // one param
       0x01, 0x06, 0x01, 0x10, 0x00, 0x62, 0x61, 0x72,  // REGISTER 0x01
@@ -759,10 +759,10 @@
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kRawQuic, &stream, visitor_);
   char subscribe[] = {
-      0x03, 0x00, 0x14, 0x01, 0x02,
+      0x03, 0x00, 0x15, 0x01, 0x02,
       0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
-      0x20, 0x02,                    // priority = 0x20 descending
+      0x20, 0x02, 0x01,              // priority, order, forward
       0x02,                          // filter_type = kLatestObject
       0x01,                          // one param
       0x01, 0x02, 0x02, 0x07,        // authorization_token = USE 7;
@@ -779,10 +779,10 @@
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kRawQuic, &stream, visitor_);
   char subscribe[] = {
-      0x03, 0x00, 0x14, 0x01, 0x02,
+      0x03, 0x00, 0x15, 0x01, 0x02,
       0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
-      0x20, 0x02,                    // priority = 0x20 descending
+      0x20, 0x02, 0x01,              // priority, order, forward
       0x02,                          // filter_type = kLatestObject
       0x01,                          // one param
       0x01, 0x02, 0x04, 0x07,        // authorization_token type 4
@@ -799,10 +799,10 @@
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kRawQuic, &stream, visitor_);
   char subscribe[] = {
-      0x03, 0x00, 0x16, 0x01, 0x02, 0x01,
+      0x03, 0x00, 0x17, 0x01, 0x02, 0x01,
       0x03, 0x66, 0x6f, 0x6f,             // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,       // track_name = "abcd"
-      0x20, 0x02,                         // priority = 0x20 descending
+      0x20, 0x02, 0x01,                   // priority, order, forward
       0x02,                               // filter_type = kLatestObject
       0x01,                               // one param
       0x01, 0x04, 0x03, 0x01, 0x00, 0x00  // authorization_token type 1
@@ -814,6 +814,81 @@
   EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kKeyValueFormattingError);
 }
 
+TEST_F(MoqtMessageSpecificTest, SubscribeInvalidGroupOrder) {
+  webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+  MoqtControlParser parser(kRawQuic, &stream, visitor_);
+  char subscribe[] = {
+      0x03, 0x00, 0x1d, 0x01, 0x02,  // id and alias
+      0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
+      0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
+      0x20,                          // subscriber priority = 0x20
+      0x03,                          // group order = invalid
+      0x01,                          // forward = true
+      0x03,                          // Filter type: Absolute Start
+      0x04,                          // start_group = 4 (relative previous)
+      0x01,                          // start_object = 1 (absolute)
+      // No EndGroup or EndObject
+      0x02,                                      // 2 parameters
+      0x02, 0x67, 0x10,                          // delivery_timeout = 10000 ms
+      0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72,  // authorization_tag = "bar"
+  };
+  stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+  parser.ReadAndDispatchMessages();
+  EXPECT_EQ(visitor_.messages_received_, 0);
+  EXPECT_EQ(visitor_.parsing_error_, "Invalid group order value in SUBSCRIBE");
+  EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
+}
+
+TEST_F(MoqtMessageSpecificTest, SubscribeInvalidForward) {
+  webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+  MoqtControlParser parser(kRawQuic, &stream, visitor_);
+  char subscribe[] = {
+      0x03, 0x00, 0x1d, 0x01, 0x02,  // id and alias
+      0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
+      0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
+      0x20,                          // subscriber priority = 0x20
+      0x02,                          // group order = descending
+      0x02,                          // forward = invalid
+      0x03,                          // Filter type: Absolute Start
+      0x04,                          // start_group = 4 (relative previous)
+      0x01,                          // start_object = 1 (absolute)
+      // No EndGroup or EndObject
+      0x02,                                      // 2 parameters
+      0x02, 0x67, 0x10,                          // delivery_timeout = 10000 ms
+      0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72,  // authorization_tag = "bar"
+  };
+  stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+  parser.ReadAndDispatchMessages();
+  EXPECT_EQ(visitor_.messages_received_, 0);
+  EXPECT_EQ(visitor_.parsing_error_, "Invalid forward value in SUBSCRIBE");
+  EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
+}
+
+TEST_F(MoqtMessageSpecificTest, SubscribeInvalidFilter) {
+  webtransport::test::InMemoryStream stream(/*stream_id=*/0);
+  MoqtControlParser parser(kRawQuic, &stream, visitor_);
+  char subscribe[] = {
+      0x03, 0x00, 0x1d, 0x01, 0x02,  // id and alias
+      0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
+      0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
+      0x20,                          // subscriber priority = 0x20
+      0x02,                          // group order = descending
+      0x01,                          // forward = true
+      0x05,                          // Filter type: Absolute Start
+      0x04,                          // start_group = 4 (relative previous)
+      0x01,                          // start_object = 1 (absolute)
+      // No EndGroup or EndObject
+      0x02,                                      // 2 parameters
+      0x02, 0x67, 0x10,                          // delivery_timeout = 10000 ms
+      0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72,  // authorization_tag = "bar"
+  };
+  stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
+  parser.ReadAndDispatchMessages();
+  EXPECT_EQ(visitor_.messages_received_, 0);
+  EXPECT_EQ(visitor_.parsing_error_, "Invalid filter type");
+  EXPECT_EQ(visitor_.parsing_error_code_, MoqtError::kProtocolViolation);
+}
+
 TEST_F(MoqtMessageSpecificTest, SubscribeOkHasAuthorizationToken) {
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kWebTrans, &stream, visitor_);
@@ -978,13 +1053,13 @@
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kRawQuic, &stream, visitor_);
   char subscribe[] = {
-      0x03, 0x00, 0x15, 0x01, 0x02,  // id and alias
+      0x03, 0x00, 0x18, 0x01, 0x02,  // id and alias
       0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
-      0x20, 0x02,                    // priority = 0x20, group order descending
+      0x20, 0x02, 0x01,              // priority = 0x20, group order, forward
       0x02,                          // filter_type = kLatestObject
       0x01,                          // 1 parameter
-      0x03, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar"
+      0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72,  // authorization_tag = "bar"
   };
   stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
   parser.ReadAndDispatchMessages();
@@ -1000,28 +1075,28 @@
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kRawQuic, &stream, visitor_);
   char subscribe[] = {
-      0x03, 0x00, 0x15, 0x01, 0x02,  // id and alias
+      0x03, 0x00, 0x18, 0x01, 0x02,  // id and alias
       0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
-      0x20, 0x08,                    // priority = 0x20 ???
-      0x01,                          // filter_type = kLatestGroup
+      0x20, 0x08, 0x01,              // priority, invalid order, forward
+      0x01,                          // filter_type = kNextGroupStart
       0x01,                          // 1 parameter
-      0x03, 0x03, 0x62, 0x61, 0x72,  // authorization_info = "bar"
+      0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72,  // authorization_tag = "bar"
   };
   stream.Receive(absl::string_view(subscribe, sizeof(subscribe)), false);
   parser.ReadAndDispatchMessages();
   EXPECT_EQ(visitor_.messages_received_, 0);
-  EXPECT_THAT(visitor_.parsing_error_, Optional(HasSubstr("group order")));
+  EXPECT_EQ(visitor_.parsing_error_, "Invalid group order value in SUBSCRIBE");
 }
 
 TEST_F(MoqtMessageSpecificTest, AbsoluteStart) {
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kRawQuic, &stream, visitor_);
   char subscribe[] = {
-      0x03, 0x00, 0x17, 0x01, 0x02,  // id and alias
+      0x03, 0x00, 0x18, 0x01, 0x02,  // id and alias
       0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
-      0x20, 0x02,                    // priority = 0x20 descending
+      0x20, 0x02, 0x01,              // priority, order, forward
       0x03,                          // filter_type = kAbsoluteStart
       0x04,                          // start_group = 4
       0x01,                          // start_object = 1
@@ -1043,10 +1118,10 @@
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kRawQuic, &stream, visitor_);
   char subscribe[] = {
-      0x03, 0x00, 0x18, 0x01, 0x02,  // id and alias
+      0x03, 0x00, 0x19, 0x01, 0x02,  // id and alias
       0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
-      0x20, 0x02,                    // priority = 0x20 descending
+      0x20, 0x02, 0x01,              // priority, order, forward
       0x04,                          // filter_type = kAbsoluteRange
       0x04,                          // start_group = 4
       0x01,                          // start_object = 1
@@ -1069,10 +1144,10 @@
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kRawQuic, &stream, visitor_);
   char subscribe[] = {
-      0x03, 0x00, 0x18, 0x01, 0x02,  // id and alias
+      0x03, 0x00, 0x19, 0x01, 0x02,  // id and alias
       0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
-      0x20, 0x02,                    // priority = 0x20 descending
+      0x20, 0x02, 0x01,              // priority, order, forward
       0x04,                          // filter_type = kAbsoluteRange
       0x04,                          // start_group = 4
       0x01,                          // start_object = 1
@@ -1090,10 +1165,10 @@
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kRawQuic, &stream, visitor_);
   char subscribe[] = {
-      0x03, 0x00, 0x13, 0x01, 0x02,  // id and alias
+      0x03, 0x00, 0x14, 0x01, 0x02,  // id and alias
       0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
-      0x20, 0x02,                    // priority = 0x20 descending
+      0x20, 0x02, 0x01,              // priority, order, forward
       0x04,                          // filter_type = kAbsoluteRange
       0x04,                          // start_group = 4
       0x01,                          // start_object = 1
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 2d7ab24..5645885 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -71,9 +71,17 @@
   return status.ok() && DoesTrackStatusImplyHavingData(*status);
 }
 
-SubscribeWindow SubscribeMessageToWindow(const MoqtSubscribe& subscribe) {
-  return SubscribeWindow(subscribe.start.value_or(Location(0, 0)),
-                         subscribe.end_group);
+std::optional<SubscribeWindow> SubscribeMessageToWindow(
+    const MoqtSubscribe& subscribe) {
+  if (!subscribe.forward ||
+      subscribe.filter_type == MoqtFilterType::kLatestObject ||
+      subscribe.filter_type == MoqtFilterType::kNextGroupStart) {
+    return std::nullopt;
+  }
+  if (!subscribe.start.has_value()) {
+    return std::nullopt;
+  }
+  return SubscribeWindow(*subscribe.start, subscribe.end_group);
 }
 
 class DefaultPublisher : public MoqtPublisher {
@@ -111,6 +119,11 @@
       }
     });
   }
+  if (parameters_.perspective == Perspective::IS_SERVER) {
+    next_request_id_ = 1;
+  } else {
+    next_incoming_request_id_ = 1;
+  }
 }
 
 MoqtSession::ControlStream* MoqtSession::GetControlStream() {
@@ -327,6 +340,8 @@
   message.full_track_name = name;
   message.subscriber_priority = kDefaultSubscriberPriority;
   message.group_order = std::nullopt;
+  message.forward = true;
+  message.filter_type = MoqtFilterType::kAbsoluteStart;
   message.start = Location(start_group, start_object);
   message.end_group = std::nullopt;
   message.parameters = parameters;
@@ -346,6 +361,8 @@
   message.full_track_name = name;
   message.subscriber_priority = kDefaultSubscriberPriority;
   message.group_order = std::nullopt;
+  message.forward = true;
+  message.filter_type = MoqtFilterType::kAbsoluteRange;
   message.start = Location(start_group, start_object);
   message.end_group = end_group;
   message.parameters = parameters;
@@ -359,6 +376,8 @@
   message.full_track_name = name;
   message.subscriber_priority = kDefaultSubscriberPriority;
   message.group_order = std::nullopt;
+  message.forward = true;
+  message.filter_type = MoqtFilterType::kLatestObject;
   message.start = std::nullopt;
   message.end_group = std::nullopt;
   message.parameters = parameters;
@@ -396,7 +415,8 @@
   }
   MoqtFetch message;
   message.full_track_name = name;
-  message.fetch_id = next_request_id_++;
+  message.fetch_id = next_request_id_;
+  next_request_id_ += 2;
   message.start_object = start;
   message.end_group = end_group;
   message.end_object = end_object;
@@ -439,9 +459,9 @@
                                MoqtPriority priority,
                                std::optional<MoqtDeliveryOrder> delivery_order,
                                VersionSpecificParameters parameters) {
-  if ((next_request_id_ + 1) >= peer_max_request_id_) {
+  if ((next_request_id_ + 2) >= peer_max_request_id_) {
     QUIC_DLOG(INFO) << ENDPOINT << "Tried to send JOINING_FETCH with ID "
-                    << (next_request_id_ + 1)
+                    << (next_request_id_ + 2)
                     << " which is greater than the maximum ID "
                     << peer_max_request_id_;
     return false;
@@ -450,7 +470,8 @@
   subscribe.full_track_name = name;
   subscribe.subscriber_priority = priority;
   subscribe.group_order = delivery_order;
-  // Must be "Current Object" filter.
+  subscribe.forward = true;
+  subscribe.filter_type = MoqtFilterType::kLatestObject;
   subscribe.start = std::nullopt;
   subscribe.end_group = std::nullopt;
   subscribe.parameters = parameters;
@@ -458,10 +479,11 @@
     return false;
   }
   MoqtFetch fetch;
-  fetch.fetch_id = next_request_id_++;
+  fetch.fetch_id = next_request_id_;
+  next_request_id_ += 2;
   fetch.subscriber_priority = priority;
   fetch.group_order = delivery_order;
-  fetch.joining_fetch = {subscribe.subscribe_id, num_previous_groups};
+  fetch.joining_fetch = {subscribe.request_id, num_previous_groups};
   fetch.parameters = parameters;
   SendControlMessage(framer_.SerializeFetch(fetch));
   QUIC_DLOG(INFO) << ENDPOINT << "Sent Joining FETCH message for " << name;
@@ -614,7 +636,8 @@
     QUIC_DLOG(INFO) << ENDPOINT << "Tried to send SUBSCRIBE after GOAWAY";
     return false;
   }
-  message.subscribe_id = next_request_id_++;
+  message.request_id = next_request_id_;
+  next_request_id_ += 2;
   if (provided_track_alias.has_value()) {
     message.track_alias = *provided_track_alias;
     next_remote_track_alias_ =
@@ -626,7 +649,7 @@
     // Since we do not expose subscribe IDs directly in the API, instead wrap
     // the session and subscribe ID in a callback.
     visitor->OnCanAckObjects(absl::bind_front(&MoqtSession::SendObjectAck, this,
-                                              message.subscribe_id));
+                                              message.request_id));
   } else {
     QUICHE_DLOG_IF(WARNING, message.parameters.oack_window_size.has_value())
         << "Attempting to set object_ack_window on a connection that does not "
@@ -639,7 +662,7 @@
   auto track = std::make_unique<SubscribeRemoteTrack>(message, visitor);
   subscribe_by_name_.emplace(message.full_track_name, track.get());
   subscribe_by_alias_.emplace(message.track_alias, track.get());
-  upstream_by_id_.emplace(message.subscribe_id, std::move(track));
+  upstream_by_id_.emplace(message.request_id, std::move(track));
   return true;
 }
 
@@ -776,7 +799,7 @@
 }
 
 void MoqtSession::GrantMoreRequests(uint64_t num_requests) {
-  local_max_request_id_ += num_requests;
+  local_max_request_id_ += (num_requests * 2);
   MoqtMaxRequestId message;
   message.max_request_id = local_max_request_id_;
   SendControlMessage(framer_.SerializeMaxRequestId(message));
@@ -788,13 +811,13 @@
     Error(MoqtError::kTooManyRequests, "Received request with too large ID");
     return false;
   }
-  if (request_id < next_incoming_request_id_) {
+  if (request_id != next_incoming_request_id_) {
     QUIC_DLOG(INFO) << ENDPOINT << "Request ID not monotonically increasing";
     Error(MoqtError::kInvalidRequestId,
           "Request ID not monotonically increasing");
     return false;
   }
-  next_incoming_request_id_ = request_id + 1;
+  next_incoming_request_id_ = request_id + 2;
   return true;
 }
 
@@ -902,14 +925,14 @@
 
 void MoqtSession::ControlStream::OnSubscribeMessage(
     const MoqtSubscribe& message) {
-  if (!session_->ValidateRequestId(message.subscribe_id)) {
+  if (!session_->ValidateRequestId(message.request_id)) {
     return;
   }
   QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE for "
                   << message.full_track_name;
   if (session_->sent_goaway_) {
     QUIC_DLOG(INFO) << ENDPOINT << "Received a SUBSCRIBE after GOAWAY";
-    SendSubscribeError(message.subscribe_id, SubscribeErrorCode::kUnauthorized,
+    SendSubscribeError(message.request_id, SubscribeErrorCode::kUnauthorized,
                        "SUBSCRIBE after GOAWAY", message.track_alias);
     return;
   }
@@ -925,7 +948,7 @@
     QUIC_DLOG(INFO) << ENDPOINT << "SUBSCRIBE for " << track_name
                     << " rejected by the application: "
                     << track_publisher.status();
-    SendSubscribeError(message.subscribe_id, SubscribeErrorCode::kDoesNotExist,
+    SendSubscribeError(message.request_id, SubscribeErrorCode::kDoesNotExist,
                        track_publisher.status().message(), message.track_alias);
     return;
   }
@@ -945,7 +968,7 @@
   subscription->set_delivery_timeout(message.parameters.delivery_timeout);
   MoqtSession::PublishedSubscription* subscription_ptr = subscription.get();
   auto [it, success] = session_->published_subscriptions_.emplace(
-      message.subscribe_id, std::move(subscription));
+      message.request_id, std::move(subscription));
   if (!success) {
     QUICHE_NOTREACHED();  // ValidateRequestId() should have caught this.
   }
@@ -1692,11 +1715,12 @@
     MoqtSession* session, std::shared_ptr<MoqtTrackPublisher> track_publisher,
     const MoqtSubscribe& subscribe,
     MoqtPublishingMonitorInterface* monitoring_interface)
-    : filter_type_(GetFilterType(subscribe)),
-      subscription_id_(subscribe.subscribe_id),
-      session_(session),
+    : session_(session),
       track_publisher_(track_publisher),
+      request_id_(subscribe.request_id),
       track_alias_(subscribe.track_alias),
+      filter_type_(subscribe.filter_type),
+      forward_(subscribe.forward),
       window_(SubscribeMessageToWindow(subscribe)),
       subscriber_priority_(subscribe.subscriber_priority),
       subscriber_delivery_order_(subscribe.group_order),
@@ -1731,12 +1755,18 @@
 void MoqtSession::PublishedSubscription::Update(
     Location start, std::optional<uint64_t> end_group,
     MoqtPriority subscriber_priority) {
-  window_.TruncateStart(start);
-  if (end_group.has_value()) {
-    window_.TruncateEnd(*end_group);
-  }
   subscriber_priority_ = subscriber_priority;
+  if (!window_.has_value()) {
+    window_ = SubscribeWindow(start, end_group);
+    return;
+  }
+  window_->TruncateStart(start);
+  if (end_group.has_value()) {
+    window_->TruncateEnd(*end_group);
+  }
   // TODO: update priority of all data streams that are currently open.
+  // TODO: update delivery timeout.
+  // TODO: update forward and subscribe filter.
 
   // TODO: reset streams that are no longer in-window.
   // TODO: send SUBSCRIBE_DONE if required.
@@ -1755,7 +1785,7 @@
   webtransport::SendOrder old_send_order =
       FinalizeSendOrder(queued_outgoing_data_streams_.rbegin()->first);
   subscriber_priority_ = priority;
-  session_->UpdateQueuedSendOrder(subscription_id_, old_send_order,
+  session_->UpdateQueuedSendOrder(request_id_, old_send_order,
                                   FinalizeSendOrder(old_send_order));
 };
 
@@ -1764,21 +1794,26 @@
   ControlStream* stream = session_->GetControlStream();
   if (PublisherHasData(*track_publisher_)) {
     largest_id = track_publisher_->GetLargestSequence();
-    if (window_.end() < *largest_id) {
-      stream->SendSubscribeError(subscription_id_,
-                                 SubscribeErrorCode::kInvalidRange,
-                                 "SUBSCRIBE ends in past group", track_alias_);
-      session_->published_subscriptions_.erase(subscription_id_);
-      // No class access below this line!
-      return;
+    QUICHE_CHECK(largest_id.has_value());
+    if (forward_) {
+      switch (filter_type_) {
+        case MoqtFilterType::kLatestObject:
+          window_ = SubscribeWindow(largest_id->next());
+          break;
+        case MoqtFilterType::kNextGroupStart:
+          window_ = SubscribeWindow(Location(largest_id->group + 1, 0));
+          break;
+        default:
+          break;
+      }
     }
-    if (!window_.TruncateStart(largest_id->next())) {
-      QUICHE_NOTREACHED();
-    };
+  } else if (filter_type_ == MoqtFilterType::kLatestObject ||
+             filter_type_ == MoqtFilterType::kNextGroupStart) {
+    // No data yet. All objects will be in-window.
+    window_ = SubscribeWindow(Location(0, 0));
   }
-
   MoqtSubscribeOk subscribe_ok;
-  subscribe_ok.subscribe_id = subscription_id_;
+  subscribe_ok.subscribe_id = request_id_;
   subscribe_ok.group_order = track_publisher_->GetDeliveryOrder();
   subscribe_ok.largest_id = largest_id;
   // TODO(martinduke): Support sending DELIVERY_TIMEOUT parameter as the
@@ -1790,15 +1825,15 @@
 void MoqtSession::PublishedSubscription::OnSubscribeRejected(
     MoqtSubscribeErrorReason reason, std::optional<uint64_t> track_alias) {
   session_->GetControlStream()->SendSubscribeError(
-      subscription_id_, reason.error_code, reason.reason_phrase,
+      request_id_, reason.error_code, reason.reason_phrase,
       track_alias.value_or(track_alias_));
-  session_->published_subscriptions_.erase(subscription_id_);
+  session_->published_subscriptions_.erase(request_id_);
   // No class access below this line!
 }
 
 void MoqtSession::PublishedSubscription::OnNewObjectAvailable(
     Location sequence) {
-  if (!window_.InWindow(sequence)) {
+  if (!InWindow(sequence)) {
     return;
   }
   if (reset_subgroups_.contains(
@@ -1841,7 +1876,7 @@
   if (stream_id.has_value()) {
     raw_stream = session_->session_->GetStreamById(*stream_id);
   } else {
-    raw_stream = session_->OpenOrQueueDataStream(subscription_id_, sequence);
+    raw_stream = session_->OpenOrQueueDataStream(request_id_, sequence);
   }
   if (raw_stream == nullptr) {
     return;
@@ -1853,12 +1888,12 @@
 }
 
 void MoqtSession::PublishedSubscription::OnTrackPublisherGone() {
-  session_->SubscribeIsDone(subscription_id_, SubscribeDoneCode::kGoingAway,
+  session_->SubscribeIsDone(request_id_, SubscribeDoneCode::kGoingAway,
                             "Publisher is gone");
 }
 
 void MoqtSession::PublishedSubscription::OnNewFinAvailable(Location sequence) {
-  if (!window_.InWindow(sequence)) {
+  if (!InWindow(sequence)) {
     return;
   }
   if (reset_subgroups_.contains(
@@ -1884,7 +1919,7 @@
 
 void MoqtSession::PublishedSubscription::OnSubgroupAbandoned(
     Location sequence, webtransport::StreamErrorCode error_code) {
-  if (!window_.InWindow(sequence)) {
+  if (!InWindow(sequence)) {
     return;
   }
   if (reset_subgroups_.contains(
@@ -1907,6 +1942,11 @@
 }
 
 void MoqtSession::PublishedSubscription::OnGroupAbandoned(uint64_t group_id) {
+  if (!window_.has_value() || window_->end().group < group_id ||
+      window_->start().group > group_id) {
+    // The group is not in the window, ignore.
+    return;
+  }
   std::vector<webtransport::StreamId> streams =
       stream_map().GetStreamsForGroup(group_id);
   for (webtransport::StreamId stream_id : streams) {
@@ -1961,10 +2001,10 @@
   queued_outgoing_data_streams_.emplace(
       UpdateSendOrderForSubscriberPriority(send_order, 0), first_object);
   if (!start_send_order.has_value()) {
-    session_->UpdateQueuedSendOrder(subscription_id_, std::nullopt, send_order);
+    session_->UpdateQueuedSendOrder(request_id_, std::nullopt, send_order);
   } else if (*start_send_order < send_order) {
     session_->UpdateQueuedSendOrder(
-        subscription_id_, FinalizeSendOrder(*start_send_order), send_order);
+        request_id_, FinalizeSendOrder(*start_send_order), send_order);
   }
 }
 
@@ -1980,13 +2020,12 @@
   // then taking base().
   queued_outgoing_data_streams_.erase((++it).base());
   if (queued_outgoing_data_streams_.empty()) {
-    session_->UpdateQueuedSendOrder(subscription_id_, old_send_order,
-                                    std::nullopt);
+    session_->UpdateQueuedSendOrder(request_id_, old_send_order, std::nullopt);
   } else {
     webtransport::SendOrder new_send_order =
         FinalizeSendOrder(queued_outgoing_data_streams_.rbegin()->first);
     if (old_send_order != new_send_order) {
-      session_->UpdateQueuedSendOrder(subscription_id_, old_send_order,
+      session_->UpdateQueuedSendOrder(request_id_, old_send_order,
                                       new_send_order);
     }
   }
@@ -2017,7 +2056,7 @@
     PublishedSubscription& subscription, Location first_object)
     : session_(session),
       stream_(stream),
-      subscription_id_(subscription.subscription_id()),
+      subscription_id_(subscription.request_id()),
       next_object_(first_object),
       session_liveness_(session->liveness_token_) {
   UpdateSendOrder(subscription);
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index eceaa98..bc4e80d 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -113,6 +113,7 @@
   // the track, the message will still be sent. However, the visitor will be
   // ignored.
   // Subscribe from (start_group, start_object) to the end of the track.
+  // TODO(martinduke): Allow setting forward = false.
   bool SubscribeAbsolute(const FullTrackName& name, uint64_t start_group,
                          uint64_t start_object,
                          SubscribeRemoteTrack::Visitor* visitor,
@@ -125,6 +126,7 @@
   bool SubscribeCurrentObject(const FullTrackName& name,
                               SubscribeRemoteTrack::Visitor* visitor,
                               VersionSpecificParameters parameters) override;
+  // TODO(martinduke): SubscribeNextGroup
   // Returns false if the subscription is not found. The session immediately
   // destroys all subscription state.
   void Unsubscribe(const FullTrackName& name);
@@ -337,7 +339,7 @@
     PublishedSubscription& operator=(const PublishedSubscription&) = delete;
     PublishedSubscription& operator=(PublishedSubscription&&) = delete;
 
-    uint64_t subscription_id() const { return subscription_id_; }
+    uint64_t request_id() const { return request_id_; }
     MoqtTrackPublisher& publisher() { return *track_publisher_; }
     uint64_t track_alias() const { return track_alias_; }
     std::optional<Location> largest_sent() const { return largest_sent_; }
@@ -372,8 +374,13 @@
                 MoqtPriority subscriber_priority);
     // Checks if the specified sequence is within the window of this
     // subscription.
-    bool InWindow(Location sequence) { return window_.InWindow(sequence); }
-    Location GetWindowStart() const { return window_.start(); }
+    bool InWindow(Location sequence) {
+      return forward_ && window_.has_value() && window_->InWindow(sequence);
+    }
+    Location GetWindowStart() const {
+      QUICHE_CHECK(window_.has_value());
+      return window_->start();
+    }
     MoqtFilterType filter_type() const { return filter_type_; };
 
     void OnDataStreamCreated(webtransport::StreamId id,
@@ -427,12 +434,16 @@
                                                   subscriber_priority_);
     }
 
-    MoqtFilterType filter_type_;
-    uint64_t subscription_id_;
     MoqtSession* session_;
     std::shared_ptr<MoqtTrackPublisher> track_publisher_;
+    uint64_t request_id_;
     uint64_t track_alias_;
-    SubscribeWindow window_;
+    MoqtFilterType filter_type_;
+    bool forward_;
+    // If window_ is nullopt, any arriving objects are ignored. This could be
+    // because forward=0, or because the subscription is waiting for a
+    // SUBSCRIBE_OK and doesn't know what the window should be yet.
+    std::optional<SubscribeWindow> window_;
     MoqtPriority subscriber_priority_;
     uint64_t streams_opened_ = 0;
 
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 2c8797c..1487e25 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -60,11 +60,13 @@
 
 MoqtSubscribe DefaultSubscribe() {
   MoqtSubscribe subscribe = {
-      /*subscribe_id=*/1,
+      /*request_id=*/1,
       /*track_alias=*/2,
       kDefaultTrackName(),
       /*subscriber_priority=*/0x80,
       /*group_order=*/std::nullopt,
+      /*forward=*/true,
+      /*filter_type=*/MoqtFilterType::kAbsoluteStart,
       /*start=*/Location(0, 0),
       /*end_group=*/std::nullopt,
       /*parameters=*/VersionSpecificParameters(),
@@ -74,7 +76,7 @@
 
 MoqtFetch DefaultFetch() {
   MoqtFetch fetch = {
-      /*fetch_id=*/2,
+      /*fetch_id=*/1,
       /*subscriber_priority=*/0x80,
       /*group_order=*/std::nullopt,
       /*joining_fetch=*/std::nullopt,
@@ -155,7 +157,7 @@
       return nullptr;
     }
     MoqtSubscribeOk expected_ok = {
-        /*subscribe_id=*/subscribe.subscribe_id,
+        /*request_id=*/subscribe.request_id,
         /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
         /*group_order=*/MoqtDeliveryOrder::kAscending,
         (*track_status == MoqtTrackStatusCode::kInProgress)
@@ -346,7 +348,7 @@
   // Add the track. Now Subscribe should succeed.
   MockTrackPublisher* track = CreateTrackPublisher();
   std::make_shared<MockTrackPublisher>(request.full_track_name);
-  ++request.subscribe_id;
+  request.request_id += 2;
   ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
 }
 
@@ -504,28 +506,56 @@
   ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
 }
 
-TEST_F(MoqtSessionTest, SubscribeEntirelyInPast) {
+TEST_F(MoqtSessionTest, SubscribeDoNotForward) {
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   MockTrackPublisher* track = CreateTrackPublisher();
-  SetLargestId(track, Location(10, 20));
-
   MoqtSubscribe request = DefaultSubscribe();
-  request.end_group = 9;
-  EXPECT_CALL(*track, AddObjectListener)
-      .WillOnce([&](MoqtObjectListener* listener) {
-        listener->OnSubscribeAccepted();
-      });
-  MoqtSubscribeError expected_error = {
-      /*subscribe_id=*/request.subscribe_id,
-      /*error_code=*/SubscribeErrorCode::kInvalidRange,
-      /*reason_phrase=*/"SUBSCRIBE ends in past group",
-      /*track_alias=*/request.track_alias,
-  };
-  EXPECT_CALL(mock_stream_,
-              Writev(SerializedControlMessage(expected_error), _));
-  stream_input->OnSubscribeMessage(request);
-  EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, 1), nullptr);
+  request.forward = false;
+  request.filter_type = MoqtFilterType::kLatestObject;
+  EXPECT_CALL(*track, GetTrackStatus)
+      .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun));
+  MoqtObjectListener* listener =
+      ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
+  // forward=false, so incoming objects are ignored.
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .Times(0);
+  listener->OnNewObjectAvailable(Location(0, 0));
+}
+
+TEST_F(MoqtSessionTest, SubscribeAbsoluteStartNoDataYet) {
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MockTrackPublisher* track = CreateTrackPublisher();
+  MoqtSubscribe request = DefaultSubscribe();
+  request.start = Location(1, 0);
+  EXPECT_CALL(*track, GetTrackStatus)
+      .WillRepeatedly(Return(MoqtTrackStatusCode::kNotYetBegun));
+  MoqtObjectListener* listener =
+      ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
+  // Window was not set to (0, 0) by SUBSCRIBE acceptance.
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .Times(0);
+  listener->OnNewObjectAvailable(Location(0, 0));
+}
+
+TEST_F(MoqtSessionTest, SubscribeNextGroup) {
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MockTrackPublisher* track = CreateTrackPublisher();
+  MoqtSubscribe request = DefaultSubscribe();
+  request.filter_type = MoqtFilterType::kNextGroupStart;
+  SetLargestId(track, Location(10, 20));
+  MoqtObjectListener* listener =
+      ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
+  // Later objects in group 10 ignored.
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .Times(0);
+  listener->OnNewObjectAvailable(Location(10, 21));
+  // Group 11 is sent.
+  EXPECT_CALL(mock_session_, CanOpenNextOutgoingUnidirectionalStream())
+      .WillOnce(Return(false));
+  listener->OnNewObjectAvailable(Location(11, 0));
 }
 
 TEST_F(MoqtSessionTest, TwoSubscribesForTrack) {
@@ -535,7 +565,7 @@
   MoqtSubscribe request = DefaultSubscribe();
   ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
 
-  request.subscribe_id = 2;
+  request.request_id = 3;
   request.start = Location(12, 0);
   EXPECT_CALL(mock_session_,
               CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
@@ -553,13 +583,13 @@
 
   // Peer unsubscribes.
   MoqtUnsubscribe unsubscribe = {
-      /*subscribe_id=*/1,
+      /*request_id=*/1,
   };
   stream_input->OnUnsubscribeMessage(unsubscribe);
   EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, 1), nullptr);
 
   // Subscribe again, succeeds.
-  request.subscribe_id = 2;
+  request.request_id = 3;
   request.start = Location(12, 0);
   ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
 }
@@ -567,7 +597,7 @@
 TEST_F(MoqtSessionTest, RequestIdTooHigh) {
   // Peer subscribes to (0, 0)
   MoqtSubscribe request = DefaultSubscribe();
-  request.subscribe_id = kDefaultInitialMaxRequestId + 1;
+  request.request_id = kDefaultInitialMaxRequestId + 1;
 
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
@@ -577,6 +607,10 @@
   stream_input->OnSubscribeMessage(request);
 }
 
+TEST_F(MoqtSessionTest, RequestIdWrongLsb) {
+  // TODO(martinduke): Implement this test.
+}
+
 TEST_F(MoqtSessionTest, SubscribeIdNotIncreasing) {
   MoqtSubscribe request = DefaultSubscribe();
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
@@ -650,7 +684,7 @@
                                   VersionSpecificParameters());
 
   MoqtSubscribeOk ok = {
-      /*subscribe_id=*/0,
+      /*request_id=*/0,
       /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
   };
   EXPECT_CALL(remote_track_visitor, OnReply(_, _, _))
@@ -709,7 +743,9 @@
   session_.GrantMoreRequests(1);
   // Peer subscribes to (0, 0)
   MoqtSubscribe request = DefaultSubscribe();
-  request.subscribe_id = kDefaultInitialMaxRequestId;
+  MoqtSessionPeer::set_next_incoming_request_id(
+      &session_, kDefaultInitialMaxRequestId + 1);
+  request.request_id = kDefaultInitialMaxRequestId + 1;
   MockTrackPublisher* track = CreateTrackPublisher();
   ReceiveSubscribeSynchronousOk(track, request, stream_input.get());
 }
@@ -726,7 +762,7 @@
                                   VersionSpecificParameters());
 
   MoqtSubscribeError error = {
-      /*subscribe_id=*/0,
+      /*request_id=*/0,
       /*error_code=*/SubscribeErrorCode::kInvalidRange,
       /*reason_phrase=*/"deadbeef",
       /*track_alias=*/2,
@@ -1011,7 +1047,7 @@
 
   // SUBSCRIBE_OK arrives
   MoqtSubscribeOk ok = {
-      /*subscribe_id=*/1,
+      /*request_id=*/1,
       /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
       /*group_order=*/MoqtDeliveryOrder::kAscending,
       /*largest_id=*/std::nullopt,
@@ -1056,7 +1092,7 @@
 
   // SUBSCRIBE_ERROR arrives
   MoqtSubscribeError subscribe_error = {
-      /*subscribe_id=*/1,
+      /*request_id=*/1,
       /*error_code=*/SubscribeErrorCode::kRetryTrackAlias,
       /*reason_phrase=*/"foo",
       /*track_alias =*/3,
@@ -1078,7 +1114,7 @@
 
   // SUBSCRIBE_ERROR arrives
   MoqtSubscribeError subscribe_error = {
-      /*subscribe_id=*/1,
+      /*request_id=*/1,
       /*error_code=*/SubscribeErrorCode::kRetryTrackAlias,
       /*reason_phrase=*/"foo",
       /*track_alias =*/3,
@@ -1098,7 +1134,7 @@
 
   // SUBSCRIBE_ERROR arrives
   MoqtSubscribeError subscribe_error = {
-      /*subscribe_id=*/1,
+      /*request_id=*/1,
       /*error_code=*/SubscribeErrorCode::kRetryTrackAlias,
       /*reason_phrase=*/"foo",
       /*track_alias =*/2,
@@ -1680,7 +1716,7 @@
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
   MoqtUnsubscribe unsubscribe = {
-      /*subscribe_id=*/0,
+      /*request_id=*/0,
   };
   stream_input->OnUnsubscribeMessage(unsubscribe);
   EXPECT_EQ(MoqtSessionPeer::GetSubscription(&session_, 0), nullptr);
@@ -2225,12 +2261,12 @@
 
 TEST_F(MoqtSessionTest, InvalidFetch) {
   // Update the state so that it expects ID > 0 next time.
-  MoqtSessionPeer::ValidateRequestId(&session_, 0);
+  MoqtSessionPeer::ValidateRequestId(&session_, 1);
   webtransport::test::MockStream control_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
   MoqtFetch fetch = DefaultFetch();
-  fetch.fetch_id = 0;  // Too low.
+  fetch.fetch_id = 1;  // Too low.
   EXPECT_CALL(mock_session_,
               CloseSession(static_cast<uint64_t>(MoqtError::kInvalidRequestId),
                            "Request ID not monotonically increasing"))
@@ -2286,6 +2322,7 @@
 TEST_F(MoqtSessionTest, IncomingJoiningFetch) {
   MoqtSubscribe subscribe = DefaultSubscribe();
   // Give it the latest object filter.
+  subscribe.filter_type = MoqtFilterType::kLatestObject;
   subscribe.start = std::nullopt;
   subscribe.end_group = std::nullopt;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
@@ -2295,7 +2332,7 @@
   ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get());
 
   MoqtObjectListener* subscription =
-      MoqtSessionPeer::GetSubscription(&session_, subscribe.subscribe_id);
+      MoqtSessionPeer::GetSubscription(&session_, subscribe.request_id);
   ASSERT_NE(subscription, nullptr);
   EXPECT_TRUE(
       MoqtSessionPeer::InSubscriptionWindow(subscription, Location(4, 0, 11)));
@@ -2304,6 +2341,7 @@
 
   // Joining FETCH arrives. The resulting Fetch should begin at (2, 0).
   MoqtFetch fetch = DefaultFetch();
+  fetch.fetch_id = 3;
   fetch.joining_fetch = {1, 2};
   EXPECT_CALL(*track, Fetch(Location(2, 0), 4, std::optional<uint64_t>(10), _))
       .WillOnce(Return(std::make_unique<MockFetchTask>()));
@@ -2316,7 +2354,7 @@
   MoqtFetch fetch = DefaultFetch();
   fetch.joining_fetch = {1, 2};
   MoqtFetchError expected_error = {
-      /*subscribe_id=*/2,
+      /*request_id=*/1,
       /*error_code=*/SubscribeErrorCode::kDoesNotExist,
       /*reason_phrase=*/"Joining Fetch for non-existent subscribe",
   };
@@ -2334,6 +2372,7 @@
   ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get());
 
   MoqtFetch fetch = DefaultFetch();
+  fetch.fetch_id = 3;
   fetch.joining_fetch = {1, 2};
   EXPECT_CALL(mock_session_,
               CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
@@ -2349,16 +2388,19 @@
   EXPECT_CALL(mock_session_, GetStreamById(_))
       .WillRepeatedly(Return(&mock_stream_));
   MoqtSubscribe expected_subscribe = {
-      /*subscribe_id=*/0,
+      /*request_id=*/0,
       /*track_alias=*/0,
       /*full_track_name=*/FullTrackName("foo", "bar"),
       /*subscriber_priority=*/0x80,
       /*group_order=*/MoqtDeliveryOrder::kAscending,
+      /*forward=*/true,
+      /*filter_type=*/MoqtFilterType::kLatestObject,
       /*start=*/std::nullopt,
       /*end_group=*/std::nullopt,
+      VersionSpecificParameters(),
   };
   MoqtFetch expected_fetch = {
-      /*fetch_id=*/1,
+      /*fetch_id=*/2,
       /*subscriber_priority=*/0x80,
       /*group_order=*/MoqtDeliveryOrder::kAscending,
       /*joining_fetch=*/JoiningFetch(0, 1),
@@ -2391,12 +2433,12 @@
       MoqtSubscribeOk(0, quic::QuicTimeDelta::FromMilliseconds(0),
                       MoqtDeliveryOrder::kAscending, Location(2, 0),
                       VersionSpecificParameters()));
-  stream_input->OnFetchOkMessage(MoqtFetchOk(1, MoqtDeliveryOrder::kAscending,
+  stream_input->OnFetchOkMessage(MoqtFetchOk(2, MoqtDeliveryOrder::kAscending,
                                              Location(2, 0),
                                              VersionSpecificParameters()));
   // Packet arrives on FETCH stream.
   MoqtObject object = {
-      /*fetch_id=*/1,
+      /*fetch_id=*/2,
       /*group_id, object_id=*/0,
       0,
       /*publisher_priority=*/128,
@@ -2479,7 +2521,7 @@
       Location(0, 0), 4, std::nullopt, 128, std::nullopt,
       VersionSpecificParameters());
   MoqtFetchOk ok = {
-      /*subscribe_id=*/0,
+      /*request_id=*/0,
       /*group_order=*/MoqtDeliveryOrder::kAscending,
       /*largest_id=*/Location(3, 25),
       VersionSpecificParameters(),
@@ -2508,7 +2550,7 @@
       Location(0, 0), 4, std::nullopt, 128, std::nullopt,
       VersionSpecificParameters());
   MoqtFetchError error = {
-      /*subscribe_id=*/0,
+      /*request_id=*/0,
       /*error_code=*/SubscribeErrorCode::kUnauthorized,
       /*reason_phrase=*/"No username provided",
   };
@@ -2546,7 +2588,7 @@
   std::queue<quiche::QuicheBuffer> headers;
   std::queue<std::string> payloads;
   MoqtObject object = {
-      /*subscribe_id=*/0,
+      /*request_id=*/0,
       /*group_id, object_id=*/0,
       0,
       /*publisher_priority=*/128,
@@ -2578,7 +2620,7 @@
 
   // FETCH_OK arrives, objects are delivered.
   MoqtFetchOk ok = {
-      /*subscribe_id=*/0,
+      /*request_id=*/0,
       /*group_order=*/MoqtDeliveryOrder::kAscending,
       /*largest_id=*/Location(3, 25),
       VersionSpecificParameters(),
@@ -2616,7 +2658,7 @@
   std::queue<quiche::QuicheBuffer> headers;
   std::queue<std::string> payloads;
   MoqtObject object = {
-      /*subscribe_id=*/0,
+      /*request_id=*/0,
       /*group_id, object_id=*/0,
       0,
       /*publisher_priority=*/128,
@@ -2648,7 +2690,7 @@
 
   // FETCH_OK arrives, objects are available.
   MoqtFetchOk ok = {
-      /*subscribe_id=*/0,
+      /*request_id=*/0,
       /*group_order=*/MoqtDeliveryOrder::kAscending,
       /*largest_id=*/Location(3, 25),
       VersionSpecificParameters(),
@@ -2707,7 +2749,7 @@
   bool object_ready = false;
   task->SetObjectAvailableCallback([&]() { object_ready = true; });
   MoqtObject object = {
-      /*subscribe_id=*/0,
+      /*request_id=*/0,
       /*group_id, object_id=*/0,
       0,
       /*publisher_priority=*/128,
@@ -3035,6 +3077,7 @@
   EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kFetchError), _));
   MoqtFetch fetch = DefaultFetch();
+  fetch.fetch_id = 3;
   stream_input->OnFetchMessage(fetch);
   EXPECT_CALL(
       mock_stream_,
@@ -3121,7 +3164,7 @@
                                               &remote_track_visitor,
                                               VersionSpecificParameters()));
   MoqtSubscribeOk ok = {
-      /*subscribe_id=*/0,
+      /*request_id=*/0,
       /*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
       /*group_order=*/MoqtDeliveryOrder::kAscending,
       /*largest_id=*/std::nullopt,
@@ -3179,7 +3222,7 @@
                                               &remote_track_visitor,
                                               VersionSpecificParameters()));
   MoqtSubscribeOk ok = {
-      /*subscribe_id=*/0,
+      /*request_id=*/0,
       /*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
       /*group_order=*/MoqtDeliveryOrder::kAscending,
       /*largest_id=*/std::nullopt,
@@ -3234,7 +3277,7 @@
                                               &remote_track_visitor,
                                               VersionSpecificParameters()));
   MoqtSubscribeOk ok = {
-      /*subscribe_id=*/0,
+      /*request_id=*/0,
       /*expires=*/quic::QuicTimeDelta::FromMilliseconds(10000),
       /*group_order=*/MoqtDeliveryOrder::kAscending,
       /*largest_id=*/std::nullopt,
@@ -3297,7 +3340,7 @@
                                     MoqtObjectStatus::kNormal);
   // Update the end to fall at the last delivered object.
   MoqtSubscribeUpdate update = {
-      /*subscribe_id=*/0,
+      /*request_id=*/0,
       /*start_group=*/5,
       /*start_object=*/0,
       /*end_group=*/7,
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index a829930..0783141 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -116,7 +116,7 @@
     virtual void OnSubscribeDone(FullTrackName full_track_name) = 0;
   };
   SubscribeRemoteTrack(const MoqtSubscribe& subscribe, Visitor* visitor)
-      : RemoteTrack(subscribe.full_track_name, subscribe.subscribe_id,
+      : RemoteTrack(subscribe.full_track_name, subscribe.request_id,
                     SubscribeWindow(subscribe.start.value_or(Location()),
                                     subscribe.end_group)),
         track_alias_(subscribe.track_alias),
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 9ab9078..7bca1db 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -54,6 +54,8 @@
       /*full_track_name=*/FullTrackName("foo", "bar"),
       /*subscriber_priority=*/128,
       /*group_order=*/std::nullopt,
+      /*forward=*/true,
+      /*filter_type=*/MoqtFilterType::kAbsoluteStart,
       /*start=*/Location(2, 0),
       std::nullopt,
       VersionSpecificParameters(),
@@ -78,7 +80,7 @@
 
 TEST_F(SubscribeRemoteTrackTest, AllowError) {
   EXPECT_TRUE(track_.ErrorIsAllowed());
-  EXPECT_EQ(track_.GetSubscribe().subscribe_id, subscribe_.subscribe_id);
+  EXPECT_EQ(track_.GetSubscribe().request_id, subscribe_.request_id);
   track_.OnObjectOrOk();
   EXPECT_FALSE(track_.ErrorIsAllowed());
 }
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index 9fa1bf3..e3ddb7b 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -91,7 +91,7 @@
                                              track.get());
     session->subscribe_by_name_.try_emplace(subscribe.full_track_name,
                                             track.get());
-    session->upstream_by_id_.try_emplace(subscribe.subscribe_id,
+    session->upstream_by_id_.try_emplace(subscribe.request_id,
                                          std::move(track));
   }
 
@@ -102,7 +102,9 @@
     MoqtSubscribe subscribe;
     subscribe.full_track_name = publisher->GetTrackName();
     subscribe.track_alias = track_alias;
-    subscribe.subscribe_id = subscribe_id;
+    subscribe.request_id = subscribe_id;
+    subscribe.forward = true;
+    subscribe.filter_type = MoqtFilterType::kAbsoluteStart;
     subscribe.start = Location(start_group, start_object);
     subscribe.subscriber_priority = 0x80;
     session->published_subscriptions_.emplace(
@@ -147,6 +149,10 @@
     session->next_request_id_ = id;
   }
 
+  static void set_next_incoming_request_id(MoqtSession* session, uint64_t id) {
+    session->next_incoming_request_id_ = id;
+  }
+
   static void set_peer_max_request_id(MoqtSession* session, uint64_t id) {
     session->peer_max_request_id_ = id;
   }
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index b0709ef..31c2521 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -465,7 +465,7 @@
 
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtSubscribe>(values);
-    if (cast.subscribe_id != subscribe_.subscribe_id) {
+    if (cast.request_id != subscribe_.request_id) {
       QUIC_LOG(INFO) << "SUBSCRIBE subscribe ID mismatch";
       return false;
     }
@@ -485,6 +485,14 @@
       QUIC_LOG(INFO) << "SUBSCRIBE group order mismatch";
       return false;
     }
+    if (cast.forward != subscribe_.forward) {
+      QUIC_LOG(INFO) << "SUBSCRIBE forward mismatch";
+      return false;
+    }
+    if (cast.filter_type != subscribe_.filter_type) {
+      QUIC_LOG(INFO) << "SUBSCRIBE filter type mismatch";
+      return false;
+    }
     if (cast.start != subscribe_.start) {
       QUIC_LOG(INFO) << "SUBSCRIBE start mismatch";
       return false;
@@ -501,7 +509,7 @@
   }
 
   void ExpandVarints() override {
-    ExpandVarintsImpl("vvvv---v------vvvvv--vv-----");
+    ExpandVarintsImpl("vvvv---v-------vvvvv--vv-----");
   }
 
   MessageStructuredData structured_data() const override {
@@ -509,15 +517,16 @@
   }
 
  private:
-  uint8_t raw_packet_[31] = {
-      0x03, 0x00, 0x1c, 0x01, 0x02,  // id and alias
+  uint8_t raw_packet_[32] = {
+      0x03, 0x00, 0x1d, 0x01, 0x02,  // id and alias
       0x01, 0x03, 0x66, 0x6f, 0x6f,  // track_namespace = "foo"
       0x04, 0x61, 0x62, 0x63, 0x64,  // track_name = "abcd"
       0x20,                          // subscriber priority = 0x20
       0x02,                          // group order = descending
+      0x01,                          // forward = true
       0x03,                          // Filter type: Absolute Start
-      0x04,                          // start_group = 4 (relative previous)
-      0x01,                          // start_object = 1 (absolute)
+      0x04,                          // start_group = 4
+      0x01,                          // start_object = 1
       // No EndGroup or EndObject
       0x02,                                      // 2 parameters
       0x02, 0x67, 0x10,                          // delivery_timeout = 10000 ms
@@ -530,6 +539,8 @@
       /*full_track_name=*/FullTrackName({"foo", "abcd"}),
       /*subscriber_priority=*/0x20,
       /*group_order=*/MoqtDeliveryOrder::kDescending,
+      /*forward=*/true,
+      /*filter_type=*/MoqtFilterType::kAbsoluteStart,
       /*start=*/Location(4, 1),
       /*end_group=*/std::nullopt,
       VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000),