Update MoQT FETCH* messages to draft-11.

(except FETCH format change, already done)

Add a field to FETCH_OK, rename other fields.

PiperOrigin-RevId: 780281131
diff --git a/quiche/quic/moqt/moqt_failed_fetch.h b/quiche/quic/moqt/moqt_failed_fetch.h
index 154c6fc..c971897 100644
--- a/quiche/quic/moqt/moqt_failed_fetch.h
+++ b/quiche/quic/moqt/moqt_failed_fetch.h
@@ -26,9 +26,9 @@
       ObjectsAvailableCallback /*callback*/) override {}
   void SetFetchResponseCallback(FetchResponseCallback callback) {
     MoqtFetchError error;
-    error.subscribe_id = 0;
+    error.request_id = 0;
     error.error_code = StatusToRequestErrorCode(status_);
-    error.reason_phrase = status_.message();
+    error.error_reason = status_.message();
     std::move(callback)(error);
   }
 
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index 842a356..db73875 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -699,7 +699,7 @@
     const StandaloneFetch& standalone_fetch =
         std::get<StandaloneFetch>(message.fetch);
     return SerializeControlMessage(
-        MoqtMessageType::kFetch, WireVarInt62(message.fetch_id),
+        MoqtMessageType::kFetch, WireVarInt62(message.request_id),
         WireUint8(message.subscriber_priority),
         WireDeliveryOrder(message.group_order),
         WireVarInt62(FetchType::kStandalone),
@@ -726,19 +726,13 @@
     joining_start = joining_fetch.joining_start;
   }
   return SerializeControlMessage(
-      MoqtMessageType::kFetch, WireVarInt62(message.fetch_id),
+      MoqtMessageType::kFetch, WireVarInt62(message.request_id),
       WireUint8(message.subscriber_priority),
       WireDeliveryOrder(message.group_order),
       WireVarInt62(message.fetch.index() + 1), WireVarInt62(subscribe_id),
       WireVarInt62(joining_start), WireKeyValuePairList(parameters));
 }
 
-quiche::QuicheBuffer MoqtFramer::SerializeFetchCancel(
-    const MoqtFetchCancel& message) {
-  return SerializeControlMessage(MoqtMessageType::kFetchCancel,
-                                 WireVarInt62(message.subscribe_id));
-}
-
 quiche::QuicheBuffer MoqtFramer::SerializeFetchOk(const MoqtFetchOk& message) {
   KeyValuePairList parameters;
   VersionSpecificParametersToKeyValuePairList(message.parameters, parameters);
@@ -748,20 +742,26 @@
         << "Serializing invalid MoQT parameters";
     return quiche::QuicheBuffer();
   }
-  return SerializeControlMessage(MoqtMessageType::kFetchOk,
-                                 WireVarInt62(message.subscribe_id),
-                                 WireDeliveryOrder(message.group_order),
-                                 WireVarInt62(message.largest_id.group),
-                                 WireVarInt62(message.largest_id.object),
-                                 WireKeyValuePairList(parameters));
+  return SerializeControlMessage(
+      MoqtMessageType::kFetchOk, WireVarInt62(message.request_id),
+      WireDeliveryOrder(message.group_order), WireBoolean(message.end_of_track),
+      WireVarInt62(message.end_location.group),
+      WireVarInt62(message.end_location.object),
+      WireKeyValuePairList(parameters));
 }
 
 quiche::QuicheBuffer MoqtFramer::SerializeFetchError(
     const MoqtFetchError& message) {
   return SerializeControlMessage(
-      MoqtMessageType::kFetchError, WireVarInt62(message.subscribe_id),
+      MoqtMessageType::kFetchError, WireVarInt62(message.request_id),
       WireVarInt62(message.error_code),
-      WireStringWithVarInt62Length(message.reason_phrase));
+      WireStringWithVarInt62Length(message.error_reason));
+}
+
+quiche::QuicheBuffer MoqtFramer::SerializeFetchCancel(
+    const MoqtFetchCancel& message) {
+  return SerializeControlMessage(MoqtMessageType::kFetchCancel,
+                                 WireVarInt62(message.request_id));
 }
 
 quiche::QuicheBuffer MoqtFramer::SerializeRequestsBlocked(
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 43d4ba6..d6d27fd 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -766,7 +766,7 @@
 };
 
 struct QUICHE_EXPORT MoqtFetch {
-  uint64_t fetch_id;
+  uint64_t request_id;
   MoqtPriority subscriber_priority;
   std::optional<MoqtDeliveryOrder> group_order;
   std::variant<StandaloneFetch, JoiningFetchRelative, JoiningFetchAbsolute>
@@ -774,21 +774,22 @@
   VersionSpecificParameters parameters;
 };
 
-struct QUICHE_EXPORT MoqtFetchCancel {
-  uint64_t subscribe_id;
-};
-
 struct QUICHE_EXPORT MoqtFetchOk {
-  uint64_t subscribe_id;
+  uint64_t request_id;
   MoqtDeliveryOrder group_order;
-  Location largest_id;
+  bool end_of_track;
+  Location end_location;
   VersionSpecificParameters parameters;
 };
 
 struct QUICHE_EXPORT MoqtFetchError {
-  uint64_t subscribe_id;
+  uint64_t request_id;
   RequestErrorCode error_code;
-  std::string reason_phrase;
+  std::string error_reason;
+};
+
+struct QUICHE_EXPORT MoqtFetchCancel {
+  uint64_t request_id;
 };
 
 struct QUICHE_EXPORT MoqtRequestsBlocked {
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h
index f9a1361..c61b220 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.h
+++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -123,7 +123,7 @@
         MoqtFetchError error(0, StatusToRequestErrorCode(status_),
                              std::string(status_.message()));
         error.error_code = StatusToRequestErrorCode(status_);
-        error.reason_phrase = status_.message();
+        error.error_reason = status_.message();
         std::move(callback)(error);
         return;
       }
@@ -135,11 +135,13 @@
       }
       MoqtFetchOk ok;
       ok.group_order = MoqtDeliveryOrder::kAscending;
-      ok.largest_id = *(objects_.crbegin());
-      if (objects_.size() > 1 && *(objects_.cbegin()) > ok.largest_id) {
+      ok.end_location = *(objects_.crbegin());
+      if (objects_.size() > 1 && *(objects_.cbegin()) > ok.end_location) {
         ok.group_order = MoqtDeliveryOrder::kDescending;
-        ok.largest_id = *(objects_.cbegin());
+        ok.end_location = *(objects_.cbegin());
       }
+      ok.end_of_track =
+          queue_->closed_ && ok.end_location == queue_->GetLargestLocation();
       std::move(callback)(ok);
     }
 
