Update MoQT SUBSCRIBE_UPDATE to draft-11.

Add a session API to update upstream SUBSCRIBE.

PiperOrigin-RevId: 758303508
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index 43caa3a..139a6d7 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -534,10 +534,10 @@
   uint64_t end_group =
       message.end_group.has_value() ? *message.end_group + 1 : 0;
   return SerializeControlMessage(
-      MoqtMessageType::kSubscribeUpdate, WireVarInt62(message.subscribe_id),
+      MoqtMessageType::kSubscribeUpdate, WireVarInt62(message.request_id),
       WireVarInt62(message.start.group), WireVarInt62(message.start.object),
       WireVarInt62(end_group), WireUint8(message.subscriber_priority),
-      WireKeyValuePairList(parameters));
+      WireBoolean(message.forward), WireKeyValuePairList(parameters));
 }
 
 quiche::QuicheBuffer MoqtFramer::SerializeAnnounce(
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index 16939e3..ab64df2 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -472,6 +472,7 @@
       /*start=*/Location(4, 3),
       /*end_group=*/4,
       /*subscriber_priority=*/0xaa,
+      /*forward=*/true,
       VersionSpecificParameters(),
   };
   quiche::QuicheBuffer buffer;
@@ -487,6 +488,7 @@
       /*start=*/Location(4, 3),
       /*end_group=*/4,
       /*subscriber_priority=*/0xaa,
+      /*forward=*/true,
       VersionSpecificParameters(),
   };
   quiche::QuicheBuffer buffer;
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index ef7d3b0..1b61d18 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -577,10 +577,11 @@
 };
 
 struct QUICHE_EXPORT MoqtSubscribeUpdate {
-  uint64_t subscribe_id;
+  uint64_t request_id;
   Location start;
   std::optional<uint64_t> end_group;
   MoqtPriority subscriber_priority;
+  bool forward;
   VersionSpecificParameters parameters;
 };
 
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index 8761ffb..608052b 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -551,10 +551,12 @@
 size_t MoqtControlParser::ProcessSubscribeUpdate(quic::QuicDataReader& reader) {
   MoqtSubscribeUpdate subscribe_update;
   uint64_t start_group, start_object, end_group;
-  if (!reader.ReadVarInt62(&subscribe_update.subscribe_id) ||
+  uint8_t forward;
+  if (!reader.ReadVarInt62(&subscribe_update.request_id) ||
       !reader.ReadVarInt62(&start_group) ||
       !reader.ReadVarInt62(&start_object) || !reader.ReadVarInt62(&end_group) ||
-      !reader.ReadUInt8(&subscribe_update.subscriber_priority)) {
+      !reader.ReadUInt8(&subscribe_update.subscriber_priority) ||
+      !reader.ReadUInt8(&forward)) {
     return 0;
   }
   KeyValuePairList parameters;
@@ -578,6 +580,11 @@
       return 0;
     }
   }
+  if (forward > 1) {
+    ParseError("Invalid forward value in SUBSCRIBE_UPDATE");
+    return 0;
+  }
+  subscribe_update.forward = (forward == 1);
   visitor_.OnSubscribeUpdateMessage(subscribe_update);
   return reader.PreviouslyReadPayload().length();
 }
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index 4779c89..d979ffe 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -912,8 +912,8 @@
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kWebTrans, &stream, visitor_);
   char subscribe_update[] = {
-      0x02, 0x00, 0x0d, 0x02, 0x03, 0x01, 0x05,  // start and end sequences
-      0xaa,                                      // priority = 0xaa
+      0x02, 0x00, 0x0e, 0x02, 0x03, 0x01, 0x05,  // start and end sequences
+      0xaa, 0x01,                                // priority, forward
       0x01,                                      // 1 parameter
       0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72,  // authorization_token = "bar"
   };
