Change MOQT SUBSCRIBE_UPDATE to REQUEST_UPDATE. Part of draft-16. PiperOrigin-RevId: 866694078
diff --git a/quiche/quic/moqt/moqt_bidi_stream.h b/quiche/quic/moqt/moqt_bidi_stream.h index be5451d..d0ded95 100644 --- a/quiche/quic/moqt/moqt_bidi_stream.h +++ b/quiche/quic/moqt/moqt_bidi_stream.h
@@ -87,8 +87,8 @@ virtual void OnPublishDoneMessage(const MoqtPublishDone& message) override { OnParsingError(wrong_message_error_, wrong_message_reason_); } - virtual void OnSubscribeUpdateMessage( - const MoqtSubscribeUpdate& message) override { + virtual void OnRequestUpdateMessage( + const MoqtRequestUpdate& message) override { OnParsingError(wrong_message_error_, wrong_message_reason_); } virtual void OnPublishNamespaceMessage(
diff --git a/quiche/quic/moqt/moqt_bidi_stream_test.cc b/quiche/quic/moqt/moqt_bidi_stream_test.cc index c47a231..cf0cb88 100644 --- a/quiche/quic/moqt/moqt_bidi_stream_test.cc +++ b/quiche/quic/moqt/moqt_bidi_stream_test.cc
@@ -99,7 +99,7 @@ EXPECT_CALL(error_callback_, Call(MoqtError::kProtocolViolation, "Message not allowed for this stream type")); - stream_->OnSubscribeUpdateMessage(MoqtSubscribeUpdate{}); + stream_->OnRequestUpdateMessage(MoqtRequestUpdate{}); stream_ = std::make_unique<MoqtBidiStreamBase>( &framer_, deleted_callback_.AsStdFunction(), error_callback_.AsStdFunction()); @@ -256,12 +256,12 @@ for (int i = 0; i < 100; ++i) { // kMaxPendingMessages = 100. EXPECT_FALSE(stream_->QueueIsFull()); stream_->SendOrBufferMessage( - framer_.SerializeSubscribeUpdate(MoqtSubscribeUpdate{})); + framer_.SerializeRequestUpdate(MoqtRequestUpdate{})); } EXPECT_TRUE(stream_->QueueIsFull()); EXPECT_CALL(error_callback_, Call(MoqtError::kInternalError, _)); stream_->SendOrBufferMessage( - framer_.SerializeSubscribeUpdate(MoqtSubscribeUpdate{})); + framer_.SerializeRequestUpdate(MoqtRequestUpdate{})); EXPECT_TRUE(stream_->QueueIsFull()); }
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index 508e72d..59aa538 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -563,20 +563,12 @@ WireStringWithVarInt62Length(message.error_reason)); } -quiche::QuicheBuffer MoqtFramer::SerializeSubscribeUpdate( - const MoqtSubscribeUpdate& message) { - KeyValuePairList parameters; - if (!FillAndValidateVersionSpecificParameters( - MoqtMessageType::kSubscribeUpdate, message.parameters, parameters)) { - return quiche::QuicheBuffer(); - }; - uint64_t end_group = - message.end_group.has_value() ? *message.end_group + 1 : 0; +quiche::QuicheBuffer MoqtFramer::SerializeRequestUpdate( + const MoqtRequestUpdate& message) { return SerializeControlMessage( - MoqtMessageType::kSubscribeUpdate, WireVarInt62(message.request_id), - WireVarInt62(message.start.group), WireVarInt62(message.start.object), - WireVarInt62(end_group), WireUint8(message.subscriber_priority), - WireBoolean(message.forward), WireKeyValuePairList(parameters)); + MoqtMessageType::kRequestUpdate, WireVarInt62(message.request_id), + WireVarInt62(message.existing_request_id), + WireKeyValuePairList(message.parameters.ToKeyValuePairList())); } quiche::QuicheBuffer MoqtFramer::SerializePublishNamespace(
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h index 9d3e797..b9b5a8e 100644 --- a/quiche/quic/moqt/moqt_framer.h +++ b/quiche/quic/moqt/moqt_framer.h
@@ -55,8 +55,7 @@ MoqtMessageType message_type = MoqtMessageType::kSubscribeOk); quiche::QuicheBuffer SerializeUnsubscribe(const MoqtUnsubscribe& message); quiche::QuicheBuffer SerializePublishDone(const MoqtPublishDone& message); - quiche::QuicheBuffer SerializeSubscribeUpdate( - const MoqtSubscribeUpdate& message); + quiche::QuicheBuffer SerializeRequestUpdate(const MoqtRequestUpdate& message); quiche::QuicheBuffer SerializePublishNamespace( const MoqtPublishNamespace& message); quiche::QuicheBuffer SerializePublishNamespaceDone(
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc index b90cf2a..5a4f4be 100644 --- a/quiche/quic/moqt/moqt_framer_test.cc +++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -468,38 +468,6 @@ EXPECT_EQ(static_cast<uint8_t>(buffer.AsSpan()[7]), 0); } -TEST_F(MoqtFramerSimpleTest, SubscribeUpdateEndGroupOnly) { - MoqtSubscribeUpdate subscribe_update = { - /*subscribe_id=*/3, - /*start=*/Location(4, 3), - /*end_group=*/4, - /*subscriber_priority=*/0xaa, - /*forward=*/true, - VersionSpecificParameters(), - }; - quiche::QuicheBuffer buffer; - buffer = framer_.SerializeSubscribeUpdate(subscribe_update); - EXPECT_GT(buffer.size(), 0); - const uint8_t* end_group = BufferAtOffset(buffer, 6); - EXPECT_EQ(*end_group, 5); -} - -TEST_F(MoqtFramerSimpleTest, SubscribeUpdateIncrementsEnd) { - MoqtSubscribeUpdate subscribe_update = { - /*subscribe_id=*/3, - /*start=*/Location(4, 3), - /*end_group=*/4, - /*subscriber_priority=*/0xaa, - /*forward=*/true, - VersionSpecificParameters(), - }; - quiche::QuicheBuffer buffer; - buffer = framer_.SerializeSubscribeUpdate(subscribe_update); - EXPECT_GT(buffer.size(), 0); - const uint8_t* end_group = BufferAtOffset(buffer, 6); - EXPECT_EQ(*end_group, 5); -} - TEST_F(MoqtFramerSimpleTest, RelativeJoiningFetch) { RelativeJoiningFetchMessage message; quiche::QuicheBuffer buffer =
diff --git a/quiche/quic/moqt/moqt_key_value_pair.cc b/quiche/quic/moqt/moqt_key_value_pair.cc index 2293a76..65ddda4 100644 --- a/quiche/quic/moqt/moqt_key_value_pair.cc +++ b/quiche/quic/moqt/moqt_key_value_pair.cc
@@ -69,6 +69,39 @@ type_ = MoqtFilterType::kAbsoluteStart; } +void MessageParameters::Update(const MessageParameters& other) { + if (other.delivery_timeout.has_value()) { + delivery_timeout = other.delivery_timeout; + } + if (!other.authorization_tokens.empty()) { + authorization_tokens = other.authorization_tokens; + } + if (other.expires.has_value()) { + expires = other.expires; + } + if (other.largest_object.has_value()) { + largest_object = other.largest_object; + } + if (other.forward_.has_value()) { + forward_ = other.forward_; + } + if (other.subscriber_priority.has_value()) { + subscriber_priority = other.subscriber_priority; + } + if (other.subscription_filter.has_value()) { + subscription_filter = other.subscription_filter; + } + if (other.group_order.has_value()) { + group_order = other.group_order; + } + if (other.new_group_request.has_value()) { + new_group_request = other.new_group_request; + } + if (other.oack_window_size.has_value()) { + oack_window_size = other.oack_window_size; + } +} + TrackExtensions::TrackExtensions( std::optional<quic::QuicTimeDelta> delivery_timeout, std::optional<quic::QuicTimeDelta> max_cache_duration,
diff --git a/quiche/quic/moqt/moqt_key_value_pair.h b/quiche/quic/moqt/moqt_key_value_pair.h index a7fe5c7..9757151 100644 --- a/quiche/quic/moqt/moqt_key_value_pair.h +++ b/quiche/quic/moqt/moqt_key_value_pair.h
@@ -250,6 +250,10 @@ subscription_filter.emplace(location); } + // If |other| has a value in a particular field, replace the current value + // with it. Otherwise, leave unchanged. + void Update(const MessageParameters& other); + std::optional<quic::QuicTimeDelta> delivery_timeout; std::vector<AuthToken> authorization_tokens; std::optional<quic::QuicTimeDelta> expires;
diff --git a/quiche/quic/moqt/moqt_key_value_pair_test.cc b/quiche/quic/moqt/moqt_key_value_pair_test.cc index a78b70f..135ba0f 100644 --- a/quiche/quic/moqt/moqt_key_value_pair_test.cc +++ b/quiche/quic/moqt/moqt_key_value_pair_test.cc
@@ -229,6 +229,31 @@ } } +TEST_F(MessageParametersTest, Update) { + MessageParameters p1; + p1.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(10); + p1.expires = quic::QuicTimeDelta::FromMilliseconds(100); + p1.set_forward(false); + p1.subscriber_priority = 100; + p1.new_group_request = 1; + MessageParameters p2; + p2.delivery_timeout = quic::QuicTimeDelta::FromMilliseconds(20); + p2.authorization_tokens.push_back( + AuthToken(AuthTokenType::kOutOfBand, "token")); + p2.set_forward(true); + p2.group_order = MoqtDeliveryOrder::kDescending; + p1.Update(p2); + EXPECT_EQ(p1.delivery_timeout, quic::QuicTimeDelta::FromMilliseconds(20)); + EXPECT_EQ(p1.expires, quic::QuicTimeDelta::FromMilliseconds(100)); + ASSERT_EQ(p1.authorization_tokens.size(), 1); + EXPECT_EQ(p1.authorization_tokens[0], + AuthToken(AuthTokenType::kOutOfBand, "token")); + EXPECT_TRUE(p1.forward()); + EXPECT_EQ(p1.subscriber_priority, 100); + EXPECT_EQ(p1.group_order, MoqtDeliveryOrder::kDescending); + EXPECT_EQ(p1.new_group_request, 1); +} + class TrackExtensionsTest : public quic::test::QuicTest {}; TEST_F(TrackExtensionsTest, DefaultConstructor) {
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc index ae97049..ecdf54c 100644 --- a/quiche/quic/moqt/moqt_messages.cc +++ b/quiche/quic/moqt/moqt_messages.cc
@@ -67,16 +67,16 @@ MoqtMessageType::kServerSetup, MoqtMessageType::kPublish, MoqtMessageType::kSubscribe, - MoqtMessageType::kSubscribeUpdate, + MoqtMessageType::kRequestUpdate, MoqtMessageType::kSubscribeNamespace, MoqtMessageType::kPublishNamespace, MoqtMessageType::kTrackStatus, MoqtMessageType::kFetch}; const std::array<MoqtMessageType, 7> kAllowsDeliveryTimeout = { - MoqtMessageType::kTrackStatus, MoqtMessageType::kRequestOk, - MoqtMessageType::kPublish, MoqtMessageType::kPublishOk, - MoqtMessageType::kSubscribe, MoqtMessageType::kSubscribeOk, - MoqtMessageType::kSubscribeUpdate}; + MoqtMessageType::kTrackStatus, MoqtMessageType::kRequestOk, + MoqtMessageType::kPublish, MoqtMessageType::kPublishOk, + MoqtMessageType::kSubscribe, MoqtMessageType::kSubscribeOk, + MoqtMessageType::kRequestUpdate}; const std::array<MoqtMessageType, 4> kAllowsMaxCacheDuration = { MoqtMessageType::kSubscribeOk, MoqtMessageType::kRequestOk, MoqtMessageType::kFetchOk, MoqtMessageType::kPublish}; @@ -113,8 +113,8 @@ return "UNSUBSCRIBE"; case MoqtMessageType::kPublishDone: return "PUBLISH_DONE"; - case MoqtMessageType::kSubscribeUpdate: - return "SUBSCRIBE_UPDATE"; + case MoqtMessageType::kRequestUpdate: + return "REQUEST_UPDATE"; case MoqtMessageType::kPublishNamespaceCancel: return "PUBLISH_NAMESPACE_CANCEL"; case MoqtMessageType::kTrackStatus:
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index ece3be9..4f99112 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -243,7 +243,7 @@ }; enum class QUICHE_EXPORT MoqtMessageType : uint64_t { - kSubscribeUpdate = 0x02, + kRequestUpdate = 0x02, kSubscribe = 0x03, kSubscribeOk = 0x04, kRequestError = 0x05, @@ -392,13 +392,10 @@ std::string error_reason; }; -struct QUICHE_EXPORT MoqtSubscribeUpdate { +struct QUICHE_EXPORT MoqtRequestUpdate { uint64_t request_id; - Location start; - std::optional<uint64_t> end_group; - MoqtPriority subscriber_priority; - bool forward; - VersionSpecificParameters parameters; + uint64_t existing_request_id; + MessageParameters parameters; }; struct QUICHE_EXPORT MoqtPublishNamespace {
diff --git a/quiche/quic/moqt/moqt_namespace_stream.h b/quiche/quic/moqt/moqt_namespace_stream.h index 94e9aea..3034cab 100644 --- a/quiche/quic/moqt/moqt_namespace_stream.h +++ b/quiche/quic/moqt/moqt_namespace_stream.h
@@ -124,8 +124,8 @@ void OnSubscribeNamespaceMessage( const MoqtSubscribeNamespace& message) override; // TODO(martinduke): Implement this. - void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate&) override { - QUICHE_DLOG(INFO) << "Got SUBSCRIBE_UPDATE on Namespace stream"; + void OnRequestUpdateMessage(const MoqtRequestUpdate&) override { + QUICHE_DLOG(INFO) << "Got REQUEST_UPDATE on Namespace stream"; } private:
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index d0d2258..0aeb42c 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -590,8 +590,8 @@ case MoqtMessageType::kPublishDone: bytes_read = ProcessPublishDone(reader); break; - case MoqtMessageType::kSubscribeUpdate: - bytes_read = ProcessSubscribeUpdate(reader); + case MoqtMessageType::kRequestUpdate: + bytes_read = ProcessRequestUpdate(reader); break; case MoqtMessageType::kPublishNamespace: bytes_read = ProcessPublishNamespace(reader); @@ -769,40 +769,16 @@ return reader.PreviouslyReadPayload().length(); } -size_t MoqtControlParser::ProcessSubscribeUpdate(quic::QuicDataReader& reader) { - MoqtSubscribeUpdate subscribe_update; - uint64_t start_group, start_object, end_group; - uint8_t forward; - if (!reader.ReadVarInt62(&subscribe_update.request_id) || - !reader.ReadVarInt62(&start_group) || - !reader.ReadVarInt62(&start_object) || !reader.ReadVarInt62(&end_group) || - !reader.ReadUInt8(&subscribe_update.subscriber_priority) || - !reader.ReadUInt8(&forward)) { +size_t MoqtControlParser::ProcessRequestUpdate(quic::QuicDataReader& reader) { + MoqtRequestUpdate request_update; + if (!reader.ReadVarInt62(&request_update.request_id) || + !reader.ReadVarInt62(&request_update.existing_request_id)) { return 0; } - KeyValuePairList parameters; - if (!ParseKeyValuePairList(reader, parameters)) { + if (!FillAndValidateMessageParameters(reader, request_update.parameters)) { return 0; } - if (!FillAndValidateVersionSpecificParameters( - parameters, subscribe_update.parameters, - MoqtMessageType::kSubscribeUpdate)) { - return 0; - } - subscribe_update.start = Location(start_group, start_object); - if (end_group > 0) { - subscribe_update.end_group = end_group - 1; - if (subscribe_update.end_group < start_group) { - ParseError("End group is less than start group"); - return 0; - } - } - if (forward > 1) { - ParseError("Invalid forward value in SUBSCRIBE_UPDATE"); - return 0; - } - subscribe_update.forward = (forward == 1); - visitor_.OnSubscribeUpdateMessage(subscribe_update); + visitor_.OnRequestUpdateMessage(request_update); return reader.PreviouslyReadPayload().length(); }
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h index a12bb5f..9ba1162 100644 --- a/quiche/quic/moqt/moqt_parser.h +++ b/quiche/quic/moqt/moqt_parser.h
@@ -44,7 +44,7 @@ virtual void OnSubscribeOkMessage(const MoqtSubscribeOk& message) = 0; virtual void OnUnsubscribeMessage(const MoqtUnsubscribe& message) = 0; virtual void OnPublishDoneMessage(const MoqtPublishDone& message) = 0; - virtual void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) = 0; + virtual void OnRequestUpdateMessage(const MoqtRequestUpdate& message) = 0; virtual void OnPublishNamespaceMessage( const MoqtPublishNamespace& message) = 0; virtual void OnPublishNamespaceDoneMessage( @@ -136,7 +136,7 @@ size_t ProcessSubscribeOk(quic::QuicDataReader& reader); size_t ProcessUnsubscribe(quic::QuicDataReader& reader); size_t ProcessPublishDone(quic::QuicDataReader& reader); - size_t ProcessSubscribeUpdate(quic::QuicDataReader& reader); + size_t ProcessRequestUpdate(quic::QuicDataReader& reader); size_t ProcessPublishNamespace(quic::QuicDataReader& reader); size_t ProcessPublishNamespaceDone(quic::QuicDataReader& reader); size_t ProcessNamespace(quic::QuicDataReader& reader);
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index d422b86..a2049c8 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -38,7 +38,7 @@ MoqtMessageType::kRequestError, MoqtMessageType::kSubscribe, MoqtMessageType::kSubscribeOk, - MoqtMessageType::kSubscribeUpdate, + MoqtMessageType::kRequestUpdate, MoqtMessageType::kUnsubscribe, MoqtMessageType::kPublishDone, MoqtMessageType::kTrackStatus, @@ -1128,34 +1128,18 @@ EXPECT_EQ(visitor_.messages_received_, 1); } -TEST_F(MoqtMessageSpecificTest, SubscribeUpdateExactlyOneObject) { +TEST_F(MoqtMessageSpecificTest, RequestUpdateEndGroupTooLow) { webtransport::test::InMemoryStream stream(/*stream_id=*/0); MoqtControlParser parser(kRawQuic, &stream, visitor_); - char subscribe_update[] = { - 0x02, 0x00, 0x07, 0x02, 0x03, 0x01, 0x04, // start and end sequences - 0x20, 0x01, // priority, forward - 0x00, // No parameters + char request_update[] = { + 0x02, 0x00, 0x09, 0x02, 0x00, // request IDs + 0x01, 0x21, 0x04, 0x04, 0x04, 0x01, 0x03, // filter }; - stream.Receive(absl::string_view(subscribe_update, sizeof(subscribe_update)), - false); - parser.ReadAndDispatchMessages(); - EXPECT_EQ(visitor_.messages_received_, 1); -} - -TEST_F(MoqtMessageSpecificTest, SubscribeUpdateEndGroupTooLow) { - webtransport::test::InMemoryStream stream(/*stream_id=*/0); - MoqtControlParser parser(kRawQuic, &stream, visitor_); - char subscribe_update[] = { - 0x02, 0x00, 0x09, 0x02, 0x03, 0x01, 0x03, // start and end sequences - 0x20, 0x01, // priority, forward - 0x01, // 1 parameter - 0x02, 0x20, // delivery_timeout = 32 ms - }; - stream.Receive(absl::string_view(subscribe_update, sizeof(subscribe_update)), + stream.Receive(absl::string_view(request_update, sizeof(request_update)), false); parser.ReadAndDispatchMessages(); EXPECT_EQ(visitor_.messages_received_, 0); - EXPECT_EQ(visitor_.parsing_error_, "End group is less than start group"); + EXPECT_EQ(visitor_.parsing_error_, "Duplicate Message Parameter"); } TEST_F(MoqtMessageSpecificTest, ObjectAckNegativeDelta) {
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 32a0c45..bbe5b34 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -211,7 +211,7 @@ message.publisher_priority = track->default_publisher_priority(); } if (!track->InWindow(Location(message.group_id, message.object_id))) { - // TODO(martinduke): a recent SUBSCRIBE_UPDATE could put us here, and it's + // TODO(martinduke): a recent REQUEST_UPDATE could put us here, and it's // not an error. return; } @@ -459,56 +459,36 @@ return true; } -bool MoqtSession::SubscribeUpdate( - const FullTrackName& name, std::optional<Location> start, - std::optional<uint64_t> end_group, - std::optional<MoqtPriority> subscriber_priority, - std::optional<bool> forward, VersionSpecificParameters parameters) { +bool MoqtSession::SubscribeUpdate(const FullTrackName& name, + const MessageParameters& parameters, + MoqtResponseCallback response_callback) { QUICHE_DCHECK(name.IsValid()); + if (next_request_id_ >= peer_max_request_id_) { + if (!last_requests_blocked_sent_.has_value() || + peer_max_request_id_ > *last_requests_blocked_sent_) { + MoqtRequestsBlocked requests_blocked; + requests_blocked.max_request_id = peer_max_request_id_; + SendControlMessage(framer_.SerializeRequestsBlocked(requests_blocked)); + last_requests_blocked_sent_ = peer_max_request_id_; + } + QUIC_DLOG(INFO) << ENDPOINT << "Tried to send SUBSCRIBE with ID " + << next_request_id_ + << " which is greater than the maximum ID " + << peer_max_request_id_; + return false; + } auto it = subscribe_by_name_.find(name); if (it == subscribe_by_name_.end()) { return false; } - QUICHE_DCHECK(name.IsValid()); - SubscribeRemoteTrack* track = it->second; - MoqtSubscribeUpdate subscribe_update; - subscribe_update.request_id = track->request_id(); - // TODO(martinduke): Temporary code to keep this compiling until it is - // replaced with REQUEST_UPDATE. - std::optional<SubscriptionFilter> filter = - track->parameters().subscription_filter; - Location original_start = - filter.has_value() ? filter->start() : Location(0, 0); - uint64_t original_end = - filter.has_value() ? filter->end_group() : kMaxGroupId; - subscribe_update.start = start.value_or(original_start); - subscribe_update.end_group = end_group.value_or(original_end); - if (subscribe_update.end_group == kMaxGroupId) { - subscribe_update.end_group = std::nullopt; - } - subscribe_update.subscriber_priority = - subscriber_priority.value_or(track->subscriber_priority()); - subscribe_update.forward = forward.value_or(track->forward()); - subscribe_update.parameters = parameters; - if (subscribe_update.start < original_start || - (subscribe_update.end_group.has_value() && - (*subscribe_update.end_group > original_end || - *subscribe_update.end_group < subscribe_update.start.group))) { - // Invalid range. - return false; - } - // Input is valid. Update subscription properties. - if (subscribe_update.end_group.has_value()) { - track->parameters().subscription_filter.emplace( - subscribe_update.start, *subscribe_update.end_group); - } else { - track->parameters().subscription_filter.emplace(subscribe_update.start); - } - track->set_subscriber_priority(subscribe_update.subscriber_priority); - track->set_forward(subscribe_update.forward); - SendControlMessage(framer_.SerializeSubscribeUpdate(subscribe_update)); + pending_subscribe_updates_[next_request_id_] = {name, parameters, + std::move(response_callback)}; + MoqtRequestUpdate update{next_request_id_, it->second->request_id(), + parameters}; + next_request_id_ += 2; + SendControlMessage(framer_.SerializeRequestUpdate(update)); return true; -}; +} void MoqtSession::Unsubscribe(const FullTrackName& name) { if (is_closing_) { @@ -1129,6 +1109,22 @@ "Received REQUEST_OK for SUBSCRIBE, FETCH, or PUBLISH"); return; } + // Response to REQUEST_UPDATE for a subscribe. + auto ru_it = session_->pending_subscribe_updates_.find(message.request_id); + if (ru_it != session_->pending_subscribe_updates_.end()) { + auto sub_it = session_->subscribe_by_name_.find(ru_it->second.name); + if (sub_it == session_->subscribe_by_name_.end()) { + std::move(ru_it->second.response_callback)(MoqtRequestErrorInfo{ + RequestErrorCode::kTrackDoesNotExist, std::nullopt, + "subscription does not exist anymore"}); + session_->pending_subscribe_updates_.erase(ru_it); + return; + } + sub_it->second->parameters().Update(ru_it->second.parameters); + std::move(ru_it->second.response_callback)(std::nullopt); + session_->pending_subscribe_updates_.erase(ru_it); + return; + } // Response to PUBLISH_NAMESPACE. auto pn_it = session_->pending_outgoing_publish_namespaces_.find(message.request_id); @@ -1193,6 +1189,13 @@ } return; } + // Response to REQUEST_UPDATE for a subscribe. + auto ru_it = session_->pending_subscribe_updates_.find(message.request_id); + if (ru_it != session_->pending_subscribe_updates_.end()) { + std::move(ru_it->second.response_callback)(error_info); + session_->pending_subscribe_updates_.erase(ru_it); + return; + } // Response to PUBLISH_NAMESPACE. auto pn_it = session_->pending_outgoing_publish_namespaces_.find(message.request_id); @@ -1241,16 +1244,20 @@ session_->MaybeDestroySubscription(subscribe); } -void MoqtSession::ControlStream::OnSubscribeUpdateMessage( - const MoqtSubscribeUpdate& message) { - auto it = session_->published_subscriptions_.find(message.request_id); - if (it == session_->published_subscriptions_.end()) { +void MoqtSession::ControlStream::OnRequestUpdateMessage( + const MoqtRequestUpdate& message) { + auto it = + session_->published_subscriptions_.find(message.existing_request_id); + if (it != session_->published_subscriptions_.end()) { + // It's updating SUBSCRIBE. + it->second->Update(message.parameters); + SendRequestOk(message.request_id, MessageParameters()); return; } - it->second->Update(message.start, message.end_group, - message.subscriber_priority); - it->second->set_subscriber_delivery_timeout( - message.parameters.delivery_timeout); + // TODO(martinduke): Check all the request types. + // Does not match any known request. + SendRequestError(message.request_id, RequestErrorCode::kNotSupported, + std::nullopt, "No support for update of this type"); } void MoqtSession::ControlStream::OnPublishNamespaceMessage( @@ -1592,7 +1599,7 @@ } Location location(message.group_id, message.object_id); if (!track->InWindow(Location(message.group_id, message.object_id))) { - // This is not an error. It can be the result of a recent SUBSCRIBE_UPDATE. + // This is not an error. It can be the result of a recent REQUEST_UPDATE. return; } if (!track->is_fetch()) { @@ -1824,23 +1831,13 @@ } void MoqtSession::PublishedSubscription::Update( - Location start, std::optional<uint64_t> end_group, - MoqtPriority subscriber_priority) { - // TODO(martinduke): Revise once SUBSCRIBE_UPDATE has become REQUEST_UPDATE. - parameters_.subscriber_priority = subscriber_priority; - if (end_group.has_value()) { - parameters_.subscription_filter.emplace(start, *end_group); - } else { - parameters_.subscription_filter.emplace(start); - } - can_have_joining_fetch_ = false; - // TODO: update priority of all data streams that are currently open. - // TODO: update delivery timeout. - // TODO: update forward and subscribe filter. - - // TODO: reset streams that are no longer in-window. - // TODO: send PUBLISH_DONE if required. - // TODO: send an error for invalid updates now that it's a part of draft-05. + const MessageParameters& parameters) { + // TODO(martinduke): If there are auth tokens, this probably has to go to the + // application. + parameters_.Update(parameters); + can_have_joining_fetch_ = (parameters_.subscription_filter.has_value() && + parameters_.subscription_filter->type() == + MoqtFilterType::kLargestObject); } void MoqtSession::PublishedSubscription::set_subscriber_priority( @@ -2256,7 +2253,7 @@ QUICHE_DCHECK(object->metadata.subgroup == index_.subgroup); if (!subscription.InWindow(object->metadata.location)) { // It is possible that the next object became irrelevant due to a - // SUBSCRIBE_UPDATE. Close the stream if so. + // REQUEST_UPDATE. Close the stream if so. bool success = stream_->SendFin(); QUICHE_BUG_IF(OutgoingDataStream_fin_due_to_update, !success) << "Writing FIN failed despite CanWrite() being true.";
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 69a9a10..a0562de 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -111,11 +111,9 @@ // Returns false if the SUBSCRIBE isn't sent. bool Subscribe(const FullTrackName& name, SubscribeVisitor* visitor, const MessageParameters& parameters) override; - bool SubscribeUpdate(const FullTrackName& name, std::optional<Location> start, - std::optional<uint64_t> end_group, - std::optional<MoqtPriority> subscriber_priority, - std::optional<bool> forward, - VersionSpecificParameters parameters) override; + bool SubscribeUpdate(const FullTrackName& name, + const MessageParameters& parameters, + MoqtResponseCallback response_callback) override; void Unsubscribe(const FullTrackName& name) override; bool Fetch(const FullTrackName& name, FetchResponseCallback callback, Location start, uint64_t end_group, @@ -263,7 +261,7 @@ void OnSubscribeOkMessage(const MoqtSubscribeOk& message) override; void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override; void OnPublishDoneMessage(const MoqtPublishDone& /*message*/) override; - void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) override; + void OnRequestUpdateMessage(const MoqtRequestUpdate& message) override; void OnPublishNamespaceMessage( const MoqtPublishNamespace& message) override; void OnPublishNamespaceDoneMessage( @@ -396,8 +394,7 @@ void ProcessObjectAck(const MoqtObjectAck& message); // Updates the window and other properties of the subscription in question. - void Update(Location start, std::optional<uint64_t> end, - MoqtPriority subscriber_priority); + void Update(const MessageParameters& parameters); // Checks if a given Location or Group should be forwarded to the // subscriber. bool InWindow(Location location) { @@ -809,6 +806,15 @@ absl::flat_hash_map<uint64_t, SubscribeRemoteTrack*> subscribe_by_alias_; // All SUBSCRIBEs, indexed by track name. absl::flat_hash_map<FullTrackName, SubscribeRemoteTrack*> subscribe_by_name_; + struct SubscribeUpdateStatus { + FullTrackName name; + MessageParameters parameters; + MoqtResponseCallback response_callback; + }; + // Outgoing Subscribe Updates. We should not update parameters until a + // REQUEST_OK arrives. + absl::flat_hash_map<uint64_t, SubscribeUpdateStatus> + pending_subscribe_updates_; // The next subscribe ID that the local endpoint can send. uint64_t next_request_id_ = 0;
diff --git a/quiche/quic/moqt/moqt_session_interface.h b/quiche/quic/moqt/moqt_session_interface.h index d683665..c69d5c5 100644 --- a/quiche/quic/moqt/moqt_session_interface.h +++ b/quiche/quic/moqt/moqt_session_interface.h
@@ -98,13 +98,11 @@ // Return true if SUBSCRIBE was actually sent. virtual bool Subscribe(const FullTrackName& name, SubscribeVisitor* visitor, const MessageParameters& parameters) = 0; - // If an argument is nullopt, there is no change to the current value. + // If a parameter is nullopt, there is no change to the current value. + // Returns false if the subscription is not found. virtual bool SubscribeUpdate(const FullTrackName& name, - std::optional<Location> start, - std::optional<uint64_t> end_group, - std::optional<MoqtPriority> subscriber_priority, - std::optional<bool> forward, - VersionSpecificParameters parameters) = 0; + const MessageParameters& parameters, + MoqtResponseCallback response_callback) = 0; // Sends an UNSUBSCRIBE message and removes all of the state related to the // subscription. Returns false if the subscription is not found. @@ -164,7 +162,7 @@ // TODO: Add SubscribeNamespace, UnsubscribeNamespace method. // TODO: Add PublishNamespaceCancel method. // TODO: Add TrackStatusRequest method. - // TODO: Add SubscribeUpdate, PublishDone method. + // TODO: Add RequestUpdate, PublishDone method. virtual quiche::QuicheWeakPtr<MoqtSessionInterface> GetWeakPtr() = 0; };
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index 66b18fc..5ebddab 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -827,12 +827,21 @@ }; EXPECT_CALL(remote_track_visitor_, OnReply); stream_input->OnSubscribeOkMessage(ok); - EXPECT_CALL( - mock_stream_, - Writev(ControlMessageOfType(MoqtMessageType::kSubscribeUpdate), _)); + EXPECT_CALL(mock_stream_, + Writev(ControlMessageOfType(MoqtMessageType::kRequestUpdate), _)); + MessageParameters update_parameters; + update_parameters.subscription_filter.emplace(Location(2, 1), 9); + // Set to a non-null value to ensure that the callback is called. + std::optional<MoqtRequestErrorInfo> response = + MoqtRequestErrorInfo{RequestErrorCode::kTimeout, std::nullopt, ""}; EXPECT_TRUE(session_.SubscribeUpdate( - FullTrackName("foo", "bar"), Location(2, 1), 9, std::nullopt, - std::nullopt, VersionSpecificParameters())); + FullTrackName("foo", "bar"), update_parameters, + [&](std::optional<MoqtRequestErrorInfo> info) { response = info; })); + stream_input->OnRequestOkMessage(MoqtRequestOk{ + /*request_id=*/2, + MessageParameters(), + }); + EXPECT_EQ(response, std::nullopt); SubscribeRemoteTrack* track = MoqtSessionPeer::remote_track(&session_, 2); EXPECT_FALSE(track->InWindow(Location(2, 0))); EXPECT_TRUE(track->InWindow(Location(2, 1))); @@ -840,38 +849,11 @@ EXPECT_FALSE(track->InWindow(Location(10, 0))); } -TEST_F(MoqtSessionTest, OutgoingSubscribeUpdateInvalid) { - std::unique_ptr<MoqtControlParserVisitor> stream_input = - MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); - EXPECT_CALL(mock_session_, GetStreamById) - .WillRepeatedly(Return(&mock_stream_)); - EXPECT_CALL(mock_stream_, - Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _)); - MessageParameters parameters(SubscribeForTest()); - parameters.subscription_filter.emplace(Location(1, 0), 10); - session_.Subscribe(FullTrackName("foo", "bar"), &remote_track_visitor_, - parameters); - MoqtSubscribeOk ok = { - /*request_id=*/0, - /*track_alias=*/2, - MessageParameters(), - TrackExtensions(), - }; - EXPECT_CALL(remote_track_visitor_, OnReply); - stream_input->OnSubscribeOkMessage(ok); - EXPECT_CALL( - mock_stream_, - Writev(ControlMessageOfType(MoqtMessageType::kSubscribeUpdate), _)) - .Times(0); +TEST_F(MoqtSessionTest, OutgoingRequestUpdateInvalid) { + // Wrong track name. EXPECT_FALSE(session_.SubscribeUpdate( - FullTrackName("foo", "bar"), Location(0, 0), 10, std::nullopt, - std::nullopt, VersionSpecificParameters())); - EXPECT_FALSE(session_.SubscribeUpdate( - FullTrackName("foo", "bar"), Location(1, 0), 11, std::nullopt, - std::nullopt, VersionSpecificParameters())); - EXPECT_FALSE(session_.SubscribeUpdate( - FullTrackName("foo", "bar"), Location(7, 0), 6, std::nullopt, - std::nullopt, VersionSpecificParameters())); + FullTrackName("foo", "bar"), MessageParameters(), + +[](std::optional<MoqtRequestErrorInfo>) {})); } TEST_F(MoqtSessionTest, MaxRequestIdChangesResponse) { @@ -4230,42 +4212,47 @@ control_stream->OnNamespaceDoneMessage(MoqtNamespaceDone()); } -// TODO: re-enable this test once this behavior is re-implemented. -#if 0 -TEST_F(MoqtSessionTest, SubscribeUpdateClosesSubscription) { +TEST_F(MoqtSessionTest, IncomingRequestUpdateTruncatesSubscription) { FullTrackName ftn("foo", "bar"); - MockLocalTrackVisitor track_visitor; - session_.AddLocalTrack(ftn, MoqtForwardingPreference::kSubgroup, - &track_visitor); - MoqtSessionPeer::AddSubscription(&session_, ftn, 0, 2, 5, 0); - // Get the window, set the maximum delivered. - LocalTrack* track = MoqtSessionPeer::local_track(&session_, ftn); - track->GetWindow(0)->OnObjectSent(Location(7, 3), - MoqtObjectStatus::kNormal); - // Update the end to fall at the last delivered object. - MoqtSubscribeUpdate update = { - /*request_id=*/0, - /*start_group=*/5, - /*start_object=*/0, - /*end_group=*/7, - }; - std::unique_ptr<MoqtParserVisitor> stream_input = + std::shared_ptr<MoqtTrackPublisher> publisher = + SetupPublisher(ftn, MoqtForwardingPreference::kDatagram, Location(8, 0)); + MockTrackPublisher* mock_publisher = + absl::down_cast<MockTrackPublisher*>(publisher.get()); + MoqtObjectListener* listener = + MoqtSessionPeer::AddSubscription(&session_, publisher, /*request_id=*/1, + /*track_alias=*/2, /*start_group=*/4, + /*start_object=*/0); + + // Send a datagram in window. + EXPECT_CALL(*mock_publisher, GetCachedObject(8, 0, 0)).WillOnce([&] { + return PublishedObject{ + PublishedObjectMetadata{Location(8, 0), 0, "extensions", + MoqtObjectStatus::kNormal, 128, + MoqtForwardingPreference::kDatagram, + MoqtSessionPeer::Now(&session_)}, + quiche::QuicheMemSlice::Copy("deadbeef"), false}; + }); + EXPECT_CALL(mock_session_, SendOrQueueDatagram) + .WillOnce(Return(webtransport::DatagramStatus( + webtransport::DatagramStatusCode::kSuccess, ""))); + + listener->OnNewObjectAvailable(Location(8, 0), 0, 0x80, + MoqtForwardingPreference::kDatagram); + + std::unique_ptr<MoqtControlParserVisitor> control_stream = MoqtSessionPeer::CreateControlStream(&session_, &mock_stream_); - EXPECT_CALL(mock_session_, GetStreamById(4)).WillOnce(Return(&mock_stream_)); - bool correct_message = false; - EXPECT_CALL(mock_stream_, Writev(_, _)) - .WillOnce([&](absl::Span<quiche::QuicheMemSlice> data, - const quiche::StreamWriteOptions& options) { - correct_message = true; - EXPECT_EQ(*ExtractMessageType(data[0].AsStringView()), - MoqtMessageType::kPublishDone); - return absl::OkStatus(); - }); - stream_input->OnSubscribeUpdateMessage(update); - EXPECT_TRUE(correct_message); - EXPECT_FALSE(session_.HasSubscribers(ftn)); + // Update the filter to exclude the live edge. The next object is out of + // window. + MessageParameters parameters; + parameters.subscription_filter.emplace(Location(4, 0), 7); + EXPECT_CALL(mock_stream_, + Writev(ControlMessageOfType(MoqtMessageType::kRequestOk), _)); + control_stream->OnRequestUpdateMessage(MoqtRequestUpdate{3, 1, parameters}); + EXPECT_CALL(*mock_publisher, GetCachedObject).Times(0); + EXPECT_CALL(mock_session_, SendOrQueueDatagram).Times(0); + listener->OnNewObjectAvailable(Location(8, 1), 0, 0x80, + MoqtForwardingPreference::kDatagram); } -#endif } // namespace test
diff --git a/quiche/quic/moqt/moqt_subscribe_windows.h b/quiche/quic/moqt/moqt_subscribe_windows.h index ce37936..431e9f1 100644 --- a/quiche/quic/moqt/moqt_subscribe_windows.h +++ b/quiche/quic/moqt/moqt_subscribe_windows.h
@@ -40,9 +40,9 @@ // Updates the subscription window. Returns true if the update is valid (in // MoQT, subscription windows are only allowed to shrink, not to expand). - // Called only as a result of SUBSCRIBE_OK (largest_id) or SUBSCRIBE_UPDATE. + // Called only as a result of SUBSCRIBE_OK (largest_id) or REQUEST_UPDATE. bool TruncateStart(Location start); - // Called only as a result of SUBSCRIBE_UPDATE. + // Called only as a result of REQUEST_UPDATE. bool TruncateEnd(uint64_t end_group); // Called only as a result of FETCH_OK (largest_id) bool TruncateEnd(Location largest_id);
diff --git a/quiche/quic/moqt/test_tools/mock_moqt_session.h b/quiche/quic/moqt/test_tools/mock_moqt_session.h index 3ba898a..a106fa7 100644 --- a/quiche/quic/moqt/test_tools/mock_moqt_session.h +++ b/quiche/quic/moqt/test_tools/mock_moqt_session.h
@@ -34,11 +34,8 @@ const MessageParameters& parameters), (override)); MOCK_METHOD(bool, SubscribeUpdate, - (const FullTrackName& name, std::optional<Location> start, - std::optional<uint64_t> end_group, - std::optional<MoqtPriority> subscriber_priority, - std::optional<bool> forward, - VersionSpecificParameters parameters), + (const FullTrackName&, const MessageParameters&, + MoqtResponseCallback), (override)); MOCK_METHOD(void, Unsubscribe, (const FullTrackName& name), (override)); MOCK_METHOD(bool, Fetch,
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc index a16e56b..5858001 100644 --- a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc +++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
@@ -49,8 +49,8 @@ quiche::QuicheBuffer operator()(const MoqtPublishDone& message) { return framer.SerializePublishDone(message); } - quiche::QuicheBuffer operator()(const MoqtSubscribeUpdate& message) { - return framer.SerializeSubscribeUpdate(message); + quiche::QuicheBuffer operator()(const MoqtRequestUpdate& message) { + return framer.SerializeRequestUpdate(message); } quiche::QuicheBuffer operator()(const MoqtPublishNamespace& message) { return framer.SerializePublishNamespace(message); @@ -134,7 +134,7 @@ void OnPublishDoneMessage(const MoqtPublishDone& message) { frames_.push_back(message); } - void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) { + void OnRequestUpdateMessage(const MoqtRequestUpdate& message) { frames_.push_back(message); } void OnPublishNamespaceMessage(const MoqtPublishNamespace& message) {
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.h b/quiche/quic/moqt/test_tools/moqt_framer_utils.h index d2007a7..cd5135d 100644 --- a/quiche/quic/moqt/test_tools/moqt_framer_utils.h +++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.h
@@ -28,7 +28,7 @@ using MoqtGenericFrame = std::variant<MoqtClientSetup, MoqtServerSetup, MoqtRequestOk, MoqtRequestError, MoqtSubscribe, MoqtSubscribeOk, - MoqtUnsubscribe, MoqtPublishDone, MoqtSubscribeUpdate, + MoqtUnsubscribe, MoqtPublishDone, MoqtRequestUpdate, MoqtPublishNamespace, MoqtPublishNamespaceDone, MoqtNamespace, MoqtNamespaceDone, MoqtPublishNamespaceCancel, MoqtTrackStatus, MoqtGoAway, MoqtSubscribeNamespace, MoqtMaxRequestId,
diff --git a/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h b/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h index 68f16b1..10451df 100644 --- a/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h +++ b/quiche/quic/moqt/test_tools/moqt_parser_test_visitor.h
@@ -61,7 +61,7 @@ void OnSubscribeOkMessage(const MoqtSubscribeOk& message) override { OnControlMessage(message); } - void OnSubscribeUpdateMessage(const MoqtSubscribeUpdate& message) override { + void OnRequestUpdateMessage(const MoqtRequestUpdate& message) override { OnControlMessage(message); } void OnUnsubscribeMessage(const MoqtUnsubscribe& message) override {
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index 051b99a..947be1b 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -93,7 +93,7 @@ using MessageStructuredData = std::variant< MoqtClientSetup, MoqtServerSetup, MoqtObject, MoqtRequestOk, MoqtRequestError, MoqtSubscribe, MoqtSubscribeOk, MoqtUnsubscribe, - MoqtPublishDone, MoqtSubscribeUpdate, MoqtPublishNamespace, + MoqtPublishDone, MoqtRequestUpdate, MoqtPublishNamespace, MoqtPublishNamespaceDone, MoqtPublishNamespaceCancel, MoqtTrackStatus, MoqtGoAway, MoqtSubscribeNamespace, MoqtMaxRequestId, MoqtFetch, MoqtFetchCancel, MoqtFetchOk, MoqtRequestsBlocked, MoqtPublish, @@ -870,63 +870,54 @@ }; }; -class QUICHE_NO_EXPORT SubscribeUpdateMessage : public TestMessageBase { +class QUICHE_NO_EXPORT RequestUpdateMessage : public TestMessageBase { public: - SubscribeUpdateMessage() : TestMessageBase() { + RequestUpdateMessage() : TestMessageBase() { SetWireImage(raw_packet_, sizeof(raw_packet_)); + request_update_.parameters.delivery_timeout = + quic::QuicTimeDelta::FromMilliseconds(10000); + request_update_.parameters.set_forward(true); + request_update_.parameters.subscriber_priority = 0xaa; + request_update_.parameters.subscription_filter.emplace(Location(3, 1), 5); } bool EqualFieldValues(MessageStructuredData& values) const override { - auto cast = std::get<MoqtSubscribeUpdate>(values); - if (cast.request_id != subscribe_update_.request_id) { - QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE subscribe ID mismatch"; + auto cast = std::get<MoqtRequestUpdate>(values); + if (cast.request_id != request_update_.request_id) { + QUIC_LOG(INFO) << "REQUEST_UPDATE request ID mismatch"; return false; } - if (cast.start != subscribe_update_.start) { - QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE start group mismatch"; + if (cast.existing_request_id != request_update_.existing_request_id) { + QUIC_LOG(INFO) << "REQUEST_UPDATE existing request ID mismatch"; return false; } - if (cast.end_group != subscribe_update_.end_group) { - QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE end group mismatch"; - return false; - } - if (cast.subscriber_priority != subscribe_update_.subscriber_priority) { - QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE subscriber priority mismatch"; - return false; - } - if (cast.forward != subscribe_update_.forward) { - QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE forward mismatch"; - return false; - } - if (cast.parameters != subscribe_update_.parameters) { - QUIC_LOG(INFO) << "SUBSCRIBE_UPDATE parameter mismatch"; + if (cast.parameters != request_update_.parameters) { + QUIC_LOG(INFO) << "REQUEST_UPDATE parameter mismatch"; return false; } return true; } - void ExpandVarints() override { ExpandVarintsImpl("vvvv--vv--"); } + void ExpandVarints() override { ExpandVarintsImpl("vvvv--vvv--vv----"); } MessageStructuredData structured_data() const override { - return TestMessageBase::MessageStructuredData(subscribe_update_); + return TestMessageBase::MessageStructuredData(request_update_); } private: - uint8_t raw_packet_[13] = { - 0x02, 0x00, 0x0a, 0x02, 0x03, 0x01, 0x05, // start and end sequences - 0xaa, 0x01, // subscriber_priority, forward - 0x01, // 1 parameter - 0x02, 0x67, 0x10, // delivery_timeout = 10000 + uint8_t raw_packet_[20] = { + 0x02, 0x00, 0x11, 0x02, 0x00, // request IDs 2 and 0 + 0x04, // Four parameters + 0x02, 0x67, 0x10, // delivery_timeout = 10000 + 0x0e, 0x01, // forward = true + 0x10, 0x40, 0xaa, // subscriber_priority = 0xaa + 0x01, 0x04, 0x04, 0x03, 0x01, 0x05, // Absolute Range: (3, 1) to 5. }; - MoqtSubscribeUpdate subscribe_update_ = { + MoqtRequestUpdate request_update_ = { /*request_id=*/2, - /*start=*/Location(3, 1), - /*end_group=*/4, - /*subscriber_priority=*/0xaa, - /*forward=*/true, - VersionSpecificParameters(quic::QuicTimeDelta::FromMilliseconds(10000), - quic::QuicTimeDelta::Infinite()), + /*existing_request_id=*/0, + MessageParameters(), // Set in the constructor. }; }; @@ -1829,8 +1820,8 @@ return std::make_unique<UnsubscribeMessage>(); case MoqtMessageType::kPublishDone: return std::make_unique<PublishDoneMessage>(); - case MoqtMessageType::kSubscribeUpdate: - return std::make_unique<SubscribeUpdateMessage>(); + case MoqtMessageType::kRequestUpdate: + return std::make_unique<RequestUpdateMessage>(); case MoqtMessageType::kPublishNamespace: return std::make_unique<PublishNamespaceMessage>(); case MoqtMessageType::kPublishNamespaceDone: