MoQT SUBSCRIBES_BLOCKED frame.
PiperOrigin-RevId: 722646372
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index e06183a..7e86e0d 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -680,6 +680,12 @@
WireStringWithVarInt62Length(message.reason_phrase));
}
+quiche::QuicheBuffer MoqtFramer::SerializeSubscribesBlocked(
+ const MoqtSubscribesBlocked& message) {
+ return SerializeControlMessage(MoqtMessageType::kSubscribesBlocked,
+ WireVarInt62(message.max_subscribe_id));
+}
+
quiche::QuicheBuffer MoqtFramer::SerializeObjectAck(
const MoqtObjectAck& message) {
return SerializeControlMessage(
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h
index 6969ab4..9601175 100644
--- a/quiche/quic/moqt/moqt_framer.h
+++ b/quiche/quic/moqt/moqt_framer.h
@@ -70,6 +70,8 @@
quiche::QuicheBuffer SerializeFetchCancel(const MoqtFetchCancel& message);
quiche::QuicheBuffer SerializeFetchOk(const MoqtFetchOk& message);
quiche::QuicheBuffer SerializeFetchError(const MoqtFetchError& message);
+ quiche::QuicheBuffer SerializeSubscribesBlocked(
+ const MoqtSubscribesBlocked& message);
quiche::QuicheBuffer SerializeObjectAck(const MoqtObjectAck& message);
private:
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index 1a17f77..d55fe4d 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -56,6 +56,7 @@
MoqtMessageType::kFetchCancel,
MoqtMessageType::kFetchOk,
MoqtMessageType::kFetchError,
+ MoqtMessageType::kSubscribesBlocked,
MoqtMessageType::kObjectAck,
MoqtMessageType::kClientSetup,
MoqtMessageType::kServerSetup,
@@ -205,6 +206,10 @@
auto data = std::get<MoqtFetchError>(structured_data);
return framer_.SerializeFetchError(data);
}
+ case moqt::MoqtMessageType::kSubscribesBlocked: {
+ auto data = std::get<MoqtSubscribesBlocked>(structured_data);
+ return framer_.SerializeSubscribesBlocked(data);
+ }
case moqt::MoqtMessageType::kObjectAck: {
auto data = std::get<MoqtObjectAck>(structured_data);
return framer_.SerializeObjectAck(data);
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index 7b535f2..44b37af 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -117,6 +117,8 @@
return "FETCH_OK";
case MoqtMessageType::kFetchError:
return "FETCH_ERROR";
+ case MoqtMessageType::kSubscribesBlocked:
+ return "SUBSCRIBES_BLOCKED";
case MoqtMessageType::kObjectAck:
return "OBJECT_ACK";
}
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index e022e28..d4722fc 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -102,6 +102,7 @@
kFetchCancel = 0x17,
kFetchOk = 0x18,
kFetchError = 0x19,
+ kSubscribesBlocked = 0x1a,
kClientSetup = 0x40,
kServerSetup = 0x41,
@@ -568,6 +569,10 @@
std::string reason_phrase;
};
+struct QUICHE_EXPORT MoqtSubscribesBlocked {
+ uint64_t max_subscribe_id;
+};
+
// All of the four values in this message are encoded as varints.
// `delta_from_deadline` is encoded as an absolute value, with the lowest bit
// indicating the sign (0 if positive).
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index d1ab1f2..66a8ad2 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -218,6 +218,9 @@
case MoqtMessageType::kFetchError:
bytes_read = ProcessFetchError(reader);
break;
+ case MoqtMessageType::kSubscribesBlocked:
+ bytes_read = ProcessSubscribesBlocked(reader);
+ break;
case moqt::MoqtMessageType::kObjectAck:
bytes_read = ProcessObjectAck(reader);
break;
@@ -796,6 +799,16 @@
return reader.PreviouslyReadPayload().length();
}
+size_t MoqtControlParser::ProcessSubscribesBlocked(
+ quic::QuicDataReader& reader) {
+ MoqtSubscribesBlocked subscribes_blocked;
+ if (!reader.ReadVarInt62(&subscribes_blocked.max_subscribe_id)) {
+ return 0;
+ }
+ visitor_.OnSubscribesBlockedMessage(subscribes_blocked);
+ return reader.PreviouslyReadPayload().length();
+}
+
size_t MoqtControlParser::ProcessObjectAck(quic::QuicDataReader& reader) {
MoqtObjectAck object_ack;
uint64_t raw_delta;
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index b05fdcc..591350d 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -62,6 +62,8 @@
virtual void OnFetchCancelMessage(const MoqtFetchCancel& message) = 0;
virtual void OnFetchOkMessage(const MoqtFetchOk& message) = 0;
virtual void OnFetchErrorMessage(const MoqtFetchError& message) = 0;
+ virtual void OnSubscribesBlockedMessage(
+ const MoqtSubscribesBlocked& message) = 0;
virtual void OnObjectAckMessage(const MoqtObjectAck& message) = 0;
virtual void OnParsingError(MoqtError code, absl::string_view reason) = 0;
@@ -135,6 +137,7 @@
size_t ProcessFetchCancel(quic::QuicDataReader& reader);
size_t ProcessFetchOk(quic::QuicDataReader& reader);
size_t ProcessFetchError(quic::QuicDataReader& reader);
+ size_t ProcessSubscribesBlocked(quic::QuicDataReader& reader);
size_t ProcessObjectAck(quic::QuicDataReader& reader);
// If |error| is not provided, assumes kProtocolViolation.
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index dc2b0c2..c7f4fdd 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -57,6 +57,7 @@
MoqtMessageType::kFetchCancel,
MoqtMessageType::kFetchOk,
MoqtMessageType::kFetchError,
+ MoqtMessageType::kSubscribesBlocked,
MoqtMessageType::kObjectAck,
};
constexpr std::array kDataStreamTypes{
@@ -212,6 +213,10 @@
void OnFetchErrorMessage(const MoqtFetchError& message) override {
OnControlMessage(message);
}
+ void OnSubscribesBlockedMessage(
+ const MoqtSubscribesBlocked& message) override {
+ OnControlMessage(message);
+ }
void OnObjectAckMessage(const MoqtObjectAck& message) override {
OnControlMessage(message);
}
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 6d5e4d7..91bd0dc 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -524,6 +524,14 @@
std::optional<uint64_t> provided_track_alias) {
// TODO(martinduke): support authorization info
if (next_subscribe_id_ >= peer_max_subscribe_id_) {
+ if (!last_subscribes_blocked_sent_.has_value() ||
+ peer_max_subscribe_id_ > *last_subscribes_blocked_sent_) {
+ MoqtSubscribesBlocked subscribes_blocked;
+ subscribes_blocked.max_subscribe_id = peer_max_subscribe_id_;
+ SendControlMessage(
+ framer_.SerializeSubscribesBlocked(subscribes_blocked));
+ last_subscribes_blocked_sent_ = peer_max_subscribe_id_;
+ }
QUIC_DLOG(INFO) << ENDPOINT << "Tried to send SUBSCRIBE with ID "
<< next_subscribe_id_
<< " which is greater than the maximum ID "
@@ -1276,6 +1284,11 @@
session_->upstream_by_id_.erase(message.subscribe_id);
}
+void MoqtSession::ControlStream::OnSubscribesBlockedMessage(
+ const MoqtSubscribesBlocked& message) {
+ // TODO(martinduke): Derive logic for granting more subscribes.
+}
+
void MoqtSession::ControlStream::OnParsingError(MoqtError error_code,
absl::string_view reason) {
session_->Error(error_code, absl::StrCat("Parse error: ", reason));
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index be9800c..d185138 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -275,6 +275,8 @@
void OnFetchCancelMessage(const MoqtFetchCancel& message) override {}
void OnFetchOkMessage(const MoqtFetchOk& message) override;
void OnFetchErrorMessage(const MoqtFetchError& message) override;
+ void OnSubscribesBlockedMessage(
+ const MoqtSubscribesBlocked& message) override;
void OnObjectAckMessage(const MoqtObjectAck& message) override {
auto subscription_it =
session_->published_subscriptions_.find(message.subscribe_id);
@@ -632,6 +634,7 @@
uint64_t next_subscribe_id_ = 0;
// The local endpoint can send subscribe IDs less than this value.
uint64_t peer_max_subscribe_id_ = 0;
+ std::optional<uint64_t> last_subscribes_blocked_sent_;
// All open incoming subscriptions, indexed by track name, used to check for
// duplicates.
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 5230311..21cb88f 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -513,11 +513,19 @@
webtransport::test::MockStream mock_stream;
std::unique_ptr<MoqtControlParserVisitor> stream_input =
MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
- EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream));
+ EXPECT_CALL(mock_session_, GetStreamById(_))
+ .WillRepeatedly(Return(&mock_stream));
EXPECT_CALL(mock_stream,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
EXPECT_TRUE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
&remote_track_visitor));
+ EXPECT_CALL(
+ mock_stream,
+ Writev(ControlMessageOfType(MoqtMessageType::kSubscribesBlocked), _))
+ .Times(1);
+ EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo2", "bar2"),
+ &remote_track_visitor));
+ // Second time does not send SUBSCRIBES_BLOCKED.
EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo2", "bar2"),
&remote_track_visitor));
}
@@ -566,16 +574,21 @@
MoqtSessionPeer::set_next_subscribe_id(&session_,
kDefaultInitialMaxSubscribeId);
MockSubscribeRemoteTrackVisitor remote_track_visitor;
+ webtransport::test::MockStream mock_stream;
+ std::unique_ptr<MoqtControlParserVisitor> stream_input =
+ MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+ EXPECT_CALL(mock_session_, GetStreamById(_))
+ .WillRepeatedly(Return(&mock_stream));
+ EXPECT_CALL(
+ mock_stream,
+ Writev(ControlMessageOfType(MoqtMessageType::kSubscribesBlocked), _));
EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
&remote_track_visitor));
MoqtMaxSubscribeId max_subscribe_id = {
/*max_subscribe_id=*/kDefaultInitialMaxSubscribeId + 1,
};
- webtransport::test::MockStream mock_stream;
- std::unique_ptr<MoqtControlParserVisitor> stream_input =
- MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
stream_input->OnMaxSubscribeIdMessage(max_subscribe_id);
- EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream));
+
EXPECT_CALL(mock_stream,
Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
EXPECT_TRUE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
index 8005d4e..e991830 100644
--- a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
+++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
@@ -100,6 +100,9 @@
MoqtMessageType operator()(const MoqtFetchError&) {
return MoqtMessageType::kFetchError;
}
+ MoqtMessageType operator()(const MoqtSubscribesBlocked&) {
+ return MoqtMessageType::kSubscribesBlocked;
+ }
MoqtMessageType operator()(const MoqtObjectAck&) {
return MoqtMessageType::kObjectAck;
}
@@ -181,6 +184,9 @@
quiche::QuicheBuffer operator()(const MoqtFetchError& message) {
return framer.SerializeFetchError(message);
}
+ quiche::QuicheBuffer operator()(const MoqtSubscribesBlocked& message) {
+ return framer.SerializeSubscribesBlocked(message);
+ }
quiche::QuicheBuffer operator()(const MoqtObjectAck& message) {
return framer.SerializeObjectAck(message);
}
@@ -267,6 +273,9 @@
void OnFetchErrorMessage(const MoqtFetchError& message) {
frames_.push_back(message);
}
+ void OnSubscribesBlockedMessage(const MoqtSubscribesBlocked& message) {
+ frames_.push_back(message);
+ }
void OnObjectAckMessage(const MoqtObjectAck& message) {
frames_.push_back(message);
}
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.h b/quiche/quic/moqt/test_tools/moqt_framer_utils.h
index efbe5ca..944e50b 100644
--- a/quiche/quic/moqt/test_tools/moqt_framer_utils.h
+++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.h
@@ -24,14 +24,16 @@
// TODO: remove MoqtObject from TestMessageBase::MessageStructuredData and merge
// those two types.
-using MoqtGenericFrame = absl::variant<
- MoqtClientSetup, MoqtServerSetup, MoqtSubscribe, MoqtSubscribeOk,
- MoqtSubscribeError, MoqtUnsubscribe, MoqtSubscribeDone, MoqtSubscribeUpdate,
- MoqtAnnounce, MoqtAnnounceOk, MoqtAnnounceError, MoqtAnnounceCancel,
- MoqtTrackStatusRequest, MoqtUnannounce, MoqtTrackStatus, MoqtGoAway,
- MoqtSubscribeAnnounces, MoqtSubscribeAnnouncesOk,
- MoqtSubscribeAnnouncesError, MoqtUnsubscribeAnnounces, MoqtMaxSubscribeId,
- MoqtFetch, MoqtFetchCancel, MoqtFetchOk, MoqtFetchError, MoqtObjectAck>;
+using MoqtGenericFrame =
+ absl::variant<MoqtClientSetup, MoqtServerSetup, MoqtSubscribe,
+ MoqtSubscribeOk, MoqtSubscribeError, MoqtUnsubscribe,
+ MoqtSubscribeDone, MoqtSubscribeUpdate, MoqtAnnounce,
+ MoqtAnnounceOk, MoqtAnnounceError, MoqtAnnounceCancel,
+ MoqtTrackStatusRequest, MoqtUnannounce, MoqtTrackStatus,
+ MoqtGoAway, MoqtSubscribeAnnounces, MoqtSubscribeAnnouncesOk,
+ MoqtSubscribeAnnouncesError, MoqtUnsubscribeAnnounces,
+ MoqtMaxSubscribeId, MoqtFetch, MoqtFetchCancel, MoqtFetchOk,
+ MoqtFetchError, MoqtSubscribesBlocked, MoqtObjectAck>;
MoqtMessageType MessageTypeForGenericMessage(const MoqtGenericFrame& frame);
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 64732c4..1b8ad42 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -40,7 +40,7 @@
MoqtTrackStatus, MoqtGoAway, MoqtSubscribeAnnounces,
MoqtSubscribeAnnouncesOk, MoqtSubscribeAnnouncesError,
MoqtUnsubscribeAnnounces, MoqtMaxSubscribeId, MoqtFetch, MoqtFetchCancel,
- MoqtFetchOk, MoqtFetchError, MoqtObjectAck>;
+ MoqtFetchOk, MoqtFetchError, MoqtSubscribesBlocked, MoqtObjectAck>;
// The total actual size of the message.
size_t total_message_size() const { return wire_image_size_; }
@@ -1491,6 +1491,37 @@
};
};
+class QUICHE_NO_EXPORT SubscribesBlockedMessage : public TestMessageBase {
+ public:
+ SubscribesBlockedMessage() : TestMessageBase() {
+ SetWireImage(raw_packet_, sizeof(raw_packet_));
+ }
+ bool EqualFieldValues(MessageStructuredData& values) const override {
+ auto cast = std::get<MoqtSubscribesBlocked>(values);
+ if (cast.max_subscribe_id != subscribes_blocked_.max_subscribe_id) {
+ QUIC_LOG(INFO) << "SUBSCRIBES_BLOCKED max_subscribe_id mismatch";
+ return false;
+ }
+ return true;
+ }
+
+ void ExpandVarints() override { ExpandVarintsImpl("vvv"); }
+
+ MessageStructuredData structured_data() const override {
+ return TestMessageBase::MessageStructuredData(subscribes_blocked_);
+ }
+
+ private:
+ uint8_t raw_packet_[3] = {
+ 0x1a, 0x01,
+ 0x0b, // max_subscribe_id = 11
+ };
+
+ MoqtSubscribesBlocked subscribes_blocked_ = {
+ /*max_subscribe_id=*/11,
+ };
+};
+
class QUICHE_NO_EXPORT ObjectAckMessage : public TestMessageBase {
public:
ObjectAckMessage() : TestMessageBase() {
@@ -1589,6 +1620,8 @@
return std::make_unique<FetchOkMessage>();
case MoqtMessageType::kFetchError:
return std::make_unique<FetchErrorMessage>();
+ case MoqtMessageType::kSubscribesBlocked:
+ return std::make_unique<SubscribesBlockedMessage>();
case MoqtMessageType::kObjectAck:
return std::make_unique<ObjectAckMessage>();
case MoqtMessageType::kClientSetup: