Support serialization and parsing for MoQT draft-05 group order in SUBSCRIBE_OK.
PiperOrigin-RevId: 653859847
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index 78c9e3a..8439ebf 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -18,8 +18,10 @@
#include "quiche/quic/core/quic_data_writer.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/platform/api/quic_bug_tracker.h"
#include "quiche/common/platform/api/quiche_bug_tracker.h"
+#include "quiche/common/platform/api/quiche_logging.h"
#include "quiche/common/quiche_buffer_allocator.h"
#include "quiche/common/quiche_data_writer.h"
#include "quiche/common/simple_buffer_allocator.h"
@@ -124,6 +126,20 @@
return *std::move(buffer);
}
+WireUint8 WireDeliveryOrder(std::optional<MoqtDeliveryOrder> delivery_order) {
+ if (!delivery_order.has_value()) {
+ return WireUint8(0x00);
+ }
+ switch (*delivery_order) {
+ case MoqtDeliveryOrder::kAscending:
+ return WireUint8(0x01);
+ case MoqtDeliveryOrder::kDescending:
+ return WireUint8(0x02);
+ }
+ QUICHE_NOTREACHED();
+ return WireUint8(0xff);
+}
+
} // namespace
quiche::QuicheBuffer MoqtFramer::SerializeObjectHeader(
@@ -323,13 +339,14 @@
return Serialize(WireVarInt62(MoqtMessageType::kSubscribeOk),
WireVarInt62(message.subscribe_id),
WireVarInt62(message.expires.ToMilliseconds()),
- WireUint8(1), WireVarInt62(message.largest_id->group),
+ WireDeliveryOrder(message.group_order), WireUint8(1),
+ WireVarInt62(message.largest_id->group),
WireVarInt62(message.largest_id->object));
}
return Serialize(WireVarInt62(MoqtMessageType::kSubscribeOk),
WireVarInt62(message.subscribe_id),
WireVarInt62(message.expires.ToMilliseconds()),
- WireUint8(0));
+ WireDeliveryOrder(message.group_order), WireUint8(0));
}
quiche::QuicheBuffer MoqtFramer::SerializeSubscribeError(
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc
index 80710e5..7899087 100644
--- a/quiche/quic/moqt/moqt_integration_test.cc
+++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -13,6 +13,7 @@
#include "quiche/quic/moqt/moqt_known_track_publisher.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_outgoing_queue.h"
+#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_session.h"
#include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h"
#include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index 5521e36..584fcf9 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -267,6 +267,7 @@
uint64_t subscribe_id;
// The message uses ms, but expires is in us.
quic::QuicTimeDelta expires = quic::QuicTimeDelta::FromMilliseconds(0);
+ MoqtDeliveryOrder group_order;
// If ContextExists on the wire is zero, largest_id has no value.
std::optional<FullSequence> largest_id;
};
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h
index 42d6425..e3a7d3a 100644
--- a/quiche/quic/moqt/moqt_outgoing_queue.h
+++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -67,6 +67,9 @@
MoqtPriority GetPublisherPriority() const override {
return publisher_priority_;
}
+ MoqtDeliveryOrder GetDeliveryOrder() const override {
+ return delivery_order_;
+ }
bool HasSubscribers() const { return !listeners_.empty(); }
@@ -87,6 +90,7 @@
FullTrackName track_;
MoqtForwardingPreference forwarding_preference_;
MoqtPriority publisher_priority_ = 128;
+ MoqtDeliveryOrder delivery_order_ = MoqtDeliveryOrder::kAscending;
absl::InlinedVector<Group, kMaxQueuedGroups> queue_;
uint64_t current_group_id_ = -1;
absl::flat_hash_set<MoqtObjectListener*> listeners_;
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index a5b180d..c5a51d7 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -16,6 +16,7 @@
#include "quiche/quic/core/quic_data_reader.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/common/platform/api/quiche_logging.h"
namespace moqt {
@@ -481,9 +482,10 @@
size_t MoqtParser::ProcessSubscribeOk(quic::QuicDataReader& reader) {
MoqtSubscribeOk subscribe_ok;
uint64_t milliseconds;
+ uint8_t group_order;
uint8_t content_exists;
if (!reader.ReadVarInt62(&subscribe_ok.subscribe_id) ||
- !reader.ReadVarInt62(&milliseconds) ||
+ !reader.ReadVarInt62(&milliseconds) || !reader.ReadUInt8(&group_order) ||
!reader.ReadUInt8(&content_exists)) {
return 0;
}
@@ -491,7 +493,12 @@
ParseError("SUBSCRIBE_OK ContentExists has invalid value");
return 0;
}
+ if (group_order != 0x01 && group_order != 0x02) {
+ ParseError("Invalid group order value in SUBSCRIBE_OK");
+ return 0;
+ }
subscribe_ok.expires = quic::QuicTimeDelta::FromMilliseconds(milliseconds);
+ subscribe_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order);
if (content_exists) {
subscribe_ok.largest_id = FullSequence();
if (!reader.ReadVarInt62(&subscribe_ok.largest_id->group) ||
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index 80dfade..3cf061a 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -1075,6 +1075,17 @@
"SUBSCRIBE_OK ContentExists has invalid value");
}
+TEST_F(MoqtMessageSpecificTest, SubscribeOkInvalidDeliveryOrder) {
+ MoqtParser parser(kRawQuic, visitor_);
+ SubscribeOkMessage subscribe_ok;
+ subscribe_ok.SetInvalidDeliveryOrder();
+ parser.ProcessData(subscribe_ok.PacketSample(), false);
+ EXPECT_EQ(visitor_.messages_received_, 0);
+ EXPECT_TRUE(visitor_.parsing_error_.has_value());
+ EXPECT_EQ(*visitor_.parsing_error_,
+ "Invalid group order value in SUBSCRIBE_OK");
+}
+
TEST_F(MoqtMessageSpecificTest, SubscribeDoneInvalidContentExists) {
MoqtParser parser(kRawQuic, visitor_);
SubscribeDoneMessage subscribe_done;
diff --git a/quiche/quic/moqt/moqt_priority.h b/quiche/quic/moqt/moqt_priority.h
index f8c8ad9..85449d9 100644
--- a/quiche/quic/moqt/moqt_priority.h
+++ b/quiche/quic/moqt/moqt_priority.h
@@ -19,8 +19,8 @@
// Indicates the desired order of delivering groups associated with a given
// track.
enum class MoqtDeliveryOrder : uint8_t {
- kAscending,
- kDescending,
+ kAscending = 0x01,
+ kDescending = 0x02,
};
// Computes WebTransport send order for an MoQT data stream with the specified
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h
index 4c66001..778236c 100644
--- a/quiche/quic/moqt/moqt_publisher.h
+++ b/quiche/quic/moqt/moqt_publisher.h
@@ -94,6 +94,9 @@
// Returns the current forwarding priority of the track.
virtual MoqtPriority GetPublisherPriority() const = 0;
+
+ // Returns the publisher-preferred delivery order for the track.
+ virtual MoqtDeliveryOrder GetDeliveryOrder() const = 0;
};
// MoqtPublisher is an interface to a publisher that allows it to publish
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index db716fb..58ff443 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -27,6 +27,7 @@
#include "quiche/quic/moqt/moqt_framer.h"
#include "quiche/quic/moqt/moqt_messages.h"
#include "quiche/quic/moqt/moqt_parser.h"
+#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/moqt/moqt_publisher.h"
#include "quiche/quic/moqt/moqt_subscribe_windows.h"
#include "quiche/quic/moqt/moqt_track.h"
@@ -610,6 +611,7 @@
if (PublisherHasData(**track_publisher)) {
largest_id = (*track_publisher)->GetLargestSequence();
}
+ MoqtDeliveryOrder delivery_order = (*track_publisher)->GetDeliveryOrder();
auto subscription = std::make_unique<MoqtSession::PublishedSubscription>(
session_, *std::move(track_publisher), message);
@@ -622,6 +624,7 @@
MoqtSubscribeOk subscribe_ok;
subscribe_ok.subscribe_id = message.subscribe_id;
+ subscribe_ok.group_order = delivery_order;
subscribe_ok.largest_id = largest_id;
SendOrBufferMessage(session_->framer_.SerializeSubscribeOk(subscribe_ok));
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 1b5b2e4..9a1bf9c 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -679,6 +679,7 @@
MoqtSubscribeOk ok = {
/*subscribe_id=*/1,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
+ /*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_id=*/std::nullopt,
};
StrictMock<webtransport::test::MockStream> mock_control_stream;
@@ -851,6 +852,7 @@
MoqtSubscribeOk ok = {
/*subscribe_id=*/1,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(0),
+ /*group_order=*/MoqtDeliveryOrder::kAscending,
/*largest_id=*/std::nullopt,
};
StrictMock<webtransport::test::MockStream> mock_control_stream;
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index c4f4976..44a6405 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -18,6 +18,7 @@
#include "quiche/quic/core/quic_data_writer.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/moqt/moqt_messages.h"
+#include "quiche/quic/moqt/moqt_priority.h"
#include "quiche/quic/platform/api/quic_logging.h"
#include "quiche/common/platform/api/quiche_export.h"
#include "quiche/common/quiche_endian.h"
@@ -496,6 +497,10 @@
QUIC_LOG(INFO) << "SUBSCRIBE OK expiration mismatch";
return false;
}
+ if (cast.group_order != subscribe_ok_.group_order) {
+ QUIC_LOG(INFO) << "SUBSCRIBE OK group order mismatch";
+ return false;
+ }
if (cast.largest_id != subscribe_ok_.largest_id) {
QUIC_LOG(INFO) << "SUBSCRIBE OK largest ID mismatch";
return false;
@@ -503,26 +508,33 @@
return true;
}
- void ExpandVarints() override { ExpandVarintsImpl("vvv-vv"); }
+ void ExpandVarints() override { ExpandVarintsImpl("vvv--vv"); }
MessageStructuredData structured_data() const override {
return TestMessageBase::MessageStructuredData(subscribe_ok_);
}
void SetInvalidContentExists() {
- raw_packet_[3] = 0x02;
+ raw_packet_[4] = 0x02;
+ SetWireImage(raw_packet_, sizeof(raw_packet_));
+ }
+
+ void SetInvalidDeliveryOrder() {
+ raw_packet_[3] = 0x10;
SetWireImage(raw_packet_, sizeof(raw_packet_));
}
private:
- uint8_t raw_packet_[6] = {
+ uint8_t raw_packet_[7] = {
0x04, 0x01, 0x03, // subscribe_id = 1, expires = 3
+ 0x02, // delivery_order = 2,
0x01, 0x0c, 0x14, // largest_group_id = 12, largest_object_id = 20,
};
MoqtSubscribeOk subscribe_ok_ = {
/*subscribe_id=*/1,
/*expires=*/quic::QuicTimeDelta::FromMilliseconds(3),
+ /*group_order=*/MoqtDeliveryOrder::kDescending,
/*largest_id=*/FullSequence(12, 20),
};
};
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h
index 5ab728a..a5475a3 100644
--- a/quiche/quic/moqt/tools/moqt_mock_visitor.h
+++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -45,7 +45,10 @@
class MockTrackPublisher : public MoqtTrackPublisher {
public:
explicit MockTrackPublisher(FullTrackName name)
- : track_name_(std::move(name)) {}
+ : track_name_(std::move(name)) {
+ ON_CALL(*this, GetDeliveryOrder())
+ .WillByDefault(testing::Return(MoqtDeliveryOrder::kAscending));
+ }
const FullTrackName& GetTrackName() const override { return track_name_; }
MOCK_METHOD(std::optional<PublishedObject>, GetCachedObject,
@@ -62,6 +65,7 @@
MOCK_METHOD(MoqtForwardingPreference, GetForwardingPreference, (),
(const, override));
MOCK_METHOD(MoqtPriority, GetPublisherPriority, (), (const, override));
+ MOCK_METHOD(MoqtDeliveryOrder, GetDeliveryOrder, (), (const, override));
private:
FullTrackName track_name_;