Replace MoqT `SUBSCRIBES_BLOCKED` with `REQUESTS_BLOCKED`. PiperOrigin-RevId: 754925435
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc index 7c7f649..bf810f4 100644 --- a/quiche/quic/moqt/moqt_framer.cc +++ b/quiche/quic/moqt/moqt_framer.cc
@@ -735,10 +735,10 @@ WireStringWithVarInt62Length(message.reason_phrase)); } -quiche::QuicheBuffer MoqtFramer::SerializeSubscribesBlocked( - const MoqtSubscribesBlocked& message) { - return SerializeControlMessage(MoqtMessageType::kSubscribesBlocked, - WireVarInt62(message.max_subscribe_id)); +quiche::QuicheBuffer MoqtFramer::SerializeRequestsBlocked( + const MoqtRequestsBlocked& message) { + return SerializeControlMessage(MoqtMessageType::kRequestsBlocked, + WireVarInt62(message.max_request_id)); } quiche::QuicheBuffer MoqtFramer::SerializeObjectAck(
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h index d0c7996..d001464 100644 --- a/quiche/quic/moqt/moqt_framer.h +++ b/quiche/quic/moqt/moqt_framer.h
@@ -69,8 +69,8 @@ quiche::QuicheBuffer SerializeFetchCancel(const MoqtFetchCancel& message); quiche::QuicheBuffer SerializeFetchOk(const MoqtFetchOk& message); quiche::QuicheBuffer SerializeFetchError(const MoqtFetchError& message); - quiche::QuicheBuffer SerializeSubscribesBlocked( - const MoqtSubscribesBlocked& message); + quiche::QuicheBuffer SerializeRequestsBlocked( + const MoqtRequestsBlocked& message); quiche::QuicheBuffer SerializeObjectAck(const MoqtObjectAck& message); private:
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc index 80002a2..21738c9 100644 --- a/quiche/quic/moqt/moqt_framer_test.cc +++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -57,7 +57,7 @@ MoqtMessageType::kFetchCancel, MoqtMessageType::kFetchOk, MoqtMessageType::kFetchError, - MoqtMessageType::kSubscribesBlocked, + MoqtMessageType::kRequestsBlocked, MoqtMessageType::kObjectAck, MoqtMessageType::kClientSetup, MoqtMessageType::kServerSetup, @@ -204,9 +204,9 @@ auto data = std::get<MoqtFetchError>(structured_data); return framer_.SerializeFetchError(data); } - case moqt::MoqtMessageType::kSubscribesBlocked: { - auto data = std::get<MoqtSubscribesBlocked>(structured_data); - return framer_.SerializeSubscribesBlocked(data); + case moqt::MoqtMessageType::kRequestsBlocked: { + auto data = std::get<MoqtRequestsBlocked>(structured_data); + return framer_.SerializeRequestsBlocked(data); } case moqt::MoqtMessageType::kObjectAck: { auto data = std::get<MoqtObjectAck>(structured_data);
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc index e8aed25..904954c 100644 --- a/quiche/quic/moqt/moqt_messages.cc +++ b/quiche/quic/moqt/moqt_messages.cc
@@ -242,8 +242,8 @@ return "FETCH_OK"; case MoqtMessageType::kFetchError: return "FETCH_ERROR"; - case MoqtMessageType::kSubscribesBlocked: - return "SUBSCRIBES_BLOCKED"; + case MoqtMessageType::kRequestsBlocked: + return "REQUESTS_BLOCKED"; case MoqtMessageType::kObjectAck: return "OBJECT_ACK"; }
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h index c227832..ac22147 100644 --- a/quiche/quic/moqt/moqt_messages.h +++ b/quiche/quic/moqt/moqt_messages.h
@@ -127,7 +127,7 @@ kFetchCancel = 0x17, kFetchOk = 0x18, kFetchError = 0x19, - kSubscribesBlocked = 0x1a, + kRequestsBlocked = 0x1a, kClientSetup = 0x20, kServerSetup = 0x21, @@ -721,8 +721,8 @@ std::string reason_phrase; }; -struct QUICHE_EXPORT MoqtSubscribesBlocked { - uint64_t max_subscribe_id; +struct QUICHE_EXPORT MoqtRequestsBlocked { + uint64_t max_request_id; }; // All of the four values in this message are encoded as varints.
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc index 2776499..66d52ba 100644 --- a/quiche/quic/moqt/moqt_parser.cc +++ b/quiche/quic/moqt/moqt_parser.cc
@@ -319,8 +319,8 @@ case MoqtMessageType::kFetchError: bytes_read = ProcessFetchError(reader); break; - case MoqtMessageType::kSubscribesBlocked: - bytes_read = ProcessSubscribesBlocked(reader); + case MoqtMessageType::kRequestsBlocked: + bytes_read = ProcessRequestsBlocked(reader); break; case moqt::MoqtMessageType::kObjectAck: bytes_read = ProcessObjectAck(reader); @@ -897,13 +897,12 @@ return reader.PreviouslyReadPayload().length(); } -size_t MoqtControlParser::ProcessSubscribesBlocked( - quic::QuicDataReader& reader) { - MoqtSubscribesBlocked subscribes_blocked; - if (!reader.ReadVarInt62(&subscribes_blocked.max_subscribe_id)) { +size_t MoqtControlParser::ProcessRequestsBlocked(quic::QuicDataReader& reader) { + MoqtRequestsBlocked requests_blocked; + if (!reader.ReadVarInt62(&requests_blocked.max_request_id)) { return 0; } - visitor_.OnSubscribesBlockedMessage(subscribes_blocked); + visitor_.OnRequestsBlockedMessage(requests_blocked); return reader.PreviouslyReadPayload().length(); }
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h index efd90dd..2259f9d 100644 --- a/quiche/quic/moqt/moqt_parser.h +++ b/quiche/quic/moqt/moqt_parser.h
@@ -61,8 +61,7 @@ virtual void OnFetchCancelMessage(const MoqtFetchCancel& message) = 0; virtual void OnFetchOkMessage(const MoqtFetchOk& message) = 0; virtual void OnFetchErrorMessage(const MoqtFetchError& message) = 0; - virtual void OnSubscribesBlockedMessage( - const MoqtSubscribesBlocked& message) = 0; + virtual void OnRequestsBlockedMessage(const MoqtRequestsBlocked& message) = 0; virtual void OnObjectAckMessage(const MoqtObjectAck& message) = 0; virtual void OnParsingError(MoqtError code, absl::string_view reason) = 0; @@ -130,7 +129,7 @@ size_t ProcessFetchCancel(quic::QuicDataReader& reader); size_t ProcessFetchOk(quic::QuicDataReader& reader); size_t ProcessFetchError(quic::QuicDataReader& reader); - size_t ProcessSubscribesBlocked(quic::QuicDataReader& reader); + size_t ProcessRequestsBlocked(quic::QuicDataReader& reader); size_t ProcessObjectAck(quic::QuicDataReader& reader); // If |error| is not provided, assumes kProtocolViolation.
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc index d5b189c..eb54620 100644 --- a/quiche/quic/moqt/moqt_parser_test.cc +++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -57,7 +57,7 @@ MoqtMessageType::kFetchCancel, MoqtMessageType::kFetchOk, MoqtMessageType::kFetchError, - MoqtMessageType::kSubscribesBlocked, + MoqtMessageType::kRequestsBlocked, MoqtMessageType::kObjectAck, }; constexpr std::array kDataStreamTypes{ @@ -213,8 +213,7 @@ void OnFetchErrorMessage(const MoqtFetchError& message) override { OnControlMessage(message); } - void OnSubscribesBlockedMessage( - const MoqtSubscribesBlocked& message) override { + void OnRequestsBlockedMessage(const MoqtRequestsBlocked& message) override { OnControlMessage(message); } void OnObjectAckMessage(const MoqtObjectAck& message) override {
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc index 25a133b..2d7ab24 100644 --- a/quiche/quic/moqt/moqt_session.cc +++ b/quiche/quic/moqt/moqt_session.cc
@@ -586,13 +586,12 @@ std::optional<uint64_t> provided_track_alias) { // TODO(martinduke): support authorization info if (next_request_id_ >= peer_max_request_id_) { - if (!last_subscribes_blocked_sent_.has_value() || - peer_max_request_id_ > *last_subscribes_blocked_sent_) { - MoqtSubscribesBlocked subscribes_blocked; - subscribes_blocked.max_subscribe_id = peer_max_request_id_; - SendControlMessage( - framer_.SerializeSubscribesBlocked(subscribes_blocked)); - last_subscribes_blocked_sent_ = peer_max_request_id_; + if (!last_requests_blocked_sent_.has_value() || + peer_max_request_id_ > *last_requests_blocked_sent_) { + MoqtRequestsBlocked requests_blocked; + requests_blocked.max_request_id = peer_max_request_id_; + SendControlMessage(framer_.SerializeRequestsBlocked(requests_blocked)); + last_requests_blocked_sent_ = peer_max_request_id_; } QUIC_DLOG(INFO) << ENDPOINT << "Tried to send SUBSCRIBE with ID " << next_request_id_ @@ -1461,8 +1460,8 @@ session_->upstream_by_id_.erase(message.subscribe_id); } -void MoqtSession::ControlStream::OnSubscribesBlockedMessage( - const MoqtSubscribesBlocked& message) { +void MoqtSession::ControlStream::OnRequestsBlockedMessage( + const MoqtRequestsBlocked& message) { // TODO(martinduke): Derive logic for granting more subscribes. }
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h index 70a7b59..eceaa98 100644 --- a/quiche/quic/moqt/moqt_session.h +++ b/quiche/quic/moqt/moqt_session.h
@@ -245,8 +245,7 @@ void OnFetchCancelMessage(const MoqtFetchCancel& message) override {} void OnFetchOkMessage(const MoqtFetchOk& message) override; void OnFetchErrorMessage(const MoqtFetchError& message) override; - void OnSubscribesBlockedMessage( - const MoqtSubscribesBlocked& message) override; + void OnRequestsBlockedMessage(const MoqtRequestsBlocked& message) override; void OnObjectAckMessage(const MoqtObjectAck& message) override { auto subscription_it = session_->published_subscriptions_.find(message.subscribe_id); @@ -687,7 +686,7 @@ uint64_t next_request_id_ = 0; // The local endpoint can send subscribe IDs less than this value. uint64_t peer_max_request_id_ = 0; - std::optional<uint64_t> last_subscribes_blocked_sent_; + std::optional<uint64_t> last_requests_blocked_sent_; // All open incoming subscriptions, indexed by track name, used to check for // duplicates.
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc index bdfb51c..74e0f61 100644 --- a/quiche/quic/moqt/moqt_session_test.cc +++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -611,12 +611,12 @@ VersionSpecificParameters())); EXPECT_CALL( mock_stream_, - Writev(ControlMessageOfType(MoqtMessageType::kSubscribesBlocked), _)) + Writev(ControlMessageOfType(MoqtMessageType::kRequestsBlocked), _)) .Times(1); EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo2", "bar2"), &remote_track_visitor, VersionSpecificParameters())); - // Second time does not send SUBSCRIBES_BLOCKED. + // Second time does not send requests_blocked. EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo2", "bar2"), &remote_track_visitor, VersionSpecificParameters())); @@ -672,7 +672,7 @@ .WillRepeatedly(Return(&mock_stream_)); EXPECT_CALL( mock_stream_, - Writev(ControlMessageOfType(MoqtMessageType::kSubscribesBlocked), _)); + Writev(ControlMessageOfType(MoqtMessageType::kRequestsBlocked), _)); EXPECT_FALSE(session_.SubscribeCurrentObject(FullTrackName("foo", "bar"), &remote_track_visitor, VersionSpecificParameters()));
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc index 534ddf0..921da7b 100644 --- a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc +++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
@@ -101,8 +101,8 @@ MoqtMessageType operator()(const MoqtFetchError&) { return MoqtMessageType::kFetchError; } - MoqtMessageType operator()(const MoqtSubscribesBlocked&) { - return MoqtMessageType::kSubscribesBlocked; + MoqtMessageType operator()(const MoqtRequestsBlocked&) { + return MoqtMessageType::kRequestsBlocked; } MoqtMessageType operator()(const MoqtObjectAck&) { return MoqtMessageType::kObjectAck; @@ -185,8 +185,8 @@ quiche::QuicheBuffer operator()(const MoqtFetchError& message) { return framer.SerializeFetchError(message); } - quiche::QuicheBuffer operator()(const MoqtSubscribesBlocked& message) { - return framer.SerializeSubscribesBlocked(message); + quiche::QuicheBuffer operator()(const MoqtRequestsBlocked& message) { + return framer.SerializeRequestsBlocked(message); } quiche::QuicheBuffer operator()(const MoqtObjectAck& message) { return framer.SerializeObjectAck(message); @@ -274,7 +274,7 @@ void OnFetchErrorMessage(const MoqtFetchError& message) { frames_.push_back(message); } - void OnSubscribesBlockedMessage(const MoqtSubscribesBlocked& message) { + void OnRequestsBlockedMessage(const MoqtRequestsBlocked& message) { frames_.push_back(message); } void OnObjectAckMessage(const MoqtObjectAck& message) {
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.h b/quiche/quic/moqt/test_tools/moqt_framer_utils.h index 7b39419..8e704f0 100644 --- a/quiche/quic/moqt/test_tools/moqt_framer_utils.h +++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.h
@@ -33,7 +33,7 @@ MoqtGoAway, MoqtSubscribeAnnounces, MoqtSubscribeAnnouncesOk, MoqtSubscribeAnnouncesError, MoqtUnsubscribeAnnounces, MoqtMaxRequestId, MoqtFetch, MoqtFetchCancel, MoqtFetchOk, - MoqtFetchError, MoqtSubscribesBlocked, MoqtObjectAck>; + MoqtFetchError, MoqtRequestsBlocked, MoqtObjectAck>; MoqtMessageType MessageTypeForGenericMessage(const MoqtGenericFrame& frame);
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h index 3b14ad5..898bfd1 100644 --- a/quiche/quic/moqt/test_tools/moqt_test_message.h +++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -47,7 +47,7 @@ MoqtGoAway, MoqtSubscribeAnnounces, MoqtSubscribeAnnouncesOk, MoqtSubscribeAnnouncesError, MoqtUnsubscribeAnnounces, MoqtMaxRequestId, MoqtFetch, MoqtFetchCancel, MoqtFetchOk, - MoqtFetchError, MoqtSubscribesBlocked, MoqtObjectAck>; + MoqtFetchError, MoqtRequestsBlocked, MoqtObjectAck>; // The total actual size of the message. size_t total_message_size() const { return wire_image_size_; } @@ -1622,14 +1622,14 @@ }; }; -class QUICHE_NO_EXPORT SubscribesBlockedMessage : public TestMessageBase { +class QUICHE_NO_EXPORT RequestsBlockedMessage : public TestMessageBase { public: - SubscribesBlockedMessage() : TestMessageBase() { + RequestsBlockedMessage() : TestMessageBase() { SetWireImage(raw_packet_, sizeof(raw_packet_)); } bool EqualFieldValues(MessageStructuredData& values) const override { - auto cast = std::get<MoqtSubscribesBlocked>(values); - if (cast.max_subscribe_id != subscribes_blocked_.max_subscribe_id) { + auto cast = std::get<MoqtRequestsBlocked>(values); + if (cast.max_request_id != requests_blocked_.max_request_id) { QUIC_LOG(INFO) << "SUBSCRIBES_BLOCKED max_subscribe_id mismatch"; return false; } @@ -1639,17 +1639,17 @@ void ExpandVarints() override { ExpandVarintsImpl("vvv"); } MessageStructuredData structured_data() const override { - return TestMessageBase::MessageStructuredData(subscribes_blocked_); + return TestMessageBase::MessageStructuredData(requests_blocked_); } private: uint8_t raw_packet_[3] = { 0x1a, 0x01, - 0x0b, // max_subscribe_id = 11 + 0x0b, // max_request_id = 11 }; - MoqtSubscribesBlocked subscribes_blocked_ = { - /*max_subscribe_id=*/11, + MoqtRequestsBlocked requests_blocked_ = { + /*max_request_id=*/11, }; }; @@ -1751,8 +1751,8 @@ return std::make_unique<FetchOkMessage>(); case MoqtMessageType::kFetchError: return std::make_unique<FetchErrorMessage>(); - case MoqtMessageType::kSubscribesBlocked: - return std::make_unique<SubscribesBlockedMessage>(); + case MoqtMessageType::kRequestsBlocked: + return std::make_unique<RequestsBlockedMessage>(); case MoqtMessageType::kObjectAck: return std::make_unique<ObjectAckMessage>(); case MoqtMessageType::kClientSetup: