Framer and Parser for MoQT FETCH family messages in draft-07.
PiperOrigin-RevId: 689397465
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index d2d34f8..f15f380 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -649,6 +649,50 @@
WireVarInt62(message.max_subscribe_id));
}
+quiche::QuicheBuffer MoqtFramer::SerializeFetch(const MoqtFetch& message) {
+ if (message.end_group < message.start_object.group ||
+ (message.end_group == message.start_object.group &&
+ message.end_object.has_value() &&
+ *message.end_object < message.start_object.object)) {
+ QUICHE_BUG(MoqtFramer_invalid_fetch) << "Invalid FETCH object range";
+ return quiche::QuicheBuffer();
+ }
+ return SerializeControlMessage(
+ MoqtMessageType::kFetch, WireVarInt62(message.subscribe_id),
+ WireFullTrackName(message.full_track_name, true),
+ WireUint8(message.subscriber_priority),
+ WireDeliveryOrder(message.group_order),
+ WireVarInt62(message.start_object.group),
+ WireVarInt62(message.start_object.object),
+ WireVarInt62(message.end_group),
+ WireVarInt62(message.end_object.has_value() ? *message.end_object + 1
+ : 0),
+ WireSubscribeParameterList(message.parameters));
+}
+
+quiche::QuicheBuffer MoqtFramer::SerializeFetchCancel(
+ const MoqtFetchCancel& message) {
+ return SerializeControlMessage(MoqtMessageType::kFetchCancel,
+ WireVarInt62(message.subscribe_id));
+}
+
+quiche::QuicheBuffer MoqtFramer::SerializeFetchOk(const MoqtFetchOk& message) {
+ return SerializeControlMessage(
+ MoqtMessageType::kFetchOk, WireVarInt62(message.subscribe_id),
+ WireDeliveryOrder(message.group_order),
+ WireVarInt62(message.largest_id.group),
+ WireVarInt62(message.largest_id.object),
+ WireSubscribeParameterList(message.parameters));
+}
+
+quiche::QuicheBuffer MoqtFramer::SerializeFetchError(
+ const MoqtFetchError& message) {
+ return SerializeControlMessage(
+ MoqtMessageType::kFetchError, WireVarInt62(message.subscribe_id),
+ WireVarInt62(message.error_code),
+ WireStringWithVarInt62Length(message.reason_phrase));
+}
+
quiche::QuicheBuffer MoqtFramer::SerializeObjectAck(
const MoqtObjectAck& message) {
return SerializeControlMessage(
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h
index 6bf9454..ae931a9 100644
--- a/quiche/quic/moqt/moqt_framer.h
+++ b/quiche/quic/moqt/moqt_framer.h
@@ -65,6 +65,10 @@
const MoqtUnsubscribeAnnounces& message);
quiche::QuicheBuffer SerializeMaxSubscribeId(
const MoqtMaxSubscribeId& message);
+ quiche::QuicheBuffer SerializeFetch(const MoqtFetch& message);
+ quiche::QuicheBuffer SerializeFetchCancel(const MoqtFetchCancel& message);
+ quiche::QuicheBuffer SerializeFetchOk(const MoqtFetchOk& message);
+ quiche::QuicheBuffer SerializeFetchError(const MoqtFetchError& message);
quiche::QuicheBuffer SerializeObjectAck(const MoqtObjectAck& message);
private:
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index e8a8cce..e73142f 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -14,6 +14,7 @@
#include "absl/strings/str_cat.h"
#include "absl/strings/string_view.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/test_tools/moqt_test_message.h"
#include "quiche/quic/platform/api/quic_expect_bug.h"
#include "quiche/quic/platform/api/quic_test.h"
@@ -51,6 +52,10 @@
MoqtMessageType::kSubscribeAnnouncesError,
MoqtMessageType::kUnsubscribeAnnounces,
MoqtMessageType::kMaxSubscribeId,
+ MoqtMessageType::kFetch,
+ MoqtMessageType::kFetchCancel,
+ MoqtMessageType::kFetchOk,
+ MoqtMessageType::kFetchError,
MoqtMessageType::kObjectAck,
MoqtMessageType::kClientSetup,
MoqtMessageType::kServerSetup,
@@ -182,6 +187,22 @@
auto data = std::get<MoqtMaxSubscribeId>(structured_data);
return framer_.SerializeMaxSubscribeId(data);
}
+ case moqt::MoqtMessageType::kFetch: {
+ auto data = std::get<MoqtFetch>(structured_data);
+ return framer_.SerializeFetch(data);
+ }
+ case moqt::MoqtMessageType::kFetchCancel: {
+ auto data = std::get<MoqtFetchCancel>(structured_data);
+ return framer_.SerializeFetchCancel(data);
+ }
+ case moqt::MoqtMessageType::kFetchOk: {
+ auto data = std::get<MoqtFetchOk>(structured_data);
+ return framer_.SerializeFetchOk(data);
+ }
+ case moqt::MoqtMessageType::kFetchError: {
+ auto data = std::get<MoqtFetchError>(structured_data);
+ return framer_.SerializeFetchError(data);
+ }
case moqt::MoqtMessageType::kObjectAck: {
auto data = std::get<MoqtObjectAck>(structured_data);
return framer_.SerializeObjectAck(data);
@@ -447,6 +468,29 @@
EXPECT_EQ(buffer.size(), 0);
}
+TEST_F(MoqtFramerSimpleTest, FetchEndBeforeStart) {
+ MoqtFetch fetch = {
+ /*subscribe_id =*/1,
+ /*full_track_name=*/FullTrackName{"foo", "bar"},
+ /*subscriber_priority=*/2,
+ /*group_order=*/MoqtDeliveryOrder::kAscending,
+ /*start_object=*/FullSequence{1, 2},
+ /*end_group=*/1,
+ /*end_object=*/1,
+ /*parameters=*/
+ MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt},
+ };
+ quiche::QuicheBuffer buffer;
+ EXPECT_QUIC_BUG(buffer = framer_.SerializeFetch(fetch),
+ "Invalid FETCH object range");
+ EXPECT_EQ(buffer.size(), 0);
+ fetch.end_group = 0;
+ fetch.end_object = std::nullopt;
+ EXPECT_QUIC_BUG(buffer = framer_.SerializeFetch(fetch),
+ "Invalid FETCH object range");
+ EXPECT_EQ(buffer.size(), 0);
+}
+
TEST_F(MoqtFramerSimpleTest, SubscribeLatestGroupNonzeroObject) {
MoqtSubscribe subscribe = {
/*subscribe_id=*/3,
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index a406a70..f9600de 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -107,6 +107,14 @@
return "UNSUBSCRIBE_NAMESPACE";
case MoqtMessageType::kMaxSubscribeId:
return "MAX_SUBSCRIBE_ID";
+ case MoqtMessageType::kFetch:
+ return "FETCH";
+ case MoqtMessageType::kFetchCancel:
+ return "FETCH_CANCEL";
+ case MoqtMessageType::kFetchOk:
+ return "FETCH_OK";
+ case MoqtMessageType::kFetchError:
+ return "FETCH_ERROR";
case MoqtMessageType::kObjectAck:
return "OBJECT_ACK";
}
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 83502bc..eaca3d6 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -93,6 +93,10 @@
kSubscribeAnnouncesError = 0x13,
kUnsubscribeAnnounces = 0x14,
kMaxSubscribeId = 0x15,
+ kFetch = 0x16,
+ kFetchCancel = 0x17,
+ kFetchOk = 0x18,
+ kFetchError = 0x19,
kClientSetup = 0x40,
kServerSetup = 0x41,
@@ -528,6 +532,34 @@
uint64_t max_subscribe_id;
};
+struct QUICHE_EXPORT MoqtFetch {
+ uint64_t subscribe_id;
+ FullTrackName full_track_name;
+ MoqtPriority subscriber_priority;
+ std::optional<MoqtDeliveryOrder> group_order;
+ FullSequence start_object; // subgroup is ignored
+ uint64_t end_group;
+ std::optional<uint64_t> end_object;
+ MoqtSubscribeParameters parameters;
+};
+
+struct QUICHE_EXPORT MoqtFetchCancel {
+ uint64_t subscribe_id;
+};
+
+struct QUICHE_EXPORT MoqtFetchOk {
+ uint64_t subscribe_id;
+ MoqtDeliveryOrder group_order;
+ FullSequence largest_id; // subgroup is ignored
+ MoqtSubscribeParameters parameters;
+};
+
+struct QUICHE_EXPORT MoqtFetchError {
+ uint64_t subscribe_id;
+ SubscribeErrorCode error_code;
+ std::string reason_phrase;
+};
+
// All of the four values in this message are encoded as varints.
// `delta_from_deadline` is encoded as an absolute value, with the lowest bit
// indicating the sign (0 if positive).
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index aa50535..1af9f74 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -264,6 +264,18 @@
case MoqtMessageType::kMaxSubscribeId:
bytes_read = ProcessMaxSubscribeId(reader);
break;
+ case MoqtMessageType::kFetch:
+ bytes_read = ProcessFetch(reader);
+ break;
+ case MoqtMessageType::kFetchCancel:
+ bytes_read = ProcessFetchCancel(reader);
+ break;
+ case MoqtMessageType::kFetchOk:
+ bytes_read = ProcessFetchOk(reader);
+ break;
+ case MoqtMessageType::kFetchError:
+ bytes_read = ProcessFetchError(reader);
+ break;
case moqt::MoqtMessageType::kObjectAck:
bytes_read = ProcessObjectAck(reader);
break;
@@ -803,6 +815,83 @@
return reader.PreviouslyReadPayload().length();
}
+size_t MoqtControlParser::ProcessFetch(quic::QuicDataReader& reader) {
+ MoqtFetch fetch;
+ absl::string_view track_name;
+ uint8_t group_order;
+ uint64_t end_object;
+ if (!reader.ReadVarInt62(&fetch.subscribe_id) ||
+ !ReadTrackNamespace(reader, fetch.full_track_name) ||
+ !reader.ReadStringPieceVarInt62(&track_name) ||
+ !reader.ReadUInt8(&fetch.subscriber_priority) ||
+ !reader.ReadUInt8(&group_order) ||
+ !reader.ReadVarInt62(&fetch.start_object.group) ||
+ !reader.ReadVarInt62(&fetch.start_object.object) ||
+ !reader.ReadVarInt62(&fetch.end_group) ||
+ !reader.ReadVarInt62(&end_object) ||
+ !ReadSubscribeParameters(reader, fetch.parameters)) {
+ return 0;
+ }
+ // Elements that have to be translated from the literal value.
+ fetch.full_track_name.AddElement(track_name);
+ if (!ParseDeliveryOrder(group_order, fetch.group_order)) {
+ ParseError("Invalid group order value in FETCH message");
+ return 0;
+ }
+ fetch.end_object =
+ end_object == 0 ? std::optional<uint64_t>() : (end_object - 1);
+ if (fetch.end_group < fetch.start_object.group ||
+ (fetch.end_group == fetch.start_object.group &&
+ fetch.end_object.has_value() &&
+ *fetch.end_object < fetch.start_object.object)) {
+ ParseError("End object comes before start object in FETCH");
+ return 0;
+ }
+ visitor_.OnFetchMessage(fetch);
+ return reader.PreviouslyReadPayload().length();
+}
+
+size_t MoqtControlParser::ProcessFetchCancel(quic::QuicDataReader& reader) {
+ MoqtFetchCancel fetch_cancel;
+ if (!reader.ReadVarInt62(&fetch_cancel.subscribe_id)) {
+ return 0;
+ }
+ visitor_.OnFetchCancelMessage(fetch_cancel);
+ return reader.PreviouslyReadPayload().length();
+}
+
+size_t MoqtControlParser::ProcessFetchOk(quic::QuicDataReader& reader) {
+ MoqtFetchOk fetch_ok;
+ uint8_t group_order;
+ if (!reader.ReadVarInt62(&fetch_ok.subscribe_id) ||
+ !reader.ReadUInt8(&group_order) ||
+ !reader.ReadVarInt62(&fetch_ok.largest_id.group) ||
+ !reader.ReadVarInt62(&fetch_ok.largest_id.object) ||
+ !ReadSubscribeParameters(reader, fetch_ok.parameters)) {
+ return 0;
+ }
+ if (group_order != 0x01 && group_order != 0x02) {
+ ParseError("Invalid group order value in FETCH_OK");
+ return 0;
+ }
+ fetch_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order);
+ visitor_.OnFetchOkMessage(fetch_ok);
+ return reader.PreviouslyReadPayload().length();
+}
+
+size_t MoqtControlParser::ProcessFetchError(quic::QuicDataReader& reader) {
+ MoqtFetchError fetch_error;
+ uint64_t error_code;
+ if (!reader.ReadVarInt62(&fetch_error.subscribe_id) ||
+ !reader.ReadVarInt62(&error_code) ||
+ !reader.ReadStringVarInt62(fetch_error.reason_phrase)) {
+ return 0;
+ }
+ fetch_error.error_code = static_cast<SubscribeErrorCode>(error_code);
+ visitor_.OnFetchErrorMessage(fetch_error);
+ return reader.PreviouslyReadPayload().length();
+}
+
size_t MoqtControlParser::ProcessObjectAck(quic::QuicDataReader& reader) {
MoqtObjectAck object_ack;
uint64_t raw_delta;
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index 4ebaf85..58795c4 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -52,6 +52,10 @@
virtual void OnUnsubscribeAnnouncesMessage(
const MoqtUnsubscribeAnnounces& message) = 0;
virtual void OnMaxSubscribeIdMessage(const MoqtMaxSubscribeId& message) = 0;
+ virtual void OnFetchMessage(const MoqtFetch& message) = 0;
+ virtual void OnFetchCancelMessage(const MoqtFetchCancel& message) = 0;
+ virtual void OnFetchOkMessage(const MoqtFetchOk& message) = 0;
+ virtual void OnFetchErrorMessage(const MoqtFetchError& message) = 0;
virtual void OnObjectAckMessage(const MoqtObjectAck& message) = 0;
virtual void OnParsingError(MoqtError code, absl::string_view reason) = 0;
@@ -121,6 +125,10 @@
size_t ProcessSubscribeAnnouncesError(quic::QuicDataReader& reader);
size_t ProcessUnsubscribeAnnounces(quic::QuicDataReader& reader);
size_t ProcessMaxSubscribeId(quic::QuicDataReader& reader);
+ size_t ProcessFetch(quic::QuicDataReader& reader);
+ size_t ProcessFetchCancel(quic::QuicDataReader& reader);
+ size_t ProcessFetchOk(quic::QuicDataReader& reader);
+ size_t ProcessFetchError(quic::QuicDataReader& reader);
size_t ProcessObjectAck(quic::QuicDataReader& reader);
// If |error| is not provided, assumes kProtocolViolation.
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index 84644bc..fb9ed3c 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -52,6 +52,10 @@
MoqtMessageType::kSubscribeAnnouncesError,
MoqtMessageType::kUnsubscribeAnnounces,
MoqtMessageType::kMaxSubscribeId,
+ MoqtMessageType::kFetch,
+ MoqtMessageType::kFetchCancel,
+ MoqtMessageType::kFetchOk,
+ MoqtMessageType::kFetchError,
MoqtMessageType::kObjectAck,
};
constexpr std::array kDataStreamTypes{
@@ -194,6 +198,18 @@
void OnMaxSubscribeIdMessage(const MoqtMaxSubscribeId& message) override {
OnControlMessage(message);
}
+ void OnFetchMessage(const MoqtFetch& message) override {
+ OnControlMessage(message);
+ }
+ void OnFetchCancelMessage(const MoqtFetchCancel& message) override {
+ OnControlMessage(message);
+ }
+ void OnFetchOkMessage(const MoqtFetchOk& message) override {
+ OnControlMessage(message);
+ }
+ void OnFetchErrorMessage(const MoqtFetchError& message) override {
+ OnControlMessage(message);
+ }
void OnObjectAckMessage(const MoqtObjectAck& message) override {
OnControlMessage(message);
}
@@ -1310,6 +1326,39 @@
"SUBSCRIBE_DONE ContentExists has invalid value");
}
+TEST_F(MoqtMessageSpecificTest, FetchInvalidRange) {
+ MoqtControlParser parser(kRawQuic, visitor_);
+ FetchMessage fetch;
+ fetch.SetEndObject(1, 1);
+ parser.ProcessData(fetch.PacketSample(), false);
+ EXPECT_EQ(visitor_.messages_received_, 0);
+ EXPECT_TRUE(visitor_.parsing_error_.has_value());
+ EXPECT_EQ(*visitor_.parsing_error_,
+ "End object comes before start object in FETCH");
+}
+
+TEST_F(MoqtMessageSpecificTest, FetchInvalidRange2) {
+ MoqtControlParser parser(kRawQuic, visitor_);
+ FetchMessage fetch;
+ fetch.SetEndObject(0, std::nullopt);
+ parser.ProcessData(fetch.PacketSample(), false);
+ EXPECT_EQ(visitor_.messages_received_, 0);
+ EXPECT_TRUE(visitor_.parsing_error_.has_value());
+ EXPECT_EQ(*visitor_.parsing_error_,
+ "End object comes before start object in FETCH");
+}
+
+TEST_F(MoqtMessageSpecificTest, FetchInvalidGroupOrder) {
+ MoqtControlParser parser(kRawQuic, visitor_);
+ FetchMessage fetch;
+ fetch.SetGroupOrder(3);
+ parser.ProcessData(fetch.PacketSample(), false);
+ EXPECT_EQ(visitor_.messages_received_, 0);
+ EXPECT_TRUE(visitor_.parsing_error_.has_value());
+ EXPECT_EQ(*visitor_.parsing_error_,
+ "Invalid group order value in FETCH message");
+}
+
TEST_F(MoqtMessageSpecificTest, PaddingStream) {
MoqtDataParser parser(&visitor_);
std::string buffer(32, '\0');
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index eeb9324..34e0aaf 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -213,6 +213,10 @@
void OnUnsubscribeAnnouncesMessage(
const MoqtUnsubscribeAnnounces& message) override {}
void OnMaxSubscribeIdMessage(const MoqtMaxSubscribeId& message) override;
+ void OnFetchMessage(const MoqtFetch& message) override {}
+ void OnFetchCancelMessage(const MoqtFetchCancel& message) override {}
+ void OnFetchOkMessage(const MoqtFetchOk& message) override {}
+ void OnFetchErrorMessage(const MoqtFetchError& message) override {}
void OnObjectAckMessage(const MoqtObjectAck& message) override {
auto subscription_it =
session_->published_subscriptions_.find(message.subscribe_id);
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 23d34a9..d0318bd 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -39,7 +39,8 @@
MoqtAnnounceCancel, MoqtTrackStatusRequest, MoqtUnannounce,
MoqtTrackStatus, MoqtGoAway, MoqtSubscribeAnnounces,
MoqtSubscribeAnnouncesOk, MoqtSubscribeAnnouncesError,
- MoqtUnsubscribeAnnounces, MoqtMaxSubscribeId, MoqtObjectAck>;
+ MoqtUnsubscribeAnnounces, MoqtMaxSubscribeId, MoqtFetch, MoqtFetchCancel,
+ MoqtFetchOk, MoqtFetchError, MoqtObjectAck>;
// The total actual size of the message.
size_t total_message_size() const { return wire_image_size_; }
@@ -1293,6 +1294,222 @@
};
};
+class QUICHE_NO_EXPORT FetchMessage : public TestMessageBase {
+ public:
+ FetchMessage() : TestMessageBase() {
+ SetWireImage(raw_packet_, sizeof(raw_packet_));
+ }
+ bool EqualFieldValues(MessageStructuredData& values) const override {
+ auto cast = std::get<MoqtFetch>(values);
+ if (cast.subscribe_id != fetch_.subscribe_id) {
+ QUIC_LOG(INFO) << "FETCH subscribe_id mismatch";
+ return false;
+ }
+ if (cast.full_track_name != fetch_.full_track_name) {
+ QUIC_LOG(INFO) << "FETCH full_track_name mismatch";
+ return false;
+ }
+ if (cast.subscriber_priority != fetch_.subscriber_priority) {
+ QUIC_LOG(INFO) << "FETCH subscriber_priority mismatch";
+ return false;
+ }
+ if (cast.group_order != fetch_.group_order) {
+ QUIC_LOG(INFO) << "FETCH group_order mismatch";
+ return false;
+ }
+ if (cast.start_object != fetch_.start_object) {
+ QUIC_LOG(INFO) << "FETCH start_object mismatch";
+ return false;
+ }
+ if (cast.end_group != fetch_.end_group) {
+ QUIC_LOG(INFO) << "FETCH end_group mismatch";
+ return false;
+ }
+ if (cast.end_object != fetch_.end_object) {
+ QUIC_LOG(INFO) << "FETCH end_object mismatch";
+ return false;
+ }
+ if (cast.parameters != fetch_.parameters) {
+ QUIC_LOG(INFO) << "FETCH parameters mismatch";
+ return false;
+ }
+ return true;
+ }
+
+ void ExpandVarints() override {
+ ExpandVarintsImpl("vvvv---v-----vvvvvvv---");
+ }
+
+ MessageStructuredData structured_data() const override {
+ return TestMessageBase::MessageStructuredData(fetch_);
+ }
+
+ void SetEndObject(uint64_t group, std::optional<uint64_t> object) {
+ // Avoid varint nonsense.
+ QUICHE_CHECK(group < 64);
+ QUICHE_CHECK(!object.has_value() || *object < 64);
+ fetch_.end_group = group;
+ fetch_.end_object = object;
+ raw_packet_[16] = group;
+ raw_packet_[17] = object.has_value() ? (*object + 1) : 0;
+ SetWireImage(raw_packet_, sizeof(raw_packet_));
+ }
+
+ void SetGroupOrder(uint8_t group_order) {
+ raw_packet_[13] = static_cast<uint8_t>(group_order);
+ SetWireImage(raw_packet_, sizeof(raw_packet_));
+ }
+
+ private:
+ uint8_t raw_packet_[24] = {
+ 0x16, 0x16,
+ 0x01, // subscribe_id = 1
+ 0x01, 0x03, 0x66, 0x6f, 0x6f, // track_namespace = "foo"
+ 0x03, 0x62, 0x61, 0x72, // track_name = "bar"
+ 0x02, // priority = kHigh
+ 0x01, // group_order = kAscending
+ 0x01, 0x02, // start_object = 1, 2
+ 0x05, 0x07, // end_object = 5, 6
+ 0x01, 0x02, 0x03, 0x62, 0x61, 0x7a, // parameters = "baz"
+ };
+
+ MoqtFetch fetch_ = {
+ /*subscribe_id =*/1,
+ /*full_track_name=*/FullTrackName{"foo", "bar"},
+ /*subscriber_priority=*/2,
+ /*group_order=*/MoqtDeliveryOrder::kAscending,
+ /*start_object=*/FullSequence{1, 2},
+ /*end_group=*/5,
+ /*end_object=*/6,
+ /*parameters=*/
+ MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt},
+ };
+};
+
+class QUICHE_NO_EXPORT FetchCancelMessage : public TestMessageBase {
+ public:
+ FetchCancelMessage() : TestMessageBase() {
+ SetWireImage(raw_packet_, sizeof(raw_packet_));
+ }
+ bool EqualFieldValues(MessageStructuredData& values) const override {
+ auto cast = std::get<MoqtFetchCancel>(values);
+ if (cast.subscribe_id != fetch_cancel_.subscribe_id) {
+ QUIC_LOG(INFO) << "FETCH_CANCEL subscribe_id mismatch";
+ return false;
+ }
+ return true;
+ }
+
+ void ExpandVarints() override { ExpandVarintsImpl("vvv"); }
+
+ MessageStructuredData structured_data() const override {
+ return TestMessageBase::MessageStructuredData(fetch_cancel_);
+ }
+
+ private:
+ uint8_t raw_packet_[3] = {
+ 0x17, 0x01,
+ 0x01, // subscribe_id = 1
+ };
+
+ MoqtFetchCancel fetch_cancel_ = {
+ /*subscribe_id =*/1,
+ };
+};
+
+class QUICHE_NO_EXPORT FetchOkMessage : public TestMessageBase {
+ public:
+ FetchOkMessage() : TestMessageBase() {
+ SetWireImage(raw_packet_, sizeof(raw_packet_));
+ }
+ bool EqualFieldValues(MessageStructuredData& values) const override {
+ auto cast = std::get<MoqtFetchOk>(values);
+ if (cast.subscribe_id != fetch_ok_.subscribe_id) {
+ QUIC_LOG(INFO) << "FETCH_OK subscribe_id mismatch";
+ return false;
+ }
+ if (cast.group_order != fetch_ok_.group_order) {
+ QUIC_LOG(INFO) << "FETCH_OK group_order mismatch";
+ return false;
+ }
+ if (cast.largest_id != fetch_ok_.largest_id) {
+ QUIC_LOG(INFO) << "FETCH_OK start_object mismatch";
+ return false;
+ }
+ if (cast.parameters != fetch_ok_.parameters) {
+ QUIC_LOG(INFO) << "FETCH_OK parameters mismatch";
+ return false;
+ }
+ return true;
+ }
+
+ void ExpandVarints() override { ExpandVarintsImpl("vvv-vvvvv---"); }
+
+ MessageStructuredData structured_data() const override {
+ return TestMessageBase::MessageStructuredData(fetch_ok_);
+ }
+
+ private:
+ uint8_t raw_packet_[12] = {
+ 0x18, 0x0a,
+ 0x01, // subscribe_id = 1
+ 0x01, // group_order = kAscending
+ 0x05, 0x04, // largest_object = 5, 4
+ 0x01, 0x02, 0x03, 0x62, 0x61, 0x7a, // parameters = "baz"
+ };
+
+ MoqtFetchOk fetch_ok_ = {
+ /*subscribe_id =*/1,
+ /*group_order=*/MoqtDeliveryOrder::kAscending,
+ /*start_object=*/FullSequence{5, 4},
+ /*parameters=*/
+ MoqtSubscribeParameters{"baz", std::nullopt, std::nullopt, std::nullopt},
+ };
+};
+
+class QUICHE_NO_EXPORT FetchErrorMessage : public TestMessageBase {
+ public:
+ FetchErrorMessage() : TestMessageBase() {
+ SetWireImage(raw_packet_, sizeof(raw_packet_));
+ }
+ bool EqualFieldValues(MessageStructuredData& values) const override {
+ auto cast = std::get<MoqtFetchError>(values);
+ if (cast.subscribe_id != fetch_error_.subscribe_id) {
+ QUIC_LOG(INFO) << "FETCH_ERROR subscribe_id mismatch";
+ return false;
+ }
+ if (cast.error_code != fetch_error_.error_code) {
+ QUIC_LOG(INFO) << "FETCH_ERROR group_order mismatch";
+ return false;
+ }
+ if (cast.reason_phrase != fetch_error_.reason_phrase) {
+ QUIC_LOG(INFO) << "FETCH_ERROR reason_phrase mismatch";
+ return false;
+ }
+ return true;
+ }
+
+ void ExpandVarints() override { ExpandVarintsImpl("vvvvv---"); }
+
+ MessageStructuredData structured_data() const override {
+ return TestMessageBase::MessageStructuredData(fetch_error_);
+ }
+
+ private:
+ uint8_t raw_packet_[8] = {
+ 0x19, 0x06,
+ 0x01, // subscribe_id = 1
+ 0x04, // error_code = kUnauthorized
+ 0x03, 0x62, 0x61, 0x72, // reason_phrase = "bar"
+ };
+
+ MoqtFetchError fetch_error_ = {
+ /*subscribe_id =*/1,
+ /*error_code=*/SubscribeErrorCode::kUnauthorized,
+ /*reason_phrase=*/"bar",
+ };
+};
+
class QUICHE_NO_EXPORT ObjectAckMessage : public TestMessageBase {
public:
ObjectAckMessage() : TestMessageBase() {
@@ -1383,6 +1600,14 @@
return std::make_unique<UnsubscribeAnnouncesMessage>();
case MoqtMessageType::kMaxSubscribeId:
return std::make_unique<MaxSubscribeIdMessage>();
+ case MoqtMessageType::kFetch:
+ return std::make_unique<FetchMessage>();
+ case MoqtMessageType::kFetchCancel:
+ return std::make_unique<FetchCancelMessage>();
+ case MoqtMessageType::kFetchOk:
+ return std::make_unique<FetchOkMessage>();
+ case MoqtMessageType::kFetchError:
+ return std::make_unique<FetchErrorMessage>();
case MoqtMessageType::kObjectAck:
return std::make_unique<ObjectAckMessage>();
case MoqtMessageType::kClientSetup: