Framer and Parser for MoQT FETCH family messages in draft-07.

PiperOrigin-RevId: 689397465
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index d2d34f8..f15f380 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -649,6 +649,50 @@
                                  WireVarInt62(message.max_subscribe_id));
 }
 
+quiche::QuicheBuffer MoqtFramer::SerializeFetch(const MoqtFetch& message) {
+  if (message.end_group < message.start_object.group ||
+      (message.end_group == message.start_object.group &&
+       message.end_object.has_value() &&
+       *message.end_object < message.start_object.object)) {
+    QUICHE_BUG(MoqtFramer_invalid_fetch) << "Invalid FETCH object range";
+    return quiche::QuicheBuffer();
+  }
+  return SerializeControlMessage(
+      MoqtMessageType::kFetch, WireVarInt62(message.subscribe_id),
+      WireFullTrackName(message.full_track_name, true),
+      WireUint8(message.subscriber_priority),
+      WireDeliveryOrder(message.group_order),
+      WireVarInt62(message.start_object.group),
+      WireVarInt62(message.start_object.object),
+      WireVarInt62(message.end_group),
+      WireVarInt62(message.end_object.has_value() ? *message.end_object + 1
+                                                  : 0),
+      WireSubscribeParameterList(message.parameters));
+}
+
+quiche::QuicheBuffer MoqtFramer::SerializeFetchCancel(
+    const MoqtFetchCancel& message) {
+  return SerializeControlMessage(MoqtMessageType::kFetchCancel,
+                                 WireVarInt62(message.subscribe_id));
+}
+
+quiche::QuicheBuffer MoqtFramer::SerializeFetchOk(const MoqtFetchOk& message) {
+  return SerializeControlMessage(
+      MoqtMessageType::kFetchOk, WireVarInt62(message.subscribe_id),
+      WireDeliveryOrder(message.group_order),
+      WireVarInt62(message.largest_id.group),
+      WireVarInt62(message.largest_id.object),
+      WireSubscribeParameterList(message.parameters));
+}
+
+quiche::QuicheBuffer MoqtFramer::SerializeFetchError(
+    const MoqtFetchError& message) {
+  return SerializeControlMessage(
+      MoqtMessageType::kFetchError, WireVarInt62(message.subscribe_id),
+      WireVarInt62(message.error_code),
+      WireStringWithVarInt62Length(message.reason_phrase));
+}
+
 quiche::QuicheBuffer MoqtFramer::SerializeObjectAck(
     const MoqtObjectAck& message) {
   return SerializeControlMessage(
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h
index 6bf9454..ae931a9 100644
--- a/quiche/quic/moqt/moqt_framer.h
+++ b/quiche/quic/moqt/moqt_framer.h
@@ -65,6 +65,10 @@
       const MoqtUnsubscribeAnnounces& message);
   quiche::QuicheBuffer SerializeMaxSubscribeId(
       const MoqtMaxSubscribeId& message);
+  quiche::QuicheBuffer SerializeFetch(const MoqtFetch& message);
+  quiche::QuicheBuffer SerializeFetchCancel(const MoqtFetchCancel& message);
+  quiche::QuicheBuffer SerializeFetchOk(const MoqtFetchOk& message);
+  quiche::QuicheBuffer SerializeFetchError(const MoqtFetchError& message);
   quiche::QuicheBuffer SerializeObjectAck(const MoqtObjectAck& message);
 
  private:
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index e8a8cce..e73142f 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -14,6 +14,7 @@
 #include "absl/strings/str_cat.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_priority.h"
 #include "quiche/quic/moqt/test_tools/moqt_test_message.h"
 #include "quiche/quic/platform/api/quic_expect_bug.h"
 #include "quiche/quic/platform/api/quic_test.h"
@@ -51,6 +52,10 @@
       MoqtMessageType::kSubscribeAnnouncesError,
       MoqtMessageType::kUnsubscribeAnnounces,
       MoqtMessageType::kMaxSubscribeId,
+      MoqtMessageType::kFetch,
+      MoqtMessageType::kFetchCancel,
+      MoqtMessageType::kFetchOk,
+      MoqtMessageType::kFetchError,
       MoqtMessageType::kObjectAck,
       MoqtMessageType::kClientSetup,
       MoqtMessageType::kServerSetup,
@@ -182,6 +187,22 @@
         auto data = std::get<MoqtMaxSubscribeId>(structured_data);
         return framer_.SerializeMaxSubscribeId(data);
       }