diff --git a/quiche/quic/moqt/moqt_outgoing_queue_test.cc b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
index 46e0d19..5f677b8 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue_test.cc
+++ b/quiche/quic/moqt/moqt_outgoing_queue_test.cc
@@ -9,6 +9,7 @@
 #include <optional>
 #include <string>
 #include <utility>
+#include <variant>
 #include <vector>
 
 #include "absl/status/status.h"
@@ -24,6 +25,7 @@
 #include "quiche/common/platform/api/quiche_expect_bug.h"
 #include "quiche/common/platform/api/quiche_logging.h"
 #include "quiche/common/platform/api/quiche_test.h"
+#include "quiche/common/quiche_mem_slice.h"
 #include "quiche/common/test_tools/quiche_test_utils.h"
 #include "quiche/web_transport/web_transport.h"
 
@@ -35,6 +37,7 @@
 using ::quiche::test::StatusIs;
 using ::testing::AnyOf;
 using ::testing::ElementsAre;
+using ::testing::Field;
 using ::testing::IsEmpty;
 
 class TestMoqtOutgoingQueue : public MoqtOutgoingQueue,
@@ -49,9 +52,12 @@
   void OnNewObjectAvailable(Location sequence, uint64_t subgroup) override {
     std::optional<PublishedObject> object =
         GetCachedObject(sequence.group, subgroup, sequence.object);
-    QUICHE_CHECK(object.has_value());
-    ASSERT_THAT(object->metadata.status, AnyOf(MoqtObjectStatus::kNormal,
-                                               MoqtObjectStatus::kEndOfGroup));
+    ASSERT_THAT(object,
+                Optional(Field(&PublishedObject::metadata,
+                               Field(&PublishedObjectMetadata::status,
+                                     AnyOf(MoqtObjectStatus::kNormal,
+                                           MoqtObjectStatus::kEndOfGroup,
+                                           MoqtObjectStatus::kEndOfTrack)))));
     if (object->metadata.status == MoqtObjectStatus::kNormal) {
       PublishObject(object->metadata.location.group,
                     object->metadata.location.object,
@@ -372,5 +378,50 @@
   EXPECT_GE(object->metadata.arrival_time, test_start);
 }
 
+TEST(MoqtOutgoingQueue, EndOfTrack) {
+  TestMoqtOutgoingQueue queue;
+  queue.AddObject(quiche::QuicheMemSlice::Copy("a"), true);  // Create (0, 0)
+  queue.AddObject(quiche::QuicheMemSlice::Copy("b"), true);  // Create (1, 0)
+  std::unique_ptr<MoqtFetchTask> fetch = queue.Fetch(
+      Location{0, 0}, 5, std::nullopt, MoqtDeliveryOrder::kAscending);
+  bool end_of_track = false;
+  Location end_location;
+  // end_of_track is false before Close() is called.
+  fetch->SetFetchResponseCallback(
+      [&end_of_track,
+       &end_location](std::variant<MoqtFetchOk, MoqtFetchError> arg) {
+        end_of_track = std::get<MoqtFetchOk>(arg).end_of_track;
+        end_location = std::get<MoqtFetchOk>(arg).end_location;
+      });
+  EXPECT_FALSE(end_of_track);
+  EXPECT_EQ(end_location, Location(1, 0));
+
+  queue.Close();  // Create (2, 0)
+  EXPECT_EQ(queue.GetLargestLocation(), Location(2, 0));
+  fetch = queue.Fetch(Location{0, 0}, 1, std::nullopt,
+                      MoqtDeliveryOrder::kAscending);
+  // end_of_track is false if the fetch does not include the last object.
+  fetch->SetFetchResponseCallback(
+      [&end_of_track,
+       &end_location](std::variant<MoqtFetchOk, MoqtFetchError> arg) {
+        end_of_track = std::get<MoqtFetchOk>(arg).end_of_track;
+        end_location = std::get<MoqtFetchOk>(arg).end_location;
+      });
+  EXPECT_FALSE(end_of_track);
+  EXPECT_EQ(end_location, Location(1, 1));
+
+  fetch = queue.Fetch(Location{0, 0}, 5, std::nullopt,
+                      MoqtDeliveryOrder::kAscending);
+  // end_of_track is true if the fetch includes the last object.
+  fetch->SetFetchResponseCallback(
+      [&end_of_track,
+       &end_location](std::variant<MoqtFetchOk, MoqtFetchError> arg) {
+        end_of_track = std::get<MoqtFetchOk>(arg).end_of_track;
+        end_location = std::get<MoqtFetchOk>(arg).end_location;
+      });
+  EXPECT_TRUE(end_of_track);
+  EXPECT_EQ(end_location, Location(2, 0));
+}
+
 }  // namespace
 }  // namespace moqt
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index d29b5a2..52de027 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -788,7 +788,7 @@
   MoqtFetch fetch;
   uint8_t group_order;
   uint64_t type;
