Update MoQT SUBSCRIBE_UPDATE to draft-11. Add a session API to update upstream SUBSCRIBE. PiperOrigin-RevId: 758303508
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index 43caa3a..139a6d7 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -534,10 +534,10 @@ uint64_t end_group = message.end_group.has_value() ? *message.end_group + 1 : 0; return SerializeControlMessage( - MoqtMessageType::kSubscribeUpdate, WireVarInt62(message.subscribe_id), + MoqtMessageType::kSubscribeUpdate, WireVarInt62(message.request_id), WireVarInt62(message.start.group), WireVarInt62(message.start.object), WireVarInt62(end_group), WireUint8(message.subscriber_priority), - WireKeyValuePairList(parameters)); + WireBoolean(message.forward), WireKeyValuePairList(parameters)); } quiche::QuicheBuffer MoqtFramer::SerializeAnnounce(
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc index 16939e3..ab64df2 100644 --- a/quiche/quic/moqt/moqt_framer_test.cc +++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -472,6 +472,7 @@ /*start=*/Location(4, 3), /*end_group=*/4, /*subscriber_priority=*/0xaa, + /*forward=*/true, VersionSpecificParameters(), }; quiche::QuicheBuffer buffer; @@ -487,6 +488,7 @@ /*start=*/Location(4, 3), /*end_group=*/4, /*subscriber_priority=*/0xaa, + /*forward=*/true, VersionSpecificParameters(), }; quiche::QuicheBuffer buffer;
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index ef7d3b0..1b61d18 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -577,10 +577,11 @@ }; struct QUICHE_EXPORT MoqtSubscribeUpdate { - uint64_t subscribe_id; + uint64_t request_id; Location start; std::optional<uint64_t> end_group; MoqtPriority subscriber_priority; + bool forward; VersionSpecificParameters parameters; };
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index 8761ffb..608052b 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -551,10 +551,12 @@ size_t MoqtControlParser::ProcessSubscribeUpdate(quic::QuicDataReader& reader) { MoqtSubscribeUpdate subscribe_update; uint64_t start_group, start_object, end_group; - if (!reader.ReadVarInt62(&subscribe_update.subscribe_id) || + uint8_t forward; + if (!reader.ReadVarInt62(&subscribe_update.request_id) || !reader.ReadVarInt62(&start_group) || !reader.ReadVarInt62(&start_object) || !reader.ReadVarInt62(&end_group) || - !reader.ReadUInt8(&subscribe_update.subscriber_priority)) { + !reader.ReadUInt8(&subscribe_update.subscriber_priority) || + !reader.ReadUInt8(&forward)) { return 0; } KeyValuePairList parameters; @@ -578,6 +580,11 @@ return 0; } } + if (forward > 1) { + ParseError("Invalid forward value in SUBSCRIBE_UPDATE"); + return 0; + } + subscribe_update.forward = (forward == 1); visitor_.OnSubscribeUpdateMessage(subscribe_update); return reader.PreviouslyReadPayload().length(); }
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index 4779c89..d979ffe 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -912,8 +912,8 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kWebTrans, &stream, visitor_); char subscribe_update[] = { - 0x02, 0x00, 0x0d, 0x02, 0x03, 0x01, 0x05, // start and end sequences - 0xaa, // priority = 0xaa + 0x02, 0x00, 0x0e, 0x02, 0x03, 0x01, 0x05, // start and end sequences + 0xaa, 0x01, // priority, forward 0x01, // 1 parameter 0x01, 0x05, 0x03, 0x00, 0x62, 0x61, 0x72, // authorization_token = "bar" }; @@ -1184,8 +1184,8 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe_update[] = { - 0x02, 0x00, 0x06, 0x02, 0x03, 0x01, 0x04, // start and end sequences - 0x20, // priority + 0x02, 0x00, 0x07, 0x02, 0x03, 0x01, 0x04, // start and end sequences + 0x20, 0x01, // priority, forward 0x00, // No parameters }; stream.Receive(absl::string_view(subscribe_update, sizeof(subscribe_update)), @@ -1198,8 +1198,8 @@ webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); char subscribe_update[] = { - 0x02, 0x00, 0x08, 0x02, 0x03, 0x01, 0x03, // start and end sequences - 0x20, // priority + 0x02, 0x00, 0x09, 0x02, 0x03, 0x01, 0x03, // start and end sequences + 0x20, 0x01, // priority, forward 0x01, // 1 parameter 0x02, 0x20, // delivery_timeout = 32 ms };
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 0c8e75e..3b6be52 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -395,6 +395,45 @@ return Subscribe(message, visitor); } +bool MoqtSession::SubscribeUpdate( + const FullTrackName& name, std::optional<Location> start, + std::optional<uint64_t> end_group, + std::optional<MoqtPriority> subscriber_priority, + std::optional<bool> forward, VersionSpecificParameters parameters) { + auto it = subscribe_by_name_.find(name); + if (it == subscribe_by_name_.end()) { + return false; + } + SubscribeRemoteTrack* track = it->second; + MoqtSubscribeUpdate subscribe_update; + subscribe_update.request_id = track->request_id(); + subscribe_update.start = start.value_or(track->window().start()); + subscribe_update.end_group = end_group.value_or(track->window().end().group); + if (subscribe_update.end_group == UINT64_MAX) { + subscribe_update.end_group = std::nullopt; + } + subscribe_update.subscriber_priority = + subscriber_priority.value_or(track->subscriber_priority()); + subscribe_update.forward = forward.value_or(track->forward()); + subscribe_update.parameters = parameters; + if (subscribe_update.start < track->window().start() || + (subscribe_update.end_group.has_value() && + (*subscribe_update.end_group > track->window().end().group || + *subscribe_update.end_group < subscribe_update.start.group))) { + // Invalid range. + return false; + } + // Input is valid. Update subscription properties. + track->TruncateStart(subscribe_update.start); + if (subscribe_update.end_group.has_value()) { + track->TruncateEnd(*subscribe_update.end_group); + } + track->set_subscriber_priority(subscribe_update.subscriber_priority); + track->set_forward(subscribe_update.forward); + SendControlMessage(framer_.SerializeSubscribeUpdate(subscribe_update)); + return true; +}; + void MoqtSession::Unsubscribe(const FullTrackName& name) { SubscribeRemoteTrack* track = RemoteTrackByName(name); if (track == nullptr) { @@ -402,7 +441,7 @@ } QUIC_DLOG(INFO) << ENDPOINT << "Sent UNSUBSCRIBE message for " << name; MoqtUnsubscribe message; - message.subscribe_id = track->subscribe_id(); + message.subscribe_id = track->request_id(); SendControlMessage(framer_.SerializeUnsubscribe(message)); DestroySubscription(track); } @@ -1063,7 +1102,7 @@ message.reason_phrase); } session_->subscribe_by_alias_.erase(subscribe->track_alias()); - session_->upstream_by_id_.erase(subscribe->subscribe_id()); + session_->upstream_by_id_.erase(subscribe->request_id()); } void MoqtSession::ControlStream::OnUnsubscribeMessage( @@ -1095,7 +1134,7 @@ void MoqtSession::ControlStream::OnSubscribeUpdateMessage( const MoqtSubscribeUpdate& message) { - auto it = session_->published_subscriptions_.find(message.subscribe_id); + auto it = session_->published_subscriptions_.find(message.request_id); if (it == session_->published_subscriptions_.end()) { return; }
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 77bc1fb..ec0efe1 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -115,9 +115,8 @@ // Returns true if SUBSCRIBE was sent. If there is already a subscription to // the track, the message will still be sent. However, the visitor will be - // ignored. + // ignored. If |visitor| is nullptr, forward will be set to false. // Subscribe from (start_group, start_object) to the end of the track. - // TODO(martinduke): Allow setting forward = false. bool SubscribeAbsolute(const FullTrackName& name, uint64_t start_group, uint64_t start_object, SubscribeRemoteTrack::Visitor* visitor, @@ -133,6 +132,11 @@ bool SubscribeNextGroup(const FullTrackName& name, SubscribeRemoteTrack::Visitor* visitor, VersionSpecificParameters parameters) override; + bool SubscribeUpdate(const FullTrackName& name, std::optional<Location> start, + std::optional<uint64_t> end_group, + std::optional<MoqtPriority> subscriber_priority, + std::optional<bool> forward, + VersionSpecificParameters parameters) override; // Returns false if the subscription is not found. The session immediately // destroys all subscription state. void Unsubscribe(const FullTrackName& name);
diff --git a/quiche/quic/moqt/moqt_session_interface.h b/quiche/quic/moqt/moqt_session_interface.h index f26266d..8f76a19 100644 --- a/quiche/quic/moqt/moqt_session_interface.h +++ b/quiche/quic/moqt/moqt_session_interface.h
@@ -66,6 +66,14 @@ SubscribeRemoteTrack::Visitor* visitor, VersionSpecificParameters parameters) = 0; + // If an argument is nullopt, there is no change to the current value. + virtual bool SubscribeUpdate(const FullTrackName& name, + std::optional<Location> start, + std::optional<uint64_t> end_group, + std::optional<MoqtPriority> subscriber_priority, + std::optional<bool> forward, + VersionSpecificParameters parameters) = 0; + // Sends an UNSUBSCRIBE message and removes all of the state related to the // subscription. Returns false if the subscription is not found. virtual void Unsubscribe(const FullTrackName& name) = 0;
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index bbc41ad..f92dead 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -733,6 +733,68 @@ stream_input->OnSubscribeOkMessage(ok); } +TEST_F(MoqtSessionTest, OutgoingSubscribeUpdate) { + std::unique_ptr<MoqtControlParserVisitor> stream_input = + MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); + MockSubscribeRemoteTrackVisitor remote_track_visitor; + EXPECT_CALL(mock_session_, GetStreamById) + .WillRepeatedly(Return(&mock_stream_)); + EXPECT_CALL(mock_stream_, + Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _)); + session_.SubscribeAbsolute(FullTrackName("foo", "bar"), 1, 0, 10, + &remote_track_visitor, + VersionSpecificParameters()); + MoqtSubscribeOk ok = { + /*request_id=*/0, + /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0), + }; + EXPECT_CALL(remote_track_visitor, OnReply); + stream_input->OnSubscribeOkMessage(ok); + EXPECT_CALL( + mock_stream_, + Writev(ControlMessageOfType(MoqtMessageType::kSubscribeUpdate), _)); + EXPECT_TRUE(session_.SubscribeUpdate( + FullTrackName("foo", "bar"), Location(2, 1), 9, std::nullopt, + std::nullopt, VersionSpecificParameters())); + SubscribeRemoteTrack* track = MoqtSessionPeer::remote_track(&session_, 0); + EXPECT_FALSE(track->InWindow(Location(2, 0))); + EXPECT_TRUE(track->InWindow(Location(2, 1))); + EXPECT_TRUE(track->InWindow(Location(9, UINT64_MAX))); + EXPECT_FALSE(track->InWindow(Location(10, 0))); +} + +TEST_F(MoqtSessionTest, OutgoingSubscribeUpdateInvalid) { + std::unique_ptr<MoqtControlParserVisitor> stream_input = + MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); + MockSubscribeRemoteTrackVisitor remote_track_visitor; + EXPECT_CALL(mock_session_, GetStreamById) + .WillRepeatedly(Return(&mock_stream_)); + EXPECT_CALL(mock_stream_, + Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _)); + session_.SubscribeAbsolute(FullTrackName("foo", "bar"), 1, 0, 10, + &remote_track_visitor, + VersionSpecificParameters()); + MoqtSubscribeOk ok = { + /*request_id=*/0, + /*expires=*/quic::QuicTimeDelta::FromMilliseconds(0), + }; + EXPECT_CALL(remote_track_visitor, OnReply); + stream_input->OnSubscribeOkMessage(ok); + EXPECT_CALL( + mock_stream_, + Writev(ControlMessageOfType(MoqtMessageType::kSubscribeUpdate), _)) + .Times(0); + EXPECT_FALSE(session_.SubscribeUpdate( + FullTrackName("foo", "bar"), Location(0, 0), 10, std::nullopt, + std::nullopt, VersionSpecificParameters())); + EXPECT_FALSE(session_.SubscribeUpdate( + FullTrackName("foo", "bar"), Location(1, 0), 11, std::nullopt, + std::nullopt, VersionSpecificParameters())); + EXPECT_FALSE(session_.SubscribeUpdate( + FullTrackName("foo", "bar"), Location(7, 0), 6, std::nullopt, + std::nullopt, VersionSpecificParameters())); +} + TEST_F(MoqtSessionTest, MaxRequestIdChangesResponse) { MoqtSessionPeer::set_next_request_id(&session_, kDefaultInitialMaxRequestId); MockSubscribeRemoteTrackVisitor remote_track_visitor;
diff --git a/quiche/quic/moqt/moqt_track.cc b/quiche/quic/moqt/moqt_track.cc index 9809805..ed2c002 100644 --- a/quiche/quic/moqt/moqt_track.cc +++ b/quiche/quic/moqt/moqt_track.cc
@@ -120,12 +120,13 @@ } } -void UpstreamFetch::OnFetchResult(Location largest_id, absl::Status status, +void UpstreamFetch::OnFetchResult(Location largest_location, + absl::Status status, TaskDestroyedCallback callback) { - auto task = std::make_unique<UpstreamFetchTask>(largest_id, status, + auto task = std::make_unique<UpstreamFetchTask>(largest_location, status, std::move(callback)); task_ = task->weak_ptr(); - window().TruncateEnd(largest_id); + window_mutable().TruncateEnd(largest_location); std::move(ok_callback_)(std::move(task)); if (can_read_callback_) { task_.GetIfAvailable()->set_can_read_callback( @@ -169,7 +170,7 @@ output.status = next_object_->object_status; output.publisher_priority = next_object_->publisher_priority; output.fin_after_this = false; - if (output.sequence == largest_id_) { // This is the last object. + if (output.sequence == largest_location_) { // This is the last object. eof_ = true; } next_object_.reset();
diff --git a/quiche/quic/moqt/moqt_track.h b/quiche/quic/moqt/moqt_track.h index 0783141..d9abd82 100644 --- a/quiche/quic/moqt/moqt_track.h +++ b/quiche/quic/moqt/moqt_track.h
@@ -38,9 +38,10 @@ class RemoteTrack { public: RemoteTrack(const FullTrackName& full_track_name, uint64_t id, - SubscribeWindow window) + SubscribeWindow window, MoqtPriority priority) : full_track_name_(full_track_name), - subscribe_id_(id), + request_id_(id), + subscriber_priority_(priority), window_(window), weak_ptr_factory_(this) {} virtual ~RemoteTrack() = default; @@ -62,7 +63,7 @@ *data_stream_type_ == MoqtDataStreamType::kStreamHeaderFetch; } - uint64_t subscribe_id() const { return subscribe_id_; } + uint64_t request_id() const { return request_id_; } // Is the object one that was requested? bool InWindow(Location sequence) const { return window_.InWindow(sequence); } @@ -71,12 +72,20 @@ return weak_ptr_factory_.Create(); } + const SubscribeWindow& window() const { return window_; } + + MoqtPriority subscriber_priority() const { return subscriber_priority_; } + void set_subscriber_priority(MoqtPriority priority) { + subscriber_priority_ = priority; + } + protected: - SubscribeWindow& window() { return window_; }; + SubscribeWindow& window_mutable() { return window_; }; private: const FullTrackName full_track_name_; - const uint64_t subscribe_id_; + const uint64_t request_id_; + MoqtPriority subscriber_priority_; SubscribeWindow window_; std::optional<MoqtDataStreamType> data_stream_type_; // If false, an object or OK message has been received, so any ERROR message @@ -100,7 +109,7 @@ // automatically retry. virtual void OnReply( const FullTrackName& full_track_name, - std::optional<Location> largest_id, + std::optional<Location> largest_location, std::optional<absl::string_view> error_reason_phrase) = 0; // Called when the subscription process is far enough that it is possible to // send OBJECT_ACK messages; provides a callback to do so. The callback is @@ -118,8 +127,10 @@ SubscribeRemoteTrack(const MoqtSubscribe& subscribe, Visitor* visitor) : RemoteTrack(subscribe.full_track_name, subscribe.request_id, SubscribeWindow(subscribe.start.value_or(Location()), - subscribe.end_group)), + subscribe.end_group), + subscribe.subscriber_priority), track_alias_(subscribe.track_alias), + forward_(subscribe.forward), visitor_(visitor), delivery_timeout_(subscribe.parameters.delivery_timeout), subscribe_(std::make_unique<MoqtSubscribe>(subscribe)) {} @@ -151,10 +162,12 @@ } } // Called on SUBSCRIBE_OK or SUBSCRIBE_UPDATE. - bool TruncateStart(Location start) { return window().TruncateStart(start); } + bool TruncateStart(Location start) { + return window_mutable().TruncateStart(start); + } // Called on SUBSCRIBE_UPDATE. bool TruncateEnd(uint64_t end_group) { - return window().TruncateEnd(end_group); + return window_mutable().TruncateEnd(end_group); } void OnStreamOpened(); void OnStreamClosed(); @@ -170,6 +183,9 @@ // FETCH objects to pipe directly into the visitor. void OnJoiningFetchReady(std::unique_ptr<MoqtFetchTask> fetch_task); + bool forward() const { return forward_; } + void set_forward(bool forward) { forward_ = forward; } + private: friend class test::MoqtSessionPeer; friend class test::SubscribeRemoteTrackPeer; @@ -180,6 +196,7 @@ std::unique_ptr<MoqtFetchTask> fetch_task_; const uint64_t track_alias_; + bool forward_; Visitor* visitor_; std::optional<bool> is_datagram_; int currently_open_streams_ = 0; @@ -222,7 +239,8 @@ fetch.joining_fetch.has_value() ? SubscribeWindow(Location(0, 0)) : SubscribeWindow(fetch.start_object, fetch.end_group, - fetch.end_object)), + fetch.end_object), + fetch.subscriber_priority), ok_callback_(std::move(callback)) { // Immediately set the data stream type. CheckDataStreamType(MoqtDataStreamType::kStreamHeaderFetch); @@ -235,9 +253,9 @@ // If the UpstreamFetch is destroyed, it will call OnStreamAndFetchClosed // which sets the TaskDestroyedCallback to nullptr. Thus, |callback| can // assume that UpstreamFetch is valid. - UpstreamFetchTask(Location largest_id, absl::Status status, + UpstreamFetchTask(Location largest_location, absl::Status status, TaskDestroyedCallback callback) - : largest_id_(largest_id), + : largest_location_(largest_location), status_(status), task_destroyed_callback_(std::move(callback)), weak_ptr_factory_(this) {} @@ -287,7 +305,7 @@ absl::string_view reason_phrase); private: - Location largest_id_; + Location largest_location_; absl::Status status_; TaskDestroyedCallback task_destroyed_callback_; @@ -314,7 +332,7 @@ }; // Arrival of FETCH_OK/FETCH_ERROR. - void OnFetchResult(Location largest_id, absl::Status status, + void OnFetchResult(Location largest_location, absl::Status status, TaskDestroyedCallback callback); UpstreamFetchTask* task() { return task_.GetIfAvailable(); }
diff --git a/quiche/quic/moqt/moqt_track_test.cc b/quiche/quic/moqt/moqt_track_test.cc index 7bca1db..bc6398d 100644 --- a/quiche/quic/moqt/moqt_track_test.cc +++ b/quiche/quic/moqt/moqt_track_test.cc
@@ -65,7 +65,7 @@ TEST_F(SubscribeRemoteTrackTest, Queries) { EXPECT_EQ(track_.full_track_name(), FullTrackName("foo", "bar")); - EXPECT_EQ(track_.subscribe_id(), 1); + EXPECT_EQ(track_.request_id(), 1); EXPECT_EQ(track_.track_alias(), 2); EXPECT_EQ(track_.visitor(), &visitor_); EXPECT_FALSE(track_.is_fetch()); @@ -101,7 +101,7 @@ }) {} MoqtFetch fetch_message_ = { - /*fetch_id=*/1, + /*request_id=*/1, /*subscriber_priority=*/128, /*group_order=*/std::nullopt, /*joining_fetch=*/std::nullopt, @@ -117,7 +117,7 @@ }; TEST_F(UpstreamFetchTest, Queries) { - EXPECT_EQ(fetch_.subscribe_id(), 1); + EXPECT_EQ(fetch_.request_id(), 1); EXPECT_EQ(fetch_.full_track_name(), FullTrackName("foo", "bar")); EXPECT_FALSE( fetch_.CheckDataStreamType(MoqtDataStreamType::kStreamHeaderSubgroup));
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.h b/quiche/quic/moqt/test_tools/mock_moqt_session.h index 7131181..b80ed7b 100644 --- a/quiche/quic/moqt/test_tools/mock_moqt_session.h +++ b/quiche/quic/moqt/test_tools/mock_moqt_session.h
@@ -60,6 +60,13 @@ SubscribeRemoteTrack::Visitor* visitor, VersionSpecificParameters parameters), (override)); + MOCK_METHOD(bool, SubscribeUpdate, + (const FullTrackName& name, std::optional<Location> start, + std::optional<uint64_t> end_group, + std::optional<MoqtPriority> subscriber_priority, + std::optional<bool> forward, + VersionSpecificParameters parameters), + (override)); MOCK_METHOD(void, Unsubscribe, (const FullTrackName& name), (override)); MOCK_METHOD(bool, Fetch, (const FullTrackName& name, FetchResponseCallback callback,
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index 2ef9c8b..cf243c9 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -753,7 +753,7 @@ bool EqualFieldValues(MessageStructuredData& values) const override { auto cast = std::get<MoqtSubscribeUpdate>(values); - if (cast.subscribe_id != subscribe_update_.subscribe_id) { + if (cast.request_id != subscribe_update_.request_id) { QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE subscribe ID mismatch"; return false; } @@ -769,6 +769,10 @@ QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE subscriber priority mismatch"; return false; } + if (cast.forward != subscribe_update_.forward) { + QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE forward mismatch"; + return false; + } if (cast.parameters != subscribe_update_.parameters) { QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE parameter mismatch"; return false; @@ -776,25 +780,26 @@ return true; } - void ExpandVarints() override { ExpandVarintsImpl("vvvv-vv--"); } + void ExpandVarints() override { ExpandVarintsImpl("vvvv--vv--"); } MessageStructuredData structured_data() const override { return TestMessageBase::MessageStructuredData(subscribe_update_); } private: - uint8_t raw_packet_[12] = { - 0x02, 0x00, 0x09, 0x02, 0x03, 0x01, 0x05, // start and end sequences - 0xaa, // subscriber_priority + uint8_t raw_packet_[13] = { + 0x02, 0x00, 0x0a, 0x02, 0x03, 0x01, 0x05, // start and end sequences + 0xaa, 0x01, // subscriber_priority, forward 0x01, // 1 parameter 0x02, 0x67, 0x10, // delivery_timeout = 10000 }; MoqtSubscribeUpdate subscribe_update_ = { - /*subscribe_id=*/2, + /*request_id=*/2, /*start=*/Location(3, 1), /*end_group=*/4, /*subscriber_priority=*/0xaa, + /*forward=*/true, VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000), quic::QuicTimeDelta::Infinite()), };