+      case moqt::MoqtMessageType::kFetch: {
+        auto data = std::get<MoqtFetch>(structured_data);
+        return framer_.SerializeFetch(data);
+      }
+      case moqt::MoqtMessageType::kFetchCancel: {
+        auto data = std::get<MoqtFetchCancel>(structured_data);
+        return framer_.SerializeFetchCancel(data);
+      }
+      case moqt::MoqtMessageType::kFetchOk: {
+        auto data = std::get<MoqtFetchOk>(structured_data);
+        return framer_.SerializeFetchOk(data);
+      }
+      case moqt::MoqtMessageType::kFetchError: {
+        auto data = std::get<MoqtFetchError>(structured_data);
+        return framer_.SerializeFetchError(data);
+      }
       case moqt::MoqtMessageType::kObjectAck: {
         auto data = std::get<MoqtObjectAck>(structured_data);
         return framer_.SerializeObjectAck(data);
@@ -447,6 +468,29 @@
   EXPECT_EQ(buffer.size(), 0);
 }
 
+TEST_F(MoqtFramerSimpleTest, FetchEndBeforeStart) {
+  MoqtFetch fetch = {
+      /*subscribe_id =*/1,
+      /*full_track_name=*/FullTrackName{"foo", "bar"},
+      /*subscriber_priority=*/2,
+      /*group_order=*/MoqtDeliveryOrder::kAscending,
+      /*start_object=*/FullSequence{1, 2},
+      /*end_group=*/1,
+      /*end_object=*/1,
+      /*parameters=*/
+      MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt},
+  };
+  quiche::QuicheBuffer buffer;
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeFetch(fetch),
+                  "Invalid FETCH object range");
+  EXPECT_EQ(buffer.size(), 0);
+  fetch.end_group = 0;
+  fetch.end_object = std::nullopt;
+  EXPECT_QUIC_BUG(buffer = framer_.SerializeFetch(fetch),
+                  "Invalid FETCH object range");
+  EXPECT_EQ(buffer.size(), 0);
+}
+
 TEST_F(MoqtFramerSimpleTest, SubscribeLatestGroupNonzeroObject) {
   MoqtSubscribe subscribe = {
       /*subscribe_id=*/3,
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index a406a70..f9600de 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -107,6 +107,14 @@
       return "UNSUBSCRIBE_NAMESPACE";
     case MoqtMessageType::kMaxSubscribeId:
       return "MAX_SUBSCRIBE_ID";
+    case MoqtMessageType::kFetch:
+      return "FETCH";
+    case MoqtMessageType::kFetchCancel:
+      return "FETCH_CANCEL";
+    case MoqtMessageType::kFetchOk:
+      return "FETCH_OK";
+    case MoqtMessageType::kFetchError:
+      return "FETCH_ERROR";
     case MoqtMessageType::kObjectAck:
       return "OBJECT_ACK";
   }
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 83502bc..eaca3d6 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -93,6 +93,10 @@
   kSubscribeAnnouncesError = 0x13,
   kUnsubscribeAnnounces = 0x14,
   kMaxSubscribeId = 0x15,
+  kFetch = 0x16,
+  kFetchCancel = 0x17,
+  kFetchOk = 0x18,
+  kFetchError = 0x19,
   kClientSetup = 0x40,
   kServerSetup = 0x41,
 
@@ -528,6 +532,34 @@
   uint64_t max_subscribe_id;
 };
 