@@ -1184,8 +1184,8 @@
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kRawQuic, &stream, visitor_);
   char subscribe_update[] = {
-      0x02, 0x00, 0x06, 0x02, 0x03, 0x01, 0x04,  // start and end sequences
-      0x20,                                      // priority
+      0x02, 0x00, 0x07, 0x02, 0x03, 0x01, 0x04,  // start and end sequences
+      0x20, 0x01,                                // priority, forward
       0x00,                                      // No parameters
   };
   stream.Receive(absl::string_view(subscribe_update, sizeof(subscribe_update)),
@@ -1198,8 +1198,8 @@
   webtransport::test::InMemoryStream stream(/*stream_id=*/0);
   MoqtControlParser parser(kRawQuic, &stream, visitor_);
   char subscribe_update[] = {
-      0x02, 0x00, 0x08, 0x02, 0x03, 0x01, 0x03,  // start and end sequences
-      0x20,                                      // priority
+      0x02, 0x00, 0x09, 0x02, 0x03, 0x01, 0x03,  // start and end sequences
+      0x20, 0x01,                                // priority, forward
       0x01,                                      // 1 parameter
       0x02, 0x20,                                // delivery_timeout = 32 ms
   };
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 0c8e75e..3b6be52 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -395,6 +395,45 @@
   return Subscribe(message, visitor);
 }
 
+bool MoqtSession::SubscribeUpdate(
+    const FullTrackName& name, std::optional<Location> start,
+    std::optional<uint64_t> end_group,
+    std::optional<MoqtPriority> subscriber_priority,
+    std::optional<bool> forward, VersionSpecificParameters parameters) {
+  auto it = subscribe_by_name_.find(name);
+  if (it == subscribe_by_name_.end()) {
+    return false;
+  }
+  SubscribeRemoteTrack* track = it->second;
+  MoqtSubscribeUpdate subscribe_update;
+  subscribe_update.request_id = track->request_id();
+  subscribe_update.start = start.value_or(track->window().start());
+  subscribe_update.end_group = end_group.value_or(track->window().end().group);
+  if (subscribe_update.end_group == UINT64_MAX) {
+    subscribe_update.end_group = std::nullopt;
+  }
+  subscribe_update.subscriber_priority =
+      subscriber_priority.value_or(track->subscriber_priority());
+  subscribe_update.forward = forward.value_or(track->forward());
+  subscribe_update.parameters = parameters;
+  if (subscribe_update.start < track->window().start() ||
+      (subscribe_update.end_group.has_value() &&
+       (*subscribe_update.end_group > track->window().end().group ||
+        *subscribe_update.end_group < subscribe_update.start.group))) {
+    // Invalid range.
+    return false;
+  }
+  // Input is valid. Update subscription properties.
+  track->TruncateStart(subscribe_update.start);
+  if (subscribe_update.end_group.has_value()) {
+    track->TruncateEnd(*subscribe_update.end_group);
+  }
+  track->set_subscriber_priority(subscribe_update.subscriber_priority);
+  track->set_forward(subscribe_update.forward);
+  SendControlMessage(framer_.SerializeSubscribeUpdate(subscribe_update));
+  return true;
+};
+
 void MoqtSession::Unsubscribe(const FullTrackName& name) {
   SubscribeRemoteTrack* track = RemoteTrackByName(name);
   if (track == nullptr) {
@@ -402,7 +441,7 @@
   }
   QUIC_DLOG(INFO) << ENDPOINT << "Sent UNSUBSCRIBE message for " << name;
   MoqtUnsubscribe message;
-  message.subscribe_id = track->subscribe_id();
+  message.subscribe_id = track->request_id();
   SendControlMessage(framer_.SerializeUnsubscribe(message));
   DestroySubscription(track);
 }
@@ -1063,7 +1102,7 @@
                                   message.reason_phrase);
   }
   session_->subscribe_by_alias_.erase(subscribe->track_alias());
