Support serialization and parsing for MoQT draft-05 group order in SUBSCRIBE_OK. PiperOrigin-RevId: 653859847
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index 78c9e3a..8439ebf 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -18,8 +18,10 @@ #include "quiche/quic/core/quic_data_writer.h" #include "quiche/quic/core/quic_time.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/platform/api/quic_bug_tracker.h" #include "quiche/common/platform/api/quiche_bug_tracker.h" +#include "quiche/common/platform/api/quiche_logging.h" #include "quiche/common/quiche_buffer_allocator.h" #include "quiche/common/quiche_data_writer.h" #include "quiche/common/simple_buffer_allocator.h" @@ -124,6 +126,20 @@ return *std::move(buffer); } +WireUint8 WireDeliveryOrder(std::optional<MoqtDeliveryOrder> delivery_order) { + if (!delivery_order.has_value()) { + return WireUint8(0x00); + } + switch (*delivery_order) { + case MoqtDeliveryOrder::kAscending: + return WireUint8(0x01); + case MoqtDeliveryOrder::kDescending: + return WireUint8(0x02); + } + QUICHE_NOTREACHED(); + return WireUint8(0xff); +} + } // namespace quiche::QuicheBuffer MoqtFramer::SerializeObjectHeader( @@ -323,13 +339,14 @@ return Serialize(WireVarInt62(MoqtMessageType::kSubscribeOk), WireVarInt62(message.subscribe_id), WireVarInt62(message.expires.ToMilliseconds()), - WireUint8(1), WireVarInt62(message.largest_id->group), + WireDeliveryOrder(message.group_order), WireUint8(1), + WireVarInt62(message.largest_id->group), WireVarInt62(message.largest_id->object)); } return Serialize(WireVarInt62(MoqtMessageType::kSubscribeOk), WireVarInt62(message.subscribe_id), WireVarInt62(message.expires.ToMilliseconds()), - WireUint8(0)); + WireDeliveryOrder(message.group_order), WireUint8(0)); } quiche::QuicheBuffer MoqtFramer::SerializeSubscribeError(
diff --git a/quiche/quic/moqt/moqt_integration_test.cc b/quiche/quic/moqt/moqt_integration_test.cc index 80710e5..7899087 100644 --- a/quiche/quic/moqt/moqt_integration_test.cc +++ b/quiche/quic/moqt/moqt_integration_test.cc
@@ -13,6 +13,7 @@ #include "quiche/quic/moqt/moqt_known_track_publisher.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_outgoing_queue.h" +#include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_session.h" #include "quiche/quic/moqt/test_tools/moqt_simulator_harness.h" #include "quiche/quic/moqt/tools/moqt_mock_visitor.h"
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index 5521e36..584fcf9 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -267,6 +267,7 @@ uint64_t subscribe_id; // The message uses ms, but expires is in us. quic::QuicTimeDelta expires = quic::QuicTimeDelta::FromMilliseconds(0); + MoqtDeliveryOrder group_order; // If ContextExists on the wire is zero, largest_id has no value. std::optional<FullSequence> largest_id; };
diff --git a/quiche/quic/moqt/moqt_outgoing_queue.h b/quiche/quic/moqt/moqt_outgoing_queue.h index 42d6425..e3a7d3a 100644 --- a/quiche/quic/moqt/moqt_outgoing_queue.h +++ b/quiche/quic/moqt/moqt_outgoing_queue.h
@@ -67,6 +67,9 @@ MoqtPriority GetPublisherPriority() const override { return publisher_priority_; } + MoqtDeliveryOrder GetDeliveryOrder() const override { + return delivery_order_; + } bool HasSubscribers() const { return !listeners_.empty(); } @@ -87,6 +90,7 @@ FullTrackName track_; MoqtForwardingPreference forwarding_preference_; MoqtPriority publisher_priority_ = 128; + MoqtDeliveryOrder delivery_order_ = MoqtDeliveryOrder::kAscending; absl::InlinedVector<Group, kMaxQueuedGroups> queue_; uint64_t current_group_id_ = -1; absl::flat_hash_set<MoqtObjectListener*> listeners_;
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index a5b180d..c5a51d7 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -16,6 +16,7 @@ #include "quiche/quic/core/quic_data_reader.h" #include "quiche/quic/core/quic_time.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_priority.h" #include "quiche/common/platform/api/quiche_logging.h" namespace moqt { @@ -481,9 +482,10 @@ size_t MoqtParser::ProcessSubscribeOk(quic::QuicDataReader& reader) { MoqtSubscribeOk subscribe_ok; uint64_t milliseconds; + uint8_t group_order; uint8_t content_exists; if (!reader.ReadVarInt62(&subscribe_ok.subscribe_id) || - !reader.ReadVarInt62(&milliseconds) || + !reader.ReadVarInt62(&milliseconds) || !reader.ReadUInt8(&group_order) || !reader.ReadUInt8(&content_exists)) { return 0; } @@ -491,7 +493,12 @@ ParseError("SUBSCRIBE_OK ContentExists has invalid value"); return 0; } + if (group_order != 0x01 && group_order != 0x02) { + ParseError("Invalid group order value in SUBSCRIBE_OK"); + return 0; + } subscribe_ok.expires = quic::QuicTimeDelta::FromMilliseconds(milliseconds); + subscribe_ok.group_order = static_cast<MoqtDeliveryOrder>(group_order); if (content_exists) { subscribe_ok.largest_id = FullSequence(); if (!reader.ReadVarInt62(&subscribe_ok.largest_id->group) ||
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index 80dfade..3cf061a 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -1075,6 +1075,17 @@ "SUBSCRIBE_OK ContentExists has invalid value"); } +TEST_F(MoqtMessageSpecificTest, SubscribeOkInvalidDeliveryOrder) { + MoqtParser parser(kRawQuic, visitor_); + SubscribeOkMessage subscribe_ok; + subscribe_ok.SetInvalidDeliveryOrder(); + parser.ProcessData(subscribe_ok.PacketSample(), false); + EXPECT_EQ(visitor_.messages_received_, 0); + EXPECT_TRUE(visitor_.parsing_error_.has_value()); + EXPECT_EQ(*visitor_.parsing_error_, + "Invalid group order value in SUBSCRIBE_OK"); +} + TEST_F(MoqtMessageSpecificTest, SubscribeDoneInvalidContentExists) { MoqtParser parser(kRawQuic, visitor_); SubscribeDoneMessage subscribe_done;
diff --git a/quiche/quic/moqt/moqt_priority.h b/quiche/quic/moqt/moqt_priority.h index f8c8ad9..85449d9 100644 --- a/quiche/quic/moqt/moqt_priority.h +++ b/quiche/quic/moqt/moqt_priority.h
@@ -19,8 +19,8 @@ // Indicates the desired order of delivering groups associated with a given // track. enum class MoqtDeliveryOrder : uint8_t { - kAscending, - kDescending, + kAscending = 0x01, + kDescending = 0x02, }; // Computes WebTransport send order for an MoQT data stream with the specified
diff --git a/quiche/quic/moqt/moqt_publisher.h b/quiche/quic/moqt/moqt_publisher.h index 4c66001..778236c 100644 --- a/quiche/quic/moqt/moqt_publisher.h +++ b/quiche/quic/moqt/moqt_publisher.h
@@ -94,6 +94,9 @@ // Returns the current forwarding priority of the track. virtual MoqtPriority GetPublisherPriority() const = 0; + + // Returns the publisher-preferred delivery order for the track. + virtual MoqtDeliveryOrder GetDeliveryOrder() const = 0; }; // MoqtPublisher is an interface to a publisher that allows it to publish
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index db716fb..58ff443 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -27,6 +27,7 @@ #include "quiche/quic/moqt/moqt_framer.h" #include "quiche/quic/moqt/moqt_messages.h" #include "quiche/quic/moqt/moqt_parser.h" +#include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/moqt/moqt_publisher.h" #include "quiche/quic/moqt/moqt_subscribe_windows.h" #include "quiche/quic/moqt/moqt_track.h" @@ -610,6 +611,7 @@ if (PublisherHasData(**track_publisher)) { largest_id = (*track_publisher)->GetLargestSequence(); } + MoqtDeliveryOrder delivery_order = (*track_publisher)->GetDeliveryOrder(); auto subscription = std::make_unique<MoqtSession::PublishedSubscription>( session_, *std::move(track_publisher), message); @@ -622,6 +624,7 @@ MoqtSubscribeOk subscribe_ok; subscribe_ok.subscribe_id = message.subscribe_id; + subscribe_ok.group_order = delivery_order; subscribe_ok.largest_id = largest_id; SendOrBufferMessage(session_->framer_.SerializeSubscribeOk(subscribe_ok));
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 1b5b2e4..9a1bf9c 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -679,6 +679,7 @@ MoqtSubscribeOk ok = { /*subscribe_id=*/1, /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0), + /*group_order=*/MoqtDeliveryOrder::kAscending, /*largest_id=*/std::nullopt, }; StrictMock<webtransport::test::MockStream> mock_control_stream; @@ -851,6 +852,7 @@ MoqtSubscribeOk ok = { /*subscribe_id=*/1, /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0), + /*group_order=*/MoqtDeliveryOrder::kAscending, /*largest_id=*/std::nullopt, }; StrictMock<webtransport::test::MockStream> mock_control_stream;
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index c4f4976..44a6405 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -18,6 +18,7 @@ #include "quiche/quic/core/quic_data_writer.h" #include "quiche/quic/core/quic_time.h" #include "quiche/quic/moqt/moqt_messages.h" +#include "quiche/quic/moqt/moqt_priority.h" #include "quiche/quic/platform/api/quic_logging.h" #include "quiche/common/platform/api/quiche_export.h" #include "quiche/common/quiche_endian.h" @@ -496,6 +497,10 @@ QUIC_LOG(INFO) << "SUBSCRIBE OK expiration mismatch"; return false; } + if (cast.group_order != subscribe_ok_.group_order) { + QUIC_LOG(INFO) << "SUBSCRIBE OK group order mismatch"; + return false; + } if (cast.largest_id != subscribe_ok_.largest_id) { QUIC_LOG(INFO) << "SUBSCRIBE OK largest ID mismatch"; return false; @@ -503,26 +508,33 @@ return true; } - void ExpandVarints() override { ExpandVarintsImpl("vvv-vv"); } + void ExpandVarints() override { ExpandVarintsImpl("vvv--vv"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(subscribe_ok_); } void SetInvalidContentExists() { - raw_packet_[3] = 0x02; + raw_packet_[4] = 0x02; + SetWireImage(raw_packet_, sizeof(raw_packet_)); + } + + void SetInvalidDeliveryOrder() { + raw_packet_[3] = 0x10; SetWireImage(raw_packet_, sizeof(raw_packet_)); } private: - uint8_t raw_packet_[6] = { + uint8_t raw_packet_[7] = { 0x04, 0x01, 0x03, // subscribe_id = 1, expires = 3 + 0x02, // delivery_order = 2, 0x01, 0x0c, 0x14, // largest_group_id = 12, largest_object_id = 20, }; MoqtSubscribeOk subscribe_ok_ = { /*subscribe_id=*/1, /*expires=*/quic::QuicTimeDelta::FromMilliseconds(3), + /*group_order=*/MoqtDeliveryOrder::kDescending, /*largest_id=*/FullSequence(12, 20), }; };
diff --git a/quiche/quic/moqt/tools/moqt_mock_visitor.h b/quiche/quic/moqt/tools/moqt_mock_visitor.h index 5ab728a..a5475a3 100644 --- a/quiche/quic/moqt/tools/moqt_mock_visitor.h +++ b/quiche/quic/moqt/tools/moqt_mock_visitor.h
@@ -45,7 +45,10 @@ class MockTrackPublisher : public MoqtTrackPublisher { public: explicit MockTrackPublisher(FullTrackName name) - : track_name_(std::move(name)) {} + : track_name_(std::move(name)) { + ON_CALL(*this, GetDeliveryOrder()) + .WillByDefault(testing::Return(MoqtDeliveryOrder::kAscending)); + } const FullTrackName& GetTrackName() const override { return track_name_; } MOCK_METHOD(std::optional<PublishedObject>, GetCachedObject, @@ -62,6 +65,7 @@ MOCK_METHOD(MoqtForwardingPreference, GetForwardingPreference, (), (const, override)); MOCK_METHOD(MoqtPriority, GetPublisherPriority, (), (const, override)); + MOCK_METHOD(MoqtDeliveryOrder, GetDeliveryOrder, (), (const, override)); private: FullTrackName track_name_;