-  if (!reader.ReadVarInt62(&fetch.fetch_id) ||
+  if (!reader.ReadVarInt62(&fetch.request_id) ||
       !reader.ReadUInt8(&fetch.subscriber_priority) ||
       !reader.ReadUInt8(&group_order) || !reader.ReadVarInt62(&type)) {
     return 0;
@@ -862,23 +862,14 @@
   return reader.PreviouslyReadPayload().length();
 }
 
-size_t MoqtControlParser::ProcessFetchCancel(quic::QuicDataReader& reader) {
-  MoqtFetchCancel fetch_cancel;
-  if (!reader.ReadVarInt62(&fetch_cancel.subscribe_id)) {
-    return 0;
-  }
-  visitor_.OnFetchCancelMessage(fetch_cancel);
-  return reader.PreviouslyReadPayload().length();
-}
-
 size_t MoqtControlParser::ProcessFetchOk(quic::QuicDataReader& reader) {
   MoqtFetchOk fetch_ok;
-  uint8_t group_order;
+  uint8_t group_order, end_of_track;
   KeyValuePairList parameters;
-  if (!reader.ReadVarInt62(&fetch_ok.subscribe_id) ||
-      !reader.ReadUInt8(&group_order) ||
-      !reader.ReadVarInt62(&fetch_ok.largest_id.group) ||
-      !reader.ReadVarInt62(&fetch_ok.largest_id.object) ||
+  if (!reader.ReadVarInt62(&fetch_ok.request_id) ||
+      !reader.ReadUInt8(&group_order) || !reader.ReadUInt8(&end_of_track) ||
+      !reader.ReadVarInt62(&fetch_ok.end_location.group) ||
+      !reader.ReadVarInt62(&fetch_ok.end_location.object) ||
       !ParseKeyValuePairList(reader, parameters)) {
     return 0;
   }
@@ -886,12 +877,17 @@
     ParseError("Invalid group order value in FETCH_OK");
     return 0;
   }
+  if (end_of_track > 0x01) {
+    ParseError("Invalid end of track value in FETCH_OK");
+    return 0;
+  }
   if (!ValidateVersionSpecificParameters(parameters,
                                          MoqtMessageType::kFetchOk)) {
     ParseError("FETCH_OK message contains invalid parameters");
     return 0;
   }
   fetch_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order);
+  fetch_ok.end_of_track = end_of_track == 1;
   if (!KeyValuePairListToVersionSpecificParameters(parameters,
                                                    fetch_ok.parameters)) {
     return 0;
@@ -903,9 +899,9 @@
 size_t MoqtControlParser::ProcessFetchError(quic::QuicDataReader& reader) {
   MoqtFetchError fetch_error;
   uint64_t error_code;
-  if (!reader.ReadVarInt62(&fetch_error.subscribe_id) ||
+  if (!reader.ReadVarInt62(&fetch_error.request_id) ||
       !reader.ReadVarInt62(&error_code) ||
-      !reader.ReadStringVarInt62(fetch_error.reason_phrase)) {
+      !reader.ReadStringVarInt62(fetch_error.error_reason)) {
     return 0;
   }
   fetch_error.error_code = static_cast<RequestErrorCode>(error_code);
@@ -913,6 +909,15 @@
   return reader.PreviouslyReadPayload().length();
 }
 
+size_t MoqtControlParser::ProcessFetchCancel(quic::QuicDataReader& reader) {
+  MoqtFetchCancel fetch_cancel;
+  if (!reader.ReadVarInt62(&fetch_cancel.request_id)) {
+    return 0;
+  }
+  visitor_.OnFetchCancelMessage(fetch_cancel);
+  return reader.PreviouslyReadPayload().length();
+}
+
 size_t MoqtControlParser::ProcessRequestsBlocked(quic::QuicDataReader& reader) {
   MoqtRequestsBlocked requests_blocked;
   if (!reader.ReadVarInt62(&requests_blocked.max_request_id)) {
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index ef1450b..55a27a3 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -207,7 +207,7 @@
     return;
   }
   QUICHE_DLOG(INFO) << ENDPOINT
-                    << "Received OBJECT message in datagram for subscribe_id "
+                    << "Received OBJECT message in datagram for request_id "
                     << " for track alias " << message.track_alias
                     << " with sequence " << message.group_id << ":"
                     << message.object_id << " priority "
@@ -485,7 +485,7 @@
   }
   MoqtFetch message;
   message.fetch = StandaloneFetch(name, start, end_group, end_object);
-  message.fetch_id = next_request_id_;
+  message.request_id = next_request_id_;
   next_request_id_ += 2;
   message.subscriber_priority = priority;
   message.group_order = delivery_order;
@@ -494,7 +494,7 @@
   QUIC_DLOG(INFO) << ENDPOINT << "Sent FETCH message for " << name;
   auto fetch = std::make_unique<UpstreamFetch>(
       message, std::get<StandaloneFetch>(message.fetch), std::move(callback));
-  upstream_by_id_.emplace(message.fetch_id, std::move(fetch));
+  upstream_by_id_.emplace(message.request_id, std::move(fetch));
   return true;
 }
 
@@ -546,7 +546,7 @@
     return false;
   }
   MoqtFetch fetch;
-  fetch.fetch_id = next_request_id_;
+  fetch.request_id = next_request_id_;
   next_request_id_ += 2;
   fetch.subscriber_priority = priority;
   fetch.group_order = delivery_order;
@@ -556,7 +556,7 @@
   QUIC_DLOG(INFO) << ENDPOINT << "Sent Joining FETCH message for " << name;
   auto upstream_fetch =
       std::make_unique<UpstreamFetch>(fetch, name, std::move(callback));
-  upstream_by_id_.emplace(fetch.fetch_id, std::move(upstream_fetch));
+  upstream_by_id_.emplace(fetch.request_id, std::move(upstream_fetch));
   return true;
 }
 
@@ -598,7 +598,7 @@
           continue;
         }
         if (fetch->session_->WriteObjectToStream(
-                stream_, fetch->fetch_id_, object.metadata,
+                stream_, fetch->request_id(), object.metadata,
                 std::move(object.payload),
                 MoqtDataStreamType::kStreamHeaderFetch, !stream_header_written_,
                 /*fin=*/false)) {
@@ -798,8 +798,8 @@
   return it->second;
 }
 
-RemoteTrack* MoqtSession::RemoteTrackById(uint64_t subscribe_id) {
-  auto it = upstream_by_id_.find(subscribe_id);
+RemoteTrack* MoqtSession::RemoteTrackById(uint64_t request_id) {
+  auto it = upstream_by_id_.find(request_id);
   if (it == upstream_by_id_.end()) {
     return nullptr;
   }
@@ -853,19 +853,18 @@
 }
 
 void MoqtSession::UpdateQueuedSendOrder(
-    uint64_t subscribe_id,
-    std::optional<webtransport::SendOrder> old_send_order,
+    uint64_t request_id, std::optional<webtransport::SendOrder> old_send_order,
     std::optional<webtransport::SendOrder> new_send_order) {
   if (old_send_order == new_send_order) {
     return;
   }
   if (old_send_order.has_value()) {
     subscribes_with_queued_outgoing_data_streams_.erase(
-        SubscriptionWithQueuedStream{*old_send_order, subscribe_id});
+        SubscriptionWithQueuedStream{*old_send_order, request_id});
   }
   if (new_send_order.has_value()) {
     subscribes_with_queued_outgoing_data_streams_.emplace(*new_send_order,
-                                                          subscribe_id);
+                                                          request_id);
   }
 }
 
@@ -985,12 +984,12 @@
 }
 
 void MoqtSession::ControlStream::SendFetchError(
-    uint64_t subscribe_id, RequestErrorCode error_code,
-    absl::string_view reason_phrase) {
+    uint64_t request_id, RequestErrorCode error_code,
+    absl::string_view error_reason) {
   MoqtFetchError fetch_error;
-  fetch_error.subscribe_id = subscribe_id;
+  fetch_error.request_id = request_id;
   fetch_error.error_code = error_code;
-  fetch_error.reason_phrase = reason_phrase;
+  fetch_error.error_reason = error_reason;
   SendOrBufferMessage(session_->framer_.SerializeFetchError(fetch_error));
 }
 