+struct QUICHE_EXPORT MoqtFetch {
+  uint64_t subscribe_id;
+  FullTrackName full_track_name;
+  MoqtPriority subscriber_priority;
+  std::optional<MoqtDeliveryOrder> group_order;
+  FullSequence start_object;  // subgroup is ignored
+  uint64_t end_group;
+  std::optional<uint64_t> end_object;
+  MoqtSubscribeParameters parameters;
+};
+
+struct QUICHE_EXPORT MoqtFetchCancel {
+  uint64_t subscribe_id;
+};
+
+struct QUICHE_EXPORT MoqtFetchOk {
+  uint64_t subscribe_id;
+  MoqtDeliveryOrder group_order;
+  FullSequence largest_id;  // subgroup is ignored
+  MoqtSubscribeParameters parameters;
+};
+
+struct QUICHE_EXPORT MoqtFetchError {
+  uint64_t subscribe_id;
+  SubscribeErrorCode error_code;
+  std::string reason_phrase;
+};
+
 // All of the four values in this message are encoded as varints.
 // `delta_from_deadline` is encoded as an absolute value, with the lowest bit
 // indicating the sign (0 if positive).
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index aa50535..1af9f74 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -264,6 +264,18 @@
     case MoqtMessageType::kMaxSubscribeId:
       bytes_read = ProcessMaxSubscribeId(reader);
       break;
+    case MoqtMessageType::kFetch:
+      bytes_read = ProcessFetch(reader);
+      break;
+    case MoqtMessageType::kFetchCancel:
+      bytes_read = ProcessFetchCancel(reader);
+      break;
+    case MoqtMessageType::kFetchOk:
+      bytes_read = ProcessFetchOk(reader);
+      break;
+    case MoqtMessageType::kFetchError:
+      bytes_read = ProcessFetchError(reader);
+      break;
     case moqt::MoqtMessageType::kObjectAck:
       bytes_read = ProcessObjectAck(reader);
       break;
@@ -803,6 +815,83 @@
   return reader.PreviouslyReadPayload().length();
 }
 