-  session_->upstream_by_id_.erase(subscribe->subscribe_id());
+  session_->upstream_by_id_.erase(subscribe->request_id());
 }
 
 void MoqtSession::ControlStream::OnUnsubscribeMessage(
@@ -1095,7 +1134,7 @@
 
 void MoqtSession::ControlStream::OnSubscribeUpdateMessage(
     const MoqtSubscribeUpdate& message) {
-  auto it = session_->published_subscriptions_.find(message.subscribe_id);
+  auto it = session_->published_subscriptions_.find(message.request_id);
   if (it == session_->published_subscriptions_.end()) {
     return;
   }
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 77bc1fb..ec0efe1 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -115,9 +115,8 @@
 
   // Returns true if SUBSCRIBE was sent. If there is already a subscription to
   // the track, the message will still be sent. However, the visitor will be
-  // ignored.
+  // ignored. If |visitor| is nullptr, forward will be set to false.
   // Subscribe from (start_group, start_object) to the end of the track.
-  // TODO(martinduke): Allow setting forward = false.
   bool SubscribeAbsolute(const FullTrackName& name, uint64_t start_group,
                          uint64_t start_object,
                          SubscribeRemoteTrack::Visitor* visitor,
@@ -133,6 +132,11 @@
   bool SubscribeNextGroup(const FullTrackName& name,
                           SubscribeRemoteTrack::Visitor* visitor,
                           VersionSpecificParameters parameters) override;
+  bool SubscribeUpdate(const FullTrackName& name, std::optional<Location> start,
+                       std::optional<uint64_t> end_group,
+                       std::optional<MoqtPriority> subscriber_priority,
+                       std::optional<bool> forward,
+                       VersionSpecificParameters parameters) override;
   // Returns false if the subscription is not found. The session immediately
   // destroys all subscription state.
   void Unsubscribe(const FullTrackName& name);
diff --git a/quiche/quic/moqt/moqt_session_interface.h b/quiche/quic/moqt/moqt_session_interface.h
index f26266d..8f76a19 100644
--- a/quiche/quic/moqt/moqt_session_interface.h
+++ b/quiche/quic/moqt/moqt_session_interface.h
@@ -66,6 +66,14 @@
                                   SubscribeRemoteTrack::Visitor* visitor,
                                   VersionSpecificParameters parameters) = 0;
 
+  // If an argument is nullopt, there is no change to the current value.
+  virtual bool SubscribeUpdate(const FullTrackName& name,
+                               std::optional<Location> start,
+                               std::optional<uint64_t> end_group,
+                               std::optional<MoqtPriority> subscriber_priority,
+                               std::optional<bool> forward,
+                               VersionSpecificParameters parameters) = 0;
+
   // Sends an UNSUBSCRIBE message and removes all of the state related to the
   // subscription.  Returns false if the subscription is not found.
   virtual void Unsubscribe(const FullTrackName& name) = 0;
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index bbc41ad..f92dead 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -733,6 +733,68 @@
   stream_input->OnSubscribeOkMessage(ok);
 }
 
+TEST_F(MoqtSessionTest, OutgoingSubscribeUpdate) {
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MockSubscribeRemoteTrackVisitor remote_track_visitor;
+  EXPECT_CALL(mock_session_, GetStreamById)
+      .WillRepeatedly(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_,
+              Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
+  session_.SubscribeAbsolute(FullTrackName("foo", "bar"), 1, 0, 10,
+                             &remote_track_visitor,
+                             VersionSpecificParameters());
+  MoqtSubscribeOk ok = {
+      /*request_id=*/0,
+      /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
+  };
+  EXPECT_CALL(remote_track_visitor, OnReply);
+  stream_input->OnSubscribeOkMessage(ok);
+  EXPECT_CALL(
+      mock_stream_,
+      Writev(ControlMessageOfType(MoqtMessageType::kSubscribeUpdate), _));
+  EXPECT_TRUE(session_.SubscribeUpdate(
+      FullTrackName("foo", "bar"), Location(2, 1), 9, std::nullopt,
+      std::nullopt, VersionSpecificParameters()));
+  SubscribeRemoteTrack* track = MoqtSessionPeer::remote_track(&session_, 0);
+  EXPECT_FALSE(track->InWindow(Location(2, 0)));
+  EXPECT_TRUE(track->InWindow(Location(2, 1)));
+  EXPECT_TRUE(track->InWindow(Location(9, UINT64_MAX)));
+  EXPECT_FALSE(track->InWindow(Location(10, 0)));
+}
+
+TEST_F(MoqtSessionTest, OutgoingSubscribeUpdateInvalid) {
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_);
+  MockSubscribeRemoteTrackVisitor remote_track_visitor;
+  EXPECT_CALL(mock_session_, GetStreamById)
+      .WillRepeatedly(Return(&mock_stream_));
+  EXPECT_CALL(mock_stream_,
+              Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
+  session_.SubscribeAbsolute(FullTrackName("foo", "bar"), 1, 0, 10,
+                             &remote_track_visitor,
+                             VersionSpecificParameters());
+  MoqtSubscribeOk ok = {
+      /*request_id=*/0,
+      /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
+  };
+  EXPECT_CALL(remote_track_visitor, OnReply);
+  stream_input->OnSubscribeOkMessage(ok);
+  EXPECT_CALL(
+      mock_stream_,
+      Writev(ControlMessageOfType(MoqtMessageType::kSubscribeUpdate), _))
+      .Times(0);
+  EXPECT_FALSE(session_.SubscribeUpdate(
+      FullTrackName("foo", "bar"), Location(0, 0), 10, std::nullopt,
+      std::nullopt, VersionSpecificParameters()));
+  EXPECT_FALSE(session_.SubscribeUpdate(
+      FullTrackName("foo", "bar"), Location(1, 0), 11, std::nullopt,
+      std::nullopt, VersionSpecificParameters()));
+  EXPECT_FALSE(session_.SubscribeUpdate(
+      FullTrackName("foo", "bar"), Location(7, 0), 6, std::nullopt,
+      std::nullopt, VersionSpecificParameters()));
+}
+
 TEST_F(MoqtSessionTest, MaxRequestIdChangesResponse) {
   MoqtSessionPeer::set_next_request_id(&session_, kDefaultInitialMaxRequestId);
   MockSubscribeRemoteTrackVisitor remote_track_visitor;
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc
index 9809805..ed2c002 100644
--- a/quiche/quic/moqt/moqt_track.cc
+++ b/quiche/quic/moqt/moqt_track.cc
@@ -120,12 +120,13 @@
   }
 }
 
-void UpstreamFetch::OnFetchResult(Location largest_id, absl::Status status,
+void UpstreamFetch::OnFetchResult(Location largest_location,
+                                  absl::Status status,
                                   TaskDestroyedCallback callback) {
-  auto task = std::make_unique<UpstreamFetchTask>(largest_id, status,
+  auto task = std::make_unique<UpstreamFetchTask>(largest_location, status,
                                                   std::move(callback));
   task_ = task->weak_ptr();
-  window().TruncateEnd(largest_id);
+  window_mutable().TruncateEnd(largest_location);
   std::move(ok_callback_)(std::move(task));
   if (can_read_callback_) {
     task_.GetIfAvailable()->set_can_read_callback(
@@ -169,7 +170,7 @@
   output.status = next_object_->object_status;
   output.publisher_priority = next_object_->publisher_priority;
   output.fin_after_this = false;
-  if (output.sequence == largest_id_) {  // This is the last object.
+  if (output.sequence == largest_location_) {  // This is the last object.
     eof_ = true;
   }
   next_object_.reset();
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index 0783141..d9abd82 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -38,9 +38,10 @@
 class RemoteTrack {
  public:
   RemoteTrack(const FullTrackName& full_track_name, uint64_t id,
-              SubscribeWindow window)
+              SubscribeWindow window, MoqtPriority priority)
       : full_track_name_(full_track_name),
-        subscribe_id_(id),
+        request_id_(id),
+        subscriber_priority_(priority),
         window_(window),
         weak_ptr_factory_(this) {}
   virtual ~RemoteTrack() = default;
@@ -62,7 +63,7 @@
            *data_stream_type_ == MoqtDataStreamType::kStreamHeaderFetch;
   }
 
-  uint64_t subscribe_id() const { return subscribe_id_; }
+  uint64_t request_id() const { return request_id_; }
 
   // Is the object one that was requested?
   bool InWindow(Location sequence) const { return window_.InWindow(sequence); }
@@ -71,12 +72,20 @@
     return weak_ptr_factory_.Create();
   }
 
+  const SubscribeWindow& window() const { return window_; }
+
+  MoqtPriority subscriber_priority() const { return subscriber_priority_; }
+  void set_subscriber_priority(MoqtPriority priority) {
+    subscriber_priority_ = priority;
+  }
+
  protected:
-  SubscribeWindow& window() { return window_; };
+  SubscribeWindow& window_mutable() { return window_; };
 
  private:
   const FullTrackName full_track_name_;
-  const uint64_t subscribe_id_;
+  const uint64_t request_id_;
+  MoqtPriority subscriber_priority_;
   SubscribeWindow window_;
   std::optional<MoqtDataStreamType> data_stream_type_;
   // If false, an object or OK message has been received, so any ERROR message
@@ -100,7 +109,7 @@
     // automatically retry.
     virtual void OnReply(
         const FullTrackName& full_track_name,
-        std::optional<Location> largest_id,
+        std::optional<Location> largest_location,
         std::optional<absl::string_view> error_reason_phrase) = 0;
     // Called when the subscription process is far enough that it is possible to
     // send OBJECT_ACK messages; provides a callback to do so. The callback is
@@ -118,8 +127,10 @@
   SubscribeRemoteTrack(const MoqtSubscribe& subscribe, Visitor* visitor)
       : RemoteTrack(subscribe.full_track_name, subscribe.request_id,
                     SubscribeWindow(subscribe.start.value_or(Location()),
-                                    subscribe.end_group)),
+                                    subscribe.end_group),
+                    subscribe.subscriber_priority),
         track_alias_(subscribe.track_alias),
+        forward_(subscribe.forward),
         visitor_(visitor),
         delivery_timeout_(subscribe.parameters.delivery_timeout),
         subscribe_(std::make_unique<MoqtSubscribe>(subscribe)) {}
@@ -151,10 +162,12 @@
     }
   }
   // Called on SUBSCRIBE_OK or SUBSCRIBE_UPDATE.
-  bool TruncateStart(Location start) { return window().TruncateStart(start); }
+  bool TruncateStart(Location start) {
+    return window_mutable().TruncateStart(start);
+  }
   // Called on SUBSCRIBE_UPDATE.
   bool TruncateEnd(uint64_t end_group) {
-    return window().TruncateEnd(end_group);
+    return window_mutable().TruncateEnd(end_group);
   }
   void OnStreamOpened();
   void OnStreamClosed();
@@ -170,6 +183,9 @@
   // FETCH objects to pipe directly into the visitor.
   void OnJoiningFetchReady(std::unique_ptr<MoqtFetchTask> fetch_task);
 
+  bool forward() const { return forward_; }
+  void set_forward(bool forward) { forward_ = forward; }
+
  private:
   friend class test::MoqtSessionPeer;
   friend class test::SubscribeRemoteTrackPeer;
@@ -180,6 +196,7 @@
   std::unique_ptr<MoqtFetchTask> fetch_task_;
 
   const uint64_t track_alias_;
+  bool forward_;
   Visitor* visitor_;
   std::optional<bool> is_datagram_;
   int currently_open_streams_ = 0;
@@ -222,7 +239,8 @@
                     fetch.joining_fetch.has_value()
                         ? SubscribeWindow(Location(0, 0))
                         : SubscribeWindow(fetch.start_object, fetch.end_group,
-                                          fetch.end_object)),
+                                          fetch.end_object),
+                    fetch.subscriber_priority),
         ok_callback_(std::move(callback)) {
     // Immediately set the data stream type.
     CheckDataStreamType(MoqtDataStreamType::kStreamHeaderFetch);
@@ -235,9 +253,9 @@
     // If the UpstreamFetch is destroyed, it will call OnStreamAndFetchClosed
     // which sets the TaskDestroyedCallback to nullptr. Thus, |callback| can
     // assume that UpstreamFetch is valid.
-    UpstreamFetchTask(Location largest_id, absl::Status status,
+    UpstreamFetchTask(Location largest_location, absl::Status status,
                       TaskDestroyedCallback callback)
-        : largest_id_(largest_id),
+        : largest_location_(largest_location),
           status_(status),
           task_destroyed_callback_(std::move(callback)),
           weak_ptr_factory_(this) {}
@@ -287,7 +305,7 @@
         absl::string_view reason_phrase);
 
    private:
-    Location largest_id_;
+    Location largest_location_;
     absl::Status status_;
     TaskDestroyedCallback task_destroyed_callback_;
 
@@ -314,7 +332,7 @@
   };
 
   // Arrival of FETCH_OK/FETCH_ERROR.