@@ -1051,7 +1050,7 @@
   RemoteTrack* track = session_->RemoteTrackById(message.request_id);
   if (track == nullptr) {
     QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for "
-                    << "subscribe_id = " << message.request_id
+                    << "request_id = " << message.request_id
                     << " but no track exists";
     // Subscription state might have been destroyed for internal reasons.
     return;
@@ -1063,12 +1062,12 @@
   }
   if (message.largest_location.has_value()) {
     QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for "
-                    << "subscribe_id = " << message.request_id << " "
+                    << "request_id = " << message.request_id << " "
                     << track->full_track_name()
                     << " largest_id = " << *message.largest_location;
   } else {
     QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_OK for "
-                    << "subscribe_id = " << message.request_id << " "
+                    << "request_id = " << message.request_id << " "
                     << track->full_track_name();
   }
   SubscribeRemoteTrack* subscribe = static_cast<SubscribeRemoteTrack*>(track);
@@ -1088,7 +1087,7 @@
   RemoteTrack* track = session_->RemoteTrackById(message.request_id);
   if (track == nullptr) {
     QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_ERROR for "
-                    << "subscribe_id = " << message.request_id
+                    << "request_id = " << message.request_id
                     << " but no track exists";
     // Subscription state might have been destroyed for internal reasons.
     return;
@@ -1104,7 +1103,7 @@
     return;
   }
   QUIC_DLOG(INFO) << ENDPOINT << "Received the SUBSCRIBE_ERROR for "
-                  << "subscribe_id = " << message.request_id << " ("
+                  << "request_id = " << message.request_id << " ("
                   << track->full_track_name() << ")"
                   << ", error = " << static_cast<int>(message.error_code)
                   << " (" << message.reason_phrase << ")";