+size_t MoqtControlParser::ProcessFetch(quic::QuicDataReader& reader) {
+  MoqtFetch fetch;
+  absl::string_view track_name;
+  uint8_t group_order;
+  uint64_t end_object;
+  if (!reader.ReadVarInt62(&fetch.subscribe_id) ||
+      !ReadTrackNamespace(reader, fetch.full_track_name) ||
+      !reader.ReadStringPieceVarInt62(&track_name) ||
+      !reader.ReadUInt8(&fetch.subscriber_priority) ||
+      !reader.ReadUInt8(&group_order) ||
+      !reader.ReadVarInt62(&fetch.start_object.group) ||
+      !reader.ReadVarInt62(&fetch.start_object.object) ||
+      !reader.ReadVarInt62(&fetch.end_group) ||
+      !reader.ReadVarInt62(&end_object) ||
+      !ReadSubscribeParameters(reader, fetch.parameters)) {
+    return 0;
+  }
+  // Elements that have to be translated from the literal value.
+  fetch.full_track_name.AddElement(track_name);
+  if (!ParseDeliveryOrder(group_order, fetch.group_order)) {
+    ParseError("Invalid group order value in FETCH message");
+    return 0;
+  }
+  fetch.end_object =
+      end_object == 0 ? std::optional<uint64_t>() : (end_object - 1);
+  if (fetch.end_group < fetch.start_object.group ||
+      (fetch.end_group == fetch.start_object.group &&
+       fetch.end_object.has_value() &&
+       *fetch.end_object < fetch.start_object.object)) {
+    ParseError("End object comes before start object in FETCH");
+    return 0;
+  }
+  visitor_.OnFetchMessage(fetch);
+  return reader.PreviouslyReadPayload().length();
+}
+
+size_t MoqtControlParser::ProcessFetchCancel(quic::QuicDataReader& reader) {
+  MoqtFetchCancel fetch_cancel;
+  if (!reader.ReadVarInt62(&fetch_cancel.subscribe_id)) {
+    return 0;
+  }
+  visitor_.OnFetchCancelMessage(fetch_cancel);
+  return reader.PreviouslyReadPayload().length();
+}
+
+size_t MoqtControlParser::ProcessFetchOk(quic::QuicDataReader& reader) {
+  MoqtFetchOk fetch_ok;
+  uint8_t group_order;
+  if (!reader.ReadVarInt62(&fetch_ok.subscribe_id) ||
+      !reader.ReadUInt8(&group_order) ||
+      !reader.ReadVarInt62(&fetch_ok.largest_id.group) ||
+      !reader.ReadVarInt62(&fetch_ok.largest_id.object) ||
+      !ReadSubscribeParameters(reader, fetch_ok.parameters)) {
+    return 0;
+  }
+  if (group_order != 0x01 && group_order != 0x02) {
+    ParseError("Invalid group order value in FETCH_OK");
+    return 0;
+  }
+  fetch_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order);
+  visitor_.OnFetchOkMessage(fetch_ok);
+  return reader.PreviouslyReadPayload().length();
+}
+
+size_t MoqtControlParser::ProcessFetchError(quic::QuicDataReader& reader) {
+  MoqtFetchError fetch_error;
+  uint64_t error_code;
+  if (!reader.ReadVarInt62(&fetch_error.subscribe_id) ||
+      !reader.ReadVarInt62(&error_code) ||
+      !reader.ReadStringVarInt62(fetch_error.reason_phrase)) {
+    return 0;
+  }
+  fetch_error.error_code = static_cast<SubscribeErrorCode>(error_code);
+  visitor_.OnFetchErrorMessage(fetch_error);
+  return reader.PreviouslyReadPayload().length();
+}
+
 size_t MoqtControlParser::ProcessObjectAck(quic::QuicDataReader& reader) {
   MoqtObjectAck object_ack;
   uint64_t raw_delta;
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index 4ebaf85..58795c4 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -52,6 +52,10 @@
   virtual void OnUnsubscribeAnnouncesMessage(
       const MoqtUnsubscribeAnnounces& message) = 0;
   virtual void OnMaxSubscribeIdMessage(const MoqtMaxSubscribeId& message) = 0;
+  virtual void OnFetchMessage(const MoqtFetch& message) = 0;
+  virtual void OnFetchCancelMessage(const MoqtFetchCancel& message) = 0;
+  virtual void OnFetchOkMessage(const MoqtFetchOk& message) = 0;
+  virtual void OnFetchErrorMessage(const MoqtFetchError& message) = 0;
   virtual void OnObjectAckMessage(const MoqtObjectAck& message) = 0;
 
   virtual void OnParsingError(MoqtError code, absl::string_view reason) = 0;
@@ -121,6 +125,10 @@
   size_t ProcessSubscribeAnnouncesError(quic::QuicDataReader& reader);
   size_t ProcessUnsubscribeAnnounces(quic::QuicDataReader& reader);
   size_t ProcessMaxSubscribeId(quic::QuicDataReader& reader);
+  size_t ProcessFetch(quic::QuicDataReader& reader);
+  size_t ProcessFetchCancel(quic::QuicDataReader& reader);
+  size_t ProcessFetchOk(quic::QuicDataReader& reader);
+  size_t ProcessFetchError(quic::QuicDataReader& reader);
   size_t ProcessObjectAck(quic::QuicDataReader& reader);
 
   // If |error| is not provided, assumes kProtocolViolation.
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index 84644bc..fb9ed3c 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -52,6 +52,10 @@
     MoqtMessageType::kSubscribeAnnouncesError,
     MoqtMessageType::kUnsubscribeAnnounces,
     MoqtMessageType::kMaxSubscribeId,
+    MoqtMessageType::kFetch,
+    MoqtMessageType::kFetchCancel,
+    MoqtMessageType::kFetchOk,
+    MoqtMessageType::kFetchError,
     MoqtMessageType::kObjectAck,
 };
 constexpr std::array kDataStreamTypes{
@@ -194,6 +198,18 @@
   void OnMaxSubscribeIdMessage(const MoqtMaxSubscribeId& message) override {
     OnControlMessage(message);
   }
+  void OnFetchMessage(const MoqtFetch& message) override {
+    OnControlMessage(message);
+  }
+  void OnFetchCancelMessage(const MoqtFetchCancel& message) override {
+    OnControlMessage(message);
+  }
+  void OnFetchOkMessage(const MoqtFetchOk& message) override {
+    OnControlMessage(message);
+  }
+  void OnFetchErrorMessage(const MoqtFetchError& message) override {
+    OnControlMessage(message);
+  }
   void OnObjectAckMessage(const MoqtObjectAck& message) override {
     OnControlMessage(message);
   }
@@ -1310,6 +1326,39 @@
             "SUBSCRIBE_DONE ContentExists has invalid value");
 }
 