-  void OnFetchResult(Location largest_id, absl::Status status,
+  void OnFetchResult(Location largest_location, absl::Status status,
                      TaskDestroyedCallback callback);
 
   UpstreamFetchTask* task() { return task_.GetIfAvailable(); }
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc
index 7bca1db..bc6398d 100644
--- a/quiche/quic/moqt/moqt_track_test.cc
+++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -65,7 +65,7 @@
 
 TEST_F(SubscribeRemoteTrackTest, Queries) {
   EXPECT_EQ(track_.full_track_name(), FullTrackName("foo", "bar"));
-  EXPECT_EQ(track_.subscribe_id(), 1);
+  EXPECT_EQ(track_.request_id(), 1);
   EXPECT_EQ(track_.track_alias(), 2);
   EXPECT_EQ(track_.visitor(), &visitor_);
   EXPECT_FALSE(track_.is_fetch());
@@ -101,7 +101,7 @@
         }) {}
 
   MoqtFetch fetch_message_ = {
-      /*fetch_id=*/1,
+      /*request_id=*/1,
       /*subscriber_priority=*/128,
       /*group_order=*/std::nullopt,
       /*joining_fetch=*/std::nullopt,
@@ -117,7 +117,7 @@
 };
 
 TEST_F(UpstreamFetchTest, Queries) {
-  EXPECT_EQ(fetch_.subscribe_id(), 1);
+  EXPECT_EQ(fetch_.request_id(), 1);
   EXPECT_EQ(fetch_.full_track_name(), FullTrackName("foo", "bar"));
   EXPECT_FALSE(
       fetch_.CheckDataStreamType(MoqtDataStreamType::kStreamHeaderSubgroup));
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.h b/quiche/quic/moqt/test_tools/mock_moqt_session.h
index 7131181..b80ed7b 100644
--- a/quiche/quic/moqt/test_tools/mock_moqt_session.h
+++ b/quiche/quic/moqt/test_tools/mock_moqt_session.h
@@ -60,6 +60,13 @@
                SubscribeRemoteTrack::Visitor* visitor,
                VersionSpecificParameters parameters),
               (override));