@@ -1343,12 +1342,12 @@
 }
 
 void MoqtSession::ControlStream::OnFetchMessage(const MoqtFetch& message) {
-  if (!session_->ValidateRequestId(message.fetch_id)) {
+  if (!session_->ValidateRequestId(message.request_id)) {
     return;
   }
   if (session_->sent_goaway_) {
     QUIC_DLOG(INFO) << ENDPOINT << "Received a FETCH after GOAWAY";
-    SendFetchError(message.fetch_id, RequestErrorCode::kUnauthorized,
+    SendFetchError(message.request_id, RequestErrorCode::kUnauthorized,
                    "FETCH after GOAWAY");
     return;
   }
@@ -1375,7 +1374,7 @@
       QUIC_DLOG(INFO) << ENDPOINT << "Received a JOINING_FETCH for "
                       << "subscribe_id " << joining_subscribe_id
                       << " that does not exist";
-      SendFetchError(message.fetch_id, RequestErrorCode::kTrackDoesNotExist,
+      SendFetchError(message.request_id, RequestErrorCode::kTrackDoesNotExist,
                      "Joining Fetch for non-existent subscribe");
       return;
     }
@@ -1419,7 +1418,7 @@
     QUIC_DLOG(INFO) << ENDPOINT << "FETCH for " << track_name
                     << " rejected by the application: "
                     << track_publisher.status();
-    SendFetchError(message.fetch_id, RequestErrorCode::kTrackDoesNotExist,
+    SendFetchError(message.request_id, RequestErrorCode::kTrackDoesNotExist,
                    track_publisher.status().message());
     return;
   }
@@ -1431,23 +1430,23 @@
   if (!fetch->GetStatus().ok()) {
     QUIC_DLOG(INFO) << ENDPOINT << "FETCH for " << track_name
                     << " could not initialize the task";
-    SendFetchError(message.fetch_id, RequestErrorCode::kInvalidRange,
+    SendFetchError(message.request_id, RequestErrorCode::kInvalidRange,
                    fetch->GetStatus().message());
     return;
   }
   auto published_fetch = std::make_unique<PublishedFetch>(
-      message.fetch_id, session_, std::move(fetch));
-  auto result = session_->incoming_fetches_.emplace(message.fetch_id,
+      message.request_id, session_, std::move(fetch));
+  auto result = session_->incoming_fetches_.emplace(message.request_id,
                                                     std::move(published_fetch));
   if (!result.second) {  // Emplace failed.
     QUIC_DLOG(INFO) << ENDPOINT << "FETCH for " << track_name
                     << " could not be added to the session";
-    SendFetchError(message.fetch_id, RequestErrorCode::kInternalError,
+    SendFetchError(message.request_id, RequestErrorCode::kInternalError,
                    "Could not initialize FETCH state");
   }
   MoqtFetchTask* fetch_task = result.first->second->fetch_task();
   fetch_task->SetFetchResponseCallback(
-      [this, request_id = message.fetch_id, fetch_start = start_object,
+      [this, request_id = message.request_id, fetch_start = start_object,
        fetch_end = Location(end_group, end_object.value_or(UINT64_MAX))](
           std::variant<MoqtFetchOk, MoqtFetchError> message) {
         if (!session_->incoming_fetches_.contains(request_id)) {
@@ -1455,11 +1454,11 @@
         }
         if (std::holds_alternative<MoqtFetchOk>(message)) {
           MoqtFetchOk& fetch_ok = std::get<MoqtFetchOk>(message);
-          fetch_ok.subscribe_id = request_id;
-          if (fetch_ok.largest_id < fetch_start ||
-              fetch_ok.largest_id > fetch_end) {
+          fetch_ok.request_id = request_id;
+          if (fetch_ok.end_location < fetch_start ||
+              fetch_ok.end_location > fetch_end) {
             // TODO(martinduke): Add end_of_track to fetch_ok and check it's
-            // larger than largest_id.
+            // larger than end_location.
             QUIC_BUG(quic_bug_fetch_ok_status_error)
                 << "FETCH_OK end or end_of_track is invalid";
             session_->Error(MoqtError::kInternalError, "FETCH_OK status error");
@@ -1469,14 +1468,14 @@
           return;
         }
         MoqtFetchError& fetch_error = std::get<MoqtFetchError>(message);
-        fetch_error.subscribe_id = request_id;
+        fetch_error.request_id = request_id;
         SendOrBufferMessage(session_->framer_.SerializeFetchError(fetch_error));
       });
   // Set a temporary new-object callback that creates a data stream. When
   // created, the stream visitor will replace this callback.
   fetch_task->SetObjectAvailableCallback(
       [this, send_order = SendOrderForFetch(message.subscriber_priority),
-       request_id = message.fetch_id]() {
+       request_id = message.request_id]() {
         auto it = session_->incoming_fetches_.find(request_id);
         if (it == session_->incoming_fetches_.end()) {
           return;
@@ -1495,10 +1494,10 @@
 }
 
 void MoqtSession::ControlStream::OnFetchOkMessage(const MoqtFetchOk& message) {
-  RemoteTrack* track = session_->RemoteTrackById(message.subscribe_id);
+  RemoteTrack* track = session_->RemoteTrackById(message.request_id);
   if (track == nullptr) {
     QUIC_DLOG(INFO) << ENDPOINT << "Received the FETCH_OK for "
-                    << "subscribe_id = " << message.subscribe_id
+                    << "request_id = " << message.request_id
                     << " but no track exists";
     // Subscription state might have been destroyed for internal reasons.
     return;
@@ -1508,21 +1507,20 @@
                     "Received FETCH_OK for a SUBSCRIBE");
     return;
   }
-  QUIC_DLOG(INFO) << ENDPOINT << "Received the FETCH_OK for subscribe_id = "
-                  << message.subscribe_id << " " << track->full_track_name();
+  QUIC_DLOG(INFO) << ENDPOINT << "Received the FETCH_OK for request_id = "
+                  << message.request_id << " " << track->full_track_name();
   UpstreamFetch* fetch = static_cast<UpstreamFetch*>(track);
-  fetch->OnFetchResult(message.largest_id, absl::OkStatus(),
-                       [=, session = session_]() {
-                         session->CancelFetch(message.subscribe_id);
-                       });
+  fetch->OnFetchResult(
+      message.end_location, absl::OkStatus(),
+      [=, session = session_]() { session->CancelFetch(message.request_id); });
 }
 
 void MoqtSession::ControlStream::OnFetchErrorMessage(
     const MoqtFetchError& message) {
-  RemoteTrack* track = session_->RemoteTrackById(message.subscribe_id);
+  RemoteTrack* track = session_->RemoteTrackById(message.request_id);
   if (track == nullptr) {
     QUIC_DLOG(INFO) << ENDPOINT << "Received the FETCH_ERROR for "
-                    << "subscribe_id = " << message.subscribe_id
+                    << "request_id = " << message.request_id
                     << " but no track exists";
     // Subscription state might have been destroyed for internal reasons.
     return;
@@ -1538,15 +1536,15 @@
     return;
   }
   QUIC_DLOG(INFO) << ENDPOINT << "Received the FETCH_ERROR for "
-                  << "subscribe_id = " << message.subscribe_id << " ("
+                  << "request_id = " << message.request_id << " ("
                   << track->full_track_name() << ")"
                   << ", error = " << static_cast<int>(message.error_code)
-                  << " (" << message.reason_phrase << ")";
+                  << " (" << message.error_reason << ")";
   UpstreamFetch* fetch = static_cast<UpstreamFetch*>(track);
   absl::Status status =
-      RequestErrorCodeToStatus(message.error_code, message.reason_phrase);
+      RequestErrorCodeToStatus(message.error_code, message.error_reason);
   fetch->OnFetchResult(Location(0, 0), status, nullptr);
-  session_->upstream_by_id_.erase(message.subscribe_id);
+  session_->upstream_by_id_.erase(message.request_id);
 }
 
 void MoqtSession::ControlStream::OnRequestsBlockedMessage(
@@ -2317,19 +2315,19 @@
   return true;
 }
 
-void MoqtSession::CancelFetch(uint64_t subscribe_id) {
+void MoqtSession::CancelFetch(uint64_t request_id) {
   if (is_closing_) {
     return;
   }
   // This is only called from the callback where UpstreamFetchTask has been
   // destroyed, so there is no need to notify the application.
-  upstream_by_id_.erase(subscribe_id);
+  upstream_by_id_.erase(request_id);
   ControlStream* stream = GetControlStream();
   if (stream == nullptr) {
     return;
   }
   MoqtFetchCancel message;
-  message.subscribe_id = subscribe_id;
+  message.request_id = request_id;
   stream->SendOrBufferMessage(framer_.SerializeFetchCancel(message));
   // The FETCH_CANCEL will cause a RESET_STREAM to return, which would be the
   // same as a STOP_SENDING. However, a FETCH_CANCEL works even if the stream
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index 356a4eb..8ca159e 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -296,8 +296,8 @@
 
    private:
     friend class test::MoqtSessionPeer;
-    void SendFetchError(uint64_t subscribe_id, RequestErrorCode error_code,
-                        absl::string_view reason_phrase);
+    void SendFetchError(uint64_t request_id, RequestErrorCode error_code,
+                        absl::string_view error_reason);
 
     MoqtSession* session_;
     webtransport::Stream* stream_;
@@ -562,9 +562,11 @@
 
   class QUICHE_EXPORT PublishedFetch {
    public:
-    PublishedFetch(uint64_t fetch_id, MoqtSession* session,
+    PublishedFetch(uint64_t request_id, MoqtSession* session,
                    std::unique_ptr<MoqtFetchTask> fetch)
-        : session_(session), fetch_(std::move(fetch)), fetch_id_(fetch_id) {}
+        : session_(session),
+          fetch_(std::move(fetch)),
+          request_id_(request_id) {}
 
     class FetchStreamVisitor : public webtransport::StreamVisitor {
      public:
@@ -577,7 +579,7 @@
       ~FetchStreamVisitor() {
         std::shared_ptr<PublishedFetch> fetch = fetch_.lock();
         if (fetch != nullptr) {
-          fetch->session()->incoming_fetches_.erase(fetch->fetch_id_);
+          fetch->session()->incoming_fetches_.erase(fetch->request_id_);
         }
       }
       // webtransport::StreamVisitor implementation.
@@ -597,13 +599,13 @@
 
     MoqtFetchTask* fetch_task() { return fetch_.get(); }
     MoqtSession* session() { return session_; }
-    uint64_t fetch_id() const { return fetch_id_; }
+    uint64_t request_id() const { return request_id_; }
     void SetStreamId(webtransport::StreamId id) { stream_id_ = id; }
 
    private:
     MoqtSession* session_;
     std::unique_ptr<MoqtFetchTask> fetch_;
-    uint64_t fetch_id_;
+    uint64_t request_id_;
     // Store the stream ID in case a FETCH_CANCEL requires a reset.
     std::optional<webtransport::StreamId> stream_id_;
   };
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 80b3520..a89bf05 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -76,7 +76,7 @@
 
 MoqtFetch DefaultFetch() {
   MoqtFetch fetch = {
-      /*fetch_id=*/1,
+      /*request_id=*/1,
       /*subscriber_priority=*/0x80,
       /*group_order=*/std::nullopt,
       /*fetch=*/
@@ -2264,9 +2264,10 @@
 
   // Compose and send the FETCH_OK.
   MoqtFetchOk expected_ok;
-  expected_ok.subscribe_id = fetch.fetch_id;
+  expected_ok.request_id = fetch.request_id;
   expected_ok.group_order = MoqtDeliveryOrder::kAscending;
-  expected_ok.largest_id = Location(1, 4);
+  expected_ok.end_of_track = false;
+  expected_ok.end_location = Location(1, 4);
   EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _));
   fetch_task->CallFetchResponseCallback(expected_ok);
   // Data arrives.
@@ -2288,9 +2289,10 @@
   MockTrackPublisher* track = CreateTrackPublisher();
 
   MoqtFetchOk expected_ok;
-  expected_ok.subscribe_id = fetch.fetch_id;
+  expected_ok.request_id = fetch.request_id;
   expected_ok.group_order = MoqtDeliveryOrder::kAscending;
-  expected_ok.largest_id = Location(1, 4);
+  expected_ok.end_of_track = false;
+  expected_ok.end_location = Location(1, 4);
   auto fetch_task_ptr =
       std::make_unique<MockFetchTask>(expected_ok, std::nullopt, true);
   MockFetchTask* fetch_task = fetch_task_ptr.get();
@@ -2329,9 +2331,10 @@
   stream_input->OnFetchMessage(fetch);
 
   MoqtFetchOk expected_ok;
-  expected_ok.subscribe_id = fetch.fetch_id;
+  expected_ok.request_id = fetch.request_id;
   expected_ok.group_order = MoqtDeliveryOrder::kAscending;
-  expected_ok.largest_id = Location(1, 4);
+  expected_ok.end_of_track = false;
+  expected_ok.end_location = Location(1, 4);
   EXPECT_CALL(mock_stream_, Writev(SerializedControlMessage(expected_ok), _));
   fetch_task->CallFetchResponseCallback(expected_ok);
 }
@@ -2355,9 +2358,9 @@
   stream_input->OnFetchMessage(fetch);
 
   MoqtFetchError expected_error;
-  expected_error.subscribe_id = fetch.fetch_id;
+  expected_error.request_id = fetch.request_id;
   expected_error.error_code = RequestErrorCode::kTrackDoesNotExist;
-  expected_error.reason_phrase = "foo";
+  expected_error.error_reason = "foo";
   EXPECT_CALL(mock_stream_,
               Writev(SerializedControlMessage(expected_error), _));
   fetch_task->CallFetchResponseCallback(expected_error);
@@ -2370,7 +2373,7 @@
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&session_, &control_stream);
   MoqtFetch fetch = DefaultFetch();
-  fetch.fetch_id = 1;  // Too low.
+  fetch.request_id = 1;  // Too low.
   EXPECT_CALL(mock_session_,
               CloseSession(static_cast<uint64_t>(MoqtError::kInvalidRequestId),
                            "Request ID not monotonically increasing"))
@@ -2445,7 +2448,7 @@
 
   // Joining FETCH arrives. The resulting Fetch should begin at (2, 0).
   MoqtFetch fetch = DefaultFetch();
-  fetch.fetch_id = 3;
+  fetch.request_id = 3;
   fetch.fetch = JoiningFetchRelative(1, 2);
   EXPECT_CALL(*track, Fetch(Location(2, 0), 4, std::optional<uint64_t>(10), _))
       .WillOnce(Return(std::make_unique<MockFetchTask>()));
@@ -2476,7 +2479,7 @@
   ReceiveSubscribeSynchronousOk(track, subscribe, stream_input.get());
 
   MoqtFetch fetch = DefaultFetch();
-  fetch.fetch_id = 3;
+  fetch.request_id = 3;
   fetch.fetch = JoiningFetchRelative(1, 2);
   EXPECT_CALL(mock_session_,
               CloseSession(static_cast<uint64_t>(MoqtError::kProtocolViolation),
@@ -2504,7 +2507,7 @@
       VersionSpecificParameters(),
   };
   MoqtFetch expected_fetch = {
-      /*fetch_id=*/2,
+      /*request_id=*/2,
       /*subscriber_priority=*/0x80,
       /*group_order=*/MoqtDeliveryOrder::kAscending,
       /*fetch=*/JoiningFetchRelative(0, 1),
@@ -2538,11 +2541,11 @@
                       MoqtDeliveryOrder::kAscending, Location(2, 0),
                       VersionSpecificParameters()));
   stream_input->OnFetchOkMessage(MoqtFetchOk(2, MoqtDeliveryOrder::kAscending,
-                                             Location(2, 0),
+                                             false, Location(2, 0),
                                              VersionSpecificParameters()));
   // Packet arrives on FETCH stream.
   MoqtObject object = {
-      /*fetch_id=*/2,
+      /*request_id=*/2,
       /*group_id, object_id=*/0,
       0,
       /*publisher_priority=*/128,
@@ -2627,7 +2630,8 @@
   MoqtFetchOk ok = {
       /*request_id=*/0,
       /*group_order=*/MoqtDeliveryOrder::kAscending,
-      /*largest_id=*/Location(3, 25),
+      /*end_of_track=*/false,
+      /*end_location=*/Location(3, 25),
       VersionSpecificParameters(),
   };
   stream_input->OnFetchOkMessage(ok);
@@ -2726,7 +2730,8 @@
   MoqtFetchOk ok = {
       /*request_id=*/0,
       /*group_order=*/MoqtDeliveryOrder::kAscending,
-      /*largest_id=*/Location(3, 25),
+      /*end_of_track=*/false,
+      /*end_location=*/Location(3, 25),
       VersionSpecificParameters(),
   };
   stream_input->OnFetchOkMessage(ok);
@@ -2796,7 +2801,8 @@
   MoqtFetchOk ok = {
       /*request_id=*/0,
       /*group_order=*/MoqtDeliveryOrder::kAscending,
-      /*largest_id=*/Location(3, 25),
+      /*end_of_track=*/false,
+      /*end_location=*/Location(3, 25),
       VersionSpecificParameters(),
   };
   stream_input->OnFetchOkMessage(ok);
@@ -3194,7 +3200,7 @@
   EXPECT_CALL(mock_stream_,
               Writev(ControlMessageOfType(MoqtMessageType::kFetchError), _));
   MoqtFetch fetch = DefaultFetch();
-  fetch.fetch_id = 3;
+  fetch.request_id = 3;
   stream_input->OnFetchMessage(fetch);
   EXPECT_CALL(
       mock_stream_,
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h
index 6f5b96c..4c81f37 100644
--- a/quiche/quic/moqt/moqt_track.h
+++ b/quiche/quic/moqt/moqt_track.h
@@ -236,7 +236,7 @@
   UpstreamFetch(const MoqtFetch& fetch, const StandaloneFetch standalone,
                 FetchResponseCallback callback)
       : RemoteTrack(
-            standalone.full_track_name, fetch.fetch_id,
+            standalone.full_track_name, fetch.request_id,
             SubscribeWindow(standalone.start_object, standalone.end_group,
                             standalone.end_object),
             fetch.subscriber_priority),
@@ -247,7 +247,7 @@
   // Relative Joining Fetch constructor
   UpstreamFetch(const MoqtFetch& fetch, FullTrackName full_track_name,
                 FetchResponseCallback callback)
-      : RemoteTrack(full_track_name, fetch.fetch_id,
+      : RemoteTrack(full_track_name, fetch.request_id,
                     SubscribeWindow(Location(0, 0)), fetch.subscriber_priority),
         ok_callback_(std::move(callback)) {
     // Immediately set the data stream type.
@@ -258,7 +258,7 @@
                 JoiningFetchAbsolute absolute_joining,
                 FetchResponseCallback callback)
       : RemoteTrack(
-            full_track_name, fetch.fetch_id,
+            full_track_name, fetch.request_id,
             SubscribeWindow(Location(absolute_joining.joining_start, 0)),
             fetch.subscriber_priority),
         ok_callback_(std::move(callback)) {
diff --git a/quiche/quic/moqt/test_tools/moqt_session_peer.h b/quiche/quic/moqt/test_tools/moqt_session_peer.h
index 467c2fb..a0dc737 100644
--- a/quiche/quic/moqt/test_tools/moqt_session_peer.h
+++ b/quiche/quic/moqt/test_tools/moqt_session_peer.h
@@ -199,8 +199,8 @@
     // Initialize the fetch task
     fetch->OnFetchResult(
         Location{4, 10}, absl::OkStatus(),
-        [=, session_ptr = session, fetch_id = fetch_message.fetch_id]() {
-          session_ptr->CancelFetch(fetch_id);
+        [=, session_ptr = session, request_id = fetch_message.request_id]() {
+          session_ptr->CancelFetch(request_id);
         });
     ;
     auto mock_session =
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 1ad79b2..4497674 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -1309,8 +1309,8 @@
   }
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtFetch>(values);
-    if (cast.fetch_id != fetch_.fetch_id) {
-      QUIC_LOG(INFO) << "FETCH fetch_id mismatch";
+    if (cast.request_id != fetch_.request_id) {
+      QUIC_LOG(INFO) << "FETCH request_id mismatch";
       return false;
     }
     if (cast.subscriber_priority != fetch_.subscriber_priority) {
@@ -1359,7 +1359,7 @@
  private:
   uint8_t raw_packet_[28] = {
       0x16, 0x00, 0x19,
-      0x01,                          // fetch_id = 1
+      0x01,                          // request_id = 1
       0x02,                          // priority = kHigh
       0x01,                          // group_order = kAscending
       0x01,                          // type = kStandalone
@@ -1371,7 +1371,7 @@
   };
 
   MoqtFetch fetch_ = {
-      /*fetch_id =*/1,
+      /*request_id=*/1,
       /*subscriber_priority=*/2,
       /*group_order=*/MoqtDeliveryOrder::kAscending,
       /*fetch =*/
@@ -1394,8 +1394,8 @@
   }
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtFetch>(values);
-    if (cast.fetch_id != fetch_.fetch_id) {
-      QUIC_LOG(INFO) << "FETCH fetch_id mismatch";
+    if (cast.request_id != fetch_.request_id) {
+      QUIC_LOG(INFO) << "FETCH request_id mismatch";
       return false;
     }
     if (cast.subscriber_priority != fetch_.subscriber_priority) {
@@ -1433,7 +1433,7 @@
  private:
   uint8_t raw_packet_[17] = {
       0x16, 0x00, 0x0e,
-      0x01,        // fetch_id = 1
+      0x01,        // request_id = 1
       0x02,        // priority = kHigh
       0x01,        // group_order = kAscending
       0x02,        // type = kRelativeJoining
@@ -1442,7 +1442,7 @@
   };
 
   MoqtFetch fetch_ = {
-      /*fetch_id =*/1,
+      /*request_id =*/1,
       /*subscriber_priority=*/2,
       /*group_order=*/MoqtDeliveryOrder::kAscending,
       /*fetch=*/JoiningFetchRelative{2, 2},
@@ -1459,8 +1459,8 @@
   }
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtFetch>(values);
-    if (cast.fetch_id != fetch_.fetch_id) {
-      QUIC_LOG(INFO) << "FETCH fetch_id mismatch";
+    if (cast.request_id != fetch_.request_id) {
+      QUIC_LOG(INFO) << "FETCH request_id mismatch";
       return false;
     }
     if (cast.subscriber_priority != fetch_.subscriber_priority) {
@@ -1498,7 +1498,7 @@
  private:
   uint8_t raw_packet_[17] = {
       0x16, 0x00, 0x0e,
-      0x01,        // fetch_id = 1
+      0x01,        // request_id = 1
       0x02,        // priority = kHigh
       0x01,        // group_order = kAscending
       0x03,        // type = kAbsoluteJoining
@@ -1507,7 +1507,7 @@
   };
 
   MoqtFetch fetch_ = {
-      /*fetch_id =*/1,
+      /*request_id=*/1,
       /*subscriber_priority=*/2,
       /*group_order=*/MoqtDeliveryOrder::kAscending,
       /*fetch=*/JoiningFetchAbsolute{2, 2},
@@ -1515,6 +1515,105 @@
   };
 };
 
+class QUICHE_NO_EXPORT FetchOkMessage : public TestMessageBase {
+ public:
+  FetchOkMessage() : TestMessageBase() {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+  }
+  bool EqualFieldValues(MessageStructuredData& values) const override {
+    auto cast = std::get<MoqtFetchOk>(values);
+    if (cast.request_id != fetch_ok_.request_id) {
+      QUIC_LOG(INFO) << "FETCH_OK request_id mismatch";
+      return false;
+    }
+    if (cast.group_order != fetch_ok_.group_order) {
+      QUIC_LOG(INFO) << "FETCH_OK group_order mismatch";
+      return false;
+    }
+    if (cast.end_of_track != fetch_ok_.end_of_track) {
+      QUIC_LOG(INFO) << "FETCH_OK end_of_track mismatch";
+      return false;
+    }
+    if (cast.end_location != fetch_ok_.end_location) {
+      QUIC_LOG(INFO) << "FETCH_OK end_location mismatch";
+      return false;
+    }
+    if (cast.parameters != fetch_ok_.parameters) {
+      QUIC_LOG(INFO) << "FETCH_OK parameters mismatch";
+      return false;
+    }
+    return true;
+  }
+
+  void ExpandVarints() override { ExpandVarintsImpl("v--vvvvv---"); }
+
+  MessageStructuredData structured_data() const override {
+    return TestMessageBase::MessageStructuredData(fetch_ok_);
+  }
+
+ private:
+  uint8_t raw_packet_[12] = {
+      0x18, 0x00, 0x09,
+      0x01,                    // request_id = 1
+      0x01,                    // group_order = kAscending
+      0x00,                    // end_of_track = false
+      0x05, 0x04,              // end_location = 5, 4
+      0x01, 0x04, 0x67, 0x10,  // MaxCacheDuration = 10000
+  };
+
+  MoqtFetchOk fetch_ok_ = {
+      /*request_id =*/1,
+      /*group_order=*/MoqtDeliveryOrder::kAscending,
+      /*end_of_track=*/false,
+      /*end_location=*/Location{5, 4},
+      VersionSpecificParameters(quic::QuicTimeDelta::Infinite(),
+                                quic::QuicTimeDelta::FromMilliseconds(10000)),
+  };
+};
+
+class QUICHE_NO_EXPORT FetchErrorMessage : public TestMessageBase {
+ public:
+  FetchErrorMessage() : TestMessageBase() {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+  }
+  bool EqualFieldValues(MessageStructuredData& values) const override {
+    auto cast = std::get<MoqtFetchError>(values);
+    if (cast.request_id != fetch_error_.request_id) {
+      QUIC_LOG(INFO) << "FETCH_ERROR request_id mismatch";
+      return false;
+    }
+    if (cast.error_code != fetch_error_.error_code) {
+      QUIC_LOG(INFO) << "FETCH_ERROR group_order mismatch";
+      return false;
+    }
+    if (cast.error_reason != fetch_error_.error_reason) {
+      QUIC_LOG(INFO) << "FETCH_ERROR error_reason mismatch";
+      return false;
+    }
+    return true;
+  }
+
+  void ExpandVarints() override { ExpandVarintsImpl("vvv---"); }
+
+  MessageStructuredData structured_data() const override {
+    return TestMessageBase::MessageStructuredData(fetch_error_);
+  }
+
+ private:
+  uint8_t raw_packet_[9] = {
+      0x19, 0x00, 0x06,
+      0x01,                    // request_id = 1
+      0x01,                    // error_code = kUnauthorized
+      0x03, 0x62, 0x61, 0x72,  // error_reason = "bar"
+  };
+
+  MoqtFetchError fetch_error_ = {
+      /*request_id =*/1,
+      /*error_code=*/RequestErrorCode::kUnauthorized,
+      /*error_reason=*/"bar",
+  };
+};
+
 class QUICHE_NO_EXPORT FetchCancelMessage : public TestMessageBase {
  public:
   FetchCancelMessage() : TestMessageBase() {
@@ -1522,7 +1621,7 @@
   }
   bool EqualFieldValues(MessageStructuredData& values) const override {
     auto cast = std::get<MoqtFetchCancel>(values);
-    if (cast.subscribe_id != fetch_cancel_.subscribe_id) {
+    if (cast.request_id != fetch_cancel_.request_id) {
       QUIC_LOG(INFO) << "FETCH_CANCEL subscribe_id mismatch";
       return false;
     }
@@ -1538,104 +1637,11 @@
  private:
   uint8_t raw_packet_[4] = {
       0x17, 0x00, 0x01,
-      0x01,  // subscribe_id = 1
+      0x01,  // request_id = 1
   };
 
   MoqtFetchCancel fetch_cancel_ = {
-      /*subscribe_id =*/1,
-  };
-};
-
-class QUICHE_NO_EXPORT FetchOkMessage : public TestMessageBase {
- public:
-  FetchOkMessage() : TestMessageBase() {
-    SetWireImage(raw_packet_, sizeof(raw_packet_));
-  }
-  bool EqualFieldValues(MessageStructuredData& values) const override {
-    auto cast = std::get<MoqtFetchOk>(values);
-    if (cast.subscribe_id != fetch_ok_.subscribe_id) {
-      QUIC_LOG(INFO) << "FETCH_OK subscribe_id mismatch";
-      return false;
-    }
-    if (cast.group_order != fetch_ok_.group_order) {
-      QUIC_LOG(INFO) << "FETCH_OK group_order mismatch";
-      return false;
-    }
-    if (cast.largest_id != fetch_ok_.largest_id) {
-      QUIC_LOG(INFO) << "FETCH_OK start_object mismatch";
-      return false;
-    }
-    if (cast.parameters != fetch_ok_.parameters) {
-      QUIC_LOG(INFO) << "FETCH_OK parameters mismatch";
-      return false;
-    }
-    return true;
-  }
-
-  void ExpandVarints() override { ExpandVarintsImpl("v-vvvvv---"); }
-
-  MessageStructuredData structured_data() const override {
-    return TestMessageBase::MessageStructuredData(fetch_ok_);
-  }
-
- private:
-  uint8_t raw_packet_[11] = {
-      0x18, 0x00, 0x08,
-      0x01,                    // subscribe_id = 1
-      0x01,                    // group_order = kAscending
-      0x05, 0x04,              // largest_object = 5, 4
-      0x01, 0x04, 0x67, 0x10,  // MaxCacheDuration = 10000
-  };
-
-  MoqtFetchOk fetch_ok_ = {
-      /*subscribe_id =*/1,
-      /*group_order=*/MoqtDeliveryOrder::kAscending,
-      /*start_object=*/Location{5, 4},
-      VersionSpecificParameters(quic::QuicTimeDelta::Infinite(),
-                                quic::QuicTimeDelta::FromMilliseconds(10000)),
-  };
-};
-
-class QUICHE_NO_EXPORT FetchErrorMessage : public TestMessageBase {
- public:
-  FetchErrorMessage() : TestMessageBase() {
-    SetWireImage(raw_packet_, sizeof(raw_packet_));
-  }
-  bool EqualFieldValues(MessageStructuredData& values) const override {
-    auto cast = std::get<MoqtFetchError>(values);
-    if (cast.subscribe_id != fetch_error_.subscribe_id) {
-      QUIC_LOG(INFO) << "FETCH_ERROR subscribe_id mismatch";
-      return false;
-    }
-    if (cast.error_code != fetch_error_.error_code) {
-      QUIC_LOG(INFO) << "FETCH_ERROR group_order mismatch";
-      return false;
-    }
-    if (cast.reason_phrase != fetch_error_.reason_phrase) {
-      QUIC_LOG(INFO) << "FETCH_ERROR reason_phrase mismatch";
-      return false;
-    }
-    return true;
-  }
-
-  void ExpandVarints() override { ExpandVarintsImpl("vvv---"); }
-
-  MessageStructuredData structured_data() const override {
-    return TestMessageBase::MessageStructuredData(fetch_error_);
-  }
-
- private:
-  uint8_t raw_packet_[9] = {
-      0x19, 0x00, 0x06,
-      0x01,                    // subscribe_id = 1
-      0x01,                    // error_code = kUnauthorized
-      0x03, 0x62, 0x61, 0x72,  // reason_phrase = "bar"
-  };
-
-  MoqtFetchError fetch_error_ = {
-      /*subscribe_id =*/1,
-      /*error_code=*/RequestErrorCode::kUnauthorized,
-      /*reason_phrase=*/"bar",
+      /*request_id =*/1,
   };
 };