+TEST_F(MoqtMessageSpecificTest, FetchInvalidRange) {
+  MoqtControlParser parser(kRawQuic, visitor_);
+  FetchMessage fetch;
+  fetch.SetEndObject(1, 1);
+  parser.ProcessData(fetch.PacketSample(), false);
+  EXPECT_EQ(visitor_.messages_received_, 0);
+  EXPECT_TRUE(visitor_.parsing_error_.has_value());
+  EXPECT_EQ(*visitor_.parsing_error_,
+            "End object comes before start object in FETCH");
+}
+
+TEST_F(MoqtMessageSpecificTest, FetchInvalidRange2) {
+  MoqtControlParser parser(kRawQuic, visitor_);
+  FetchMessage fetch;
+  fetch.SetEndObject(0, std::nullopt);
+  parser.ProcessData(fetch.PacketSample(), false);
+  EXPECT_EQ(visitor_.messages_received_, 0);
+  EXPECT_TRUE(visitor_.parsing_error_.has_value());
+  EXPECT_EQ(*visitor_.parsing_error_,
+            "End object comes before start object in FETCH");
+}
+
+TEST_F(MoqtMessageSpecificTest, FetchInvalidGroupOrder) {
+  MoqtControlParser parser(kRawQuic, visitor_);
+  FetchMessage fetch;
+  fetch.SetGroupOrder(3);
+  parser.ProcessData(fetch.PacketSample(), false);
+  EXPECT_EQ(visitor_.messages_received_, 0);
+  EXPECT_TRUE(visitor_.parsing_error_.has_value());
+  EXPECT_EQ(*visitor_.parsing_error_,
+            "Invalid group order value in FETCH message");
+}
+
 TEST_F(MoqtMessageSpecificTest, PaddingStream) {
   MoqtDataParser parser(&visitor_);
   std::string buffer(32, '\0');
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index eeb9324..34e0aaf 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -213,6 +213,10 @@
     void OnUnsubscribeAnnouncesMessage(
         const MoqtUnsubscribeAnnounces& message) override {}
     void OnMaxSubscribeIdMessage(const MoqtMaxSubscribeId& message) override;
+    void OnFetchMessage(const MoqtFetch& message) override {}
+    void OnFetchCancelMessage(const MoqtFetchCancel& message) override {}
+    void OnFetchOkMessage(const MoqtFetchOk& message) override {}
+    void OnFetchErrorMessage(const MoqtFetchError& message) override {}
     void OnObjectAckMessage(const MoqtObjectAck& message) override {
       auto subscription_it =
           session_->published_subscriptions_.find(message.subscribe_id);
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 23d34a9..d0318bd 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -39,7 +39,8 @@
       MoqtAnnounceCancel, MoqtTrackStatusRequest, MoqtUnannounce,
       MoqtTrackStatus, MoqtGoAway, MoqtSubscribeAnnounces,
       MoqtSubscribeAnnouncesOk, MoqtSubscribeAnnouncesError,
-      MoqtUnsubscribeAnnounces, MoqtMaxSubscribeId, MoqtObjectAck>;
+      MoqtUnsubscribeAnnounces, MoqtMaxSubscribeId, MoqtFetch, MoqtFetchCancel,
+      MoqtFetchOk, MoqtFetchError, MoqtObjectAck>;
 
   // The total actual size of the message.
   size_t total_message_size() const { return wire_image_size_; }
@@ -1293,6 +1294,222 @@
   };
 };
 