+  MOCK_METHOD(bool, SubscribeUpdate,
+              (const FullTrackName& name, std::optional<Location> start,
+               std::optional<uint64_t> end_group,
+               std::optional<MoqtPriority> subscriber_priority,
+               std::optional<bool> forward,
+               VersionSpecificParameters parameters),
+              (override));
   MOCK_METHOD(void, Unsubscribe, (const FullTrackName& name), (override));
   MOCK_METHOD(bool, Fetch,
               (const FullTrackName& name, FetchResponseCallback callback,
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 2ef9c8b..cf243c9 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -753,7 +753,7 @@
 
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtSubscribeUpdate>(values);
-    if (cast.subscribe_id != subscribe_update_.subscribe_id) {
+    if (cast.request_id != subscribe_update_.request_id) {
       QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE subscribe ID mismatch";
       return false;
     }
@@ -769,6 +769,10 @@
       QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE subscriber priority mismatch";
       return false;
     }
+    if (cast.forward != subscribe_update_.forward) {
+      QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE forward mismatch";
+      return false;
+    }
     if (cast.parameters != subscribe_update_.parameters) {
       QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE parameter mismatch";
       return false;
@@ -776,25 +780,26 @@
     return true;
   }
 
-  void ExpandVarints() override { ExpandVarintsImpl("vvvv-vv--"); }
+  void ExpandVarints() override { ExpandVarintsImpl("vvvv--vv--"); }
 
   MessageStructuredData structured_data() const override {
     return TestMessageBase::MessageStructuredData(subscribe_update_);
   }
 
  private:
-  uint8_t raw_packet_[12] = {
-      0x02, 0x00, 0x09, 0x02, 0x03, 0x01, 0x05,  // start and end sequences
-      0xaa,                                      // subscriber_priority
+  uint8_t raw_packet_[13] = {
+      0x02, 0x00, 0x0a, 0x02, 0x03, 0x01, 0x05,  // start and end sequences
+      0xaa, 0x01,                                // subscriber_priority, forward
       0x01,                                      // 1 parameter
       0x02, 0x67, 0x10,                          // delivery_timeout = 10000
   };
 
   MoqtSubscribeUpdate subscribe_update_ = {
-      /*subscribe_id=*/2,
+      /*request_id=*/2,
       /*start=*/Location(3, 1),
       /*end_group=*/4,
       /*subscriber_priority=*/0xaa,
+      /*forward=*/true,
       VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000),
                                 quic::QuicTimeDelta::Infinite()),
   };