+class QUICHE_NO_EXPORT FetchMessage : public TestMessageBase {
+ public:
+  FetchMessage() : TestMessageBase() {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+  }
+  bool EqualFieldValues(MessageStructuredData& values) const override {
+    auto cast = std::get<MoqtFetch>(values);
+    if (cast.subscribe_id != fetch_.subscribe_id) {
+      QUIC_LOG(INFO) << "FETCH subscribe_id mismatch";
+      return false;
+    }
+    if (cast.full_track_name != fetch_.full_track_name) {
+      QUIC_LOG(INFO) << "FETCH full_track_name mismatch";
+      return false;
+    }
+    if (cast.subscriber_priority != fetch_.subscriber_priority) {
+      QUIC_LOG(INFO) << "FETCH subscriber_priority mismatch";
+      return false;
+    }
+    if (cast.group_order != fetch_.group_order) {
+      QUIC_LOG(INFO) << "FETCH group_order mismatch";
+      return false;
+    }
+    if (cast.start_object != fetch_.start_object) {
+      QUIC_LOG(INFO) << "FETCH start_object mismatch";
+      return false;
+    }
+    if (cast.end_group != fetch_.end_group) {
+      QUIC_LOG(INFO) << "FETCH end_group mismatch";
+      return false;
+    }
+    if (cast.end_object != fetch_.end_object) {
+      QUIC_LOG(INFO) << "FETCH end_object mismatch";
+      return false;
+    }
+    if (cast.parameters != fetch_.parameters) {
+      QUIC_LOG(INFO) << "FETCH parameters mismatch";
+      return false;
+    }
+    return true;
+  }
+
+  void ExpandVarints() override {
+    ExpandVarintsImpl("vvvv---v-----vvvvvvv---");
+  }
+
+  MessageStructuredData structured_data() const override {
+    return TestMessageBase::MessageStructuredData(fetch_);
+  }
+
+  void SetEndObject(uint64_t group, std::optional<uint64_t> object) {
+    // Avoid varint nonsense.
+    QUICHE_CHECK(group < 64);
+    QUICHE_CHECK(!object.has_value() || *object < 64);
+    fetch_.end_group = group;
+    fetch_.end_object = object;
+    raw_packet_[16] = group;
+    raw_packet_[17] = object.has_value() ? (*object + 1) : 0;
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+  }
+
+  void SetGroupOrder(uint8_t group_order) {
+    raw_packet_[13] = static_cast<uint8_t>(group_order);
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+  }
+
+ private:
+  uint8_t raw_packet_[24] = {
+      0x16, 0x16,
+      0x01,                                // subscribe_id = 1
+      0x01, 0x03, 0x66, 0x6f, 0x6f,        // track_namespace = "foo"
+      0x03, 0x62, 0x61, 0x72,              // track_name = "bar"
+      0x02,                                // priority = kHigh
+      0x01,                                // group_order = kAscending
+      0x01, 0x02,                          // start_object = 1, 2
+      0x05, 0x07,                          // end_object = 5, 6
+      0x01, 0x02, 0x03, 0x62, 0x61, 0x7a,  // parameters = "baz"
+  };
+
+  MoqtFetch fetch_ = {
+      /*subscribe_id =*/1,
+      /*full_track_name=*/FullTrackName{"foo", "bar"},
+      /*subscriber_priority=*/2,
+      /*group_order=*/MoqtDeliveryOrder::kAscending,
+      /*start_object=*/FullSequence{1, 2},
+      /*end_group=*/5,
+      /*end_object=*/6,
+      /*parameters=*/
+      MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt},
+  };
+};
+
+class QUICHE_NO_EXPORT FetchCancelMessage : public TestMessageBase {
+ public:
+  FetchCancelMessage() : TestMessageBase() {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+  }
+  bool EqualFieldValues(MessageStructuredData& values) const override {
+    auto cast = std::get<MoqtFetchCancel>(values);
+    if (cast.subscribe_id != fetch_cancel_.subscribe_id) {
+      QUIC_LOG(INFO) << "FETCH_CANCEL subscribe_id mismatch";
+      return false;
+    }
+    return true;
+  }
+
+  void ExpandVarints() override { ExpandVarintsImpl("vvv"); }
+
+  MessageStructuredData structured_data() const override {
+    return TestMessageBase::MessageStructuredData(fetch_cancel_);
+  }
+
+ private:
+  uint8_t raw_packet_[3] = {
+      0x17, 0x01,
+      0x01,  // subscribe_id = 1
+  };
+
+  MoqtFetchCancel fetch_cancel_ = {
+      /*subscribe_id =*/1,
+  };
+};
+
+class QUICHE_NO_EXPORT FetchOkMessage : public TestMessageBase {
+ public:
+  FetchOkMessage() : TestMessageBase() {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+  }
+  bool EqualFieldValues(MessageStructuredData& values) const override {
+    auto cast = std::get<MoqtFetchOk>(values);
+    if (cast.subscribe_id != fetch_ok_.subscribe_id) {
+      QUIC_LOG(INFO) << "FETCH_OK subscribe_id mismatch";
+      return false;
+    }
+    if (cast.group_order != fetch_ok_.group_order) {
+      QUIC_LOG(INFO) << "FETCH_OK group_order mismatch";
+      return false;
+    }
+    if (cast.largest_id != fetch_ok_.largest_id) {
+      QUIC_LOG(INFO) << "FETCH_OK start_object mismatch";
+      return false;
+    }
+    if (cast.parameters != fetch_ok_.parameters) {
+      QUIC_LOG(INFO) << "FETCH_OK parameters mismatch";
+      return false;
+    }
+    return true;
+  }
+
+  void ExpandVarints() override { ExpandVarintsImpl("vvv-vvvvv---"); }
+
+  MessageStructuredData structured_data() const override {
+    return TestMessageBase::MessageStructuredData(fetch_ok_);
+  }
+
+ private:
+  uint8_t raw_packet_[12] = {
+      0x18, 0x0a,
+      0x01,                                // subscribe_id = 1
+      0x01,                                // group_order = kAscending
+      0x05, 0x04,                          // largest_object = 5, 4
+      0x01, 0x02, 0x03, 0x62, 0x61, 0x7a,  // parameters = "baz"
+  };
+
+  MoqtFetchOk fetch_ok_ = {
+      /*subscribe_id =*/1,
+      /*group_order=*/MoqtDeliveryOrder::kAscending,
+      /*start_object=*/FullSequence{5, 4},
+      /*parameters=*/
+      MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt},
+  };
+};
+
+class QUICHE_NO_EXPORT FetchErrorMessage : public TestMessageBase {
+ public:
+  FetchErrorMessage() : TestMessageBase() {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+  }
+  bool EqualFieldValues(MessageStructuredData& values) const override {
+    auto cast = std::get<MoqtFetchError>(values);
+    if (cast.subscribe_id != fetch_error_.subscribe_id) {
+      QUIC_LOG(INFO) << "FETCH_ERROR subscribe_id mismatch";
+      return false;
+    }
+    if (cast.error_code != fetch_error_.error_code) {
+      QUIC_LOG(INFO) << "FETCH_ERROR group_order mismatch";
+      return false;
+    }
+    if (cast.reason_phrase != fetch_error_.reason_phrase) {
+      QUIC_LOG(INFO) << "FETCH_ERROR reason_phrase mismatch";
+      return false;
+    }
+    return true;
+  }
+
+  void ExpandVarints() override { ExpandVarintsImpl("vvvvv---"); }
+
+  MessageStructuredData structured_data() const override {
+    return TestMessageBase::MessageStructuredData(fetch_error_);
+  }
+
+ private:
+  uint8_t raw_packet_[8] = {
+      0x19, 0x06,
+      0x01,                    // subscribe_id = 1
+      0x04,                    // error_code = kUnauthorized
+      0x03, 0x62, 0x61, 0x72,  // reason_phrase = "bar"
+  };
+
+  MoqtFetchError fetch_error_ = {
+      /*subscribe_id =*/1,
+      /*error_code=*/SubscribeErrorCode::kUnauthorized,
+      /*reason_phrase=*/"bar",
+  };
+};
+
 class QUICHE_NO_EXPORT ObjectAckMessage : public TestMessageBase {
  public:
   ObjectAckMessage() : TestMessageBase() {
@@ -1383,6 +1600,14 @@
       return std::make_unique<UnsubscribeAnnouncesMessage>();
     case MoqtMessageType::kMaxSubscribeId:
       return std::make_unique<MaxSubscribeIdMessage>();
+    case MoqtMessageType::kFetch:
+      return std::make_unique<FetchMessage>();
+    case MoqtMessageType::kFetchCancel:
+      return std::make_unique<FetchCancelMessage>();
+    case MoqtMessageType::kFetchOk:
+      return std::make_unique<FetchOkMessage>();
+    case MoqtMessageType::kFetchError:
+      return std::make_unique<FetchErrorMessage>();
     case MoqtMessageType::kObjectAck:
       return std::make_unique<ObjectAckMessage>();
     case MoqtMessageType::kClientSetup: