MoQT SUBSCRIBES_BLOCKED frame.

PiperOrigin-RevId: 722646372
diff --git a/quiche/quic/moqt/moqt_framer.cc b/quiche/quic/moqt/moqt_framer.cc
index e06183a..7e86e0d 100644
--- a/quiche/quic/moqt/moqt_framer.cc
+++ b/quiche/quic/moqt/moqt_framer.cc
@@ -680,6 +680,12 @@
       WireStringWithVarInt62Length(message.reason_phrase));
 }
 
+quiche::QuicheBuffer MoqtFramer::SerializeSubscribesBlocked(
+    const MoqtSubscribesBlocked& message) {
+  return SerializeControlMessage(MoqtMessageType::kSubscribesBlocked,
+                                 WireVarInt62(message.max_subscribe_id));
+}
+
 quiche::QuicheBuffer MoqtFramer::SerializeObjectAck(
     const MoqtObjectAck& message) {
   return SerializeControlMessage(
diff --git a/quiche/quic/moqt/moqt_framer.h b/quiche/quic/moqt/moqt_framer.h
index 6969ab4..9601175 100644
--- a/quiche/quic/moqt/moqt_framer.h
+++ b/quiche/quic/moqt/moqt_framer.h
@@ -70,6 +70,8 @@
   quiche::QuicheBuffer SerializeFetchCancel(const MoqtFetchCancel& message);
   quiche::QuicheBuffer SerializeFetchOk(const MoqtFetchOk& message);
   quiche::QuicheBuffer SerializeFetchError(const MoqtFetchError& message);
+  quiche::QuicheBuffer SerializeSubscribesBlocked(
+      const MoqtSubscribesBlocked& message);
   quiche::QuicheBuffer SerializeObjectAck(const MoqtObjectAck& message);
 
  private:
diff --git a/quiche/quic/moqt/moqt_framer_test.cc b/quiche/quic/moqt/moqt_framer_test.cc
index 1a17f77..d55fe4d 100644
--- a/quiche/quic/moqt/moqt_framer_test.cc
+++ b/quiche/quic/moqt/moqt_framer_test.cc
@@ -56,6 +56,7 @@
       MoqtMessageType::kFetchCancel,
       MoqtMessageType::kFetchOk,
       MoqtMessageType::kFetchError,
+      MoqtMessageType::kSubscribesBlocked,
       MoqtMessageType::kObjectAck,
       MoqtMessageType::kClientSetup,
       MoqtMessageType::kServerSetup,
@@ -205,6 +206,10 @@
         auto data = std::get<MoqtFetchError>(structured_data);
         return framer_.SerializeFetchError(data);
       }
+      case moqt::MoqtMessageType::kSubscribesBlocked: {
+        auto data = std::get<MoqtSubscribesBlocked>(structured_data);
+        return framer_.SerializeSubscribesBlocked(data);
+      }
       case moqt::MoqtMessageType::kObjectAck: {
         auto data = std::get<MoqtObjectAck>(structured_data);
         return framer_.SerializeObjectAck(data);
diff --git a/quiche/quic/moqt/moqt_messages.cc b/quiche/quic/moqt/moqt_messages.cc
index 7b535f2..44b37af 100644
--- a/quiche/quic/moqt/moqt_messages.cc
+++ b/quiche/quic/moqt/moqt_messages.cc
@@ -117,6 +117,8 @@
       return "FETCH_OK";
     case MoqtMessageType::kFetchError:
       return "FETCH_ERROR";
+    case MoqtMessageType::kSubscribesBlocked:
+      return "SUBSCRIBES_BLOCKED";
     case MoqtMessageType::kObjectAck:
       return "OBJECT_ACK";
   }
diff --git a/quiche/quic/moqt/moqt_messages.h b/quiche/quic/moqt/moqt_messages.h
index e022e28..d4722fc 100644
--- a/quiche/quic/moqt/moqt_messages.h
+++ b/quiche/quic/moqt/moqt_messages.h
@@ -102,6 +102,7 @@
   kFetchCancel = 0x17,
   kFetchOk = 0x18,
   kFetchError = 0x19,
+  kSubscribesBlocked = 0x1a,
   kClientSetup = 0x40,
   kServerSetup = 0x41,
 
@@ -568,6 +569,10 @@
   std::string reason_phrase;
 };
 
+struct QUICHE_EXPORT MoqtSubscribesBlocked {
+  uint64_t max_subscribe_id;
+};
+
 // All of the four values in this message are encoded as varints.
 // `delta_from_deadline` is encoded as an absolute value, with the lowest bit
 // indicating the sign (0 if positive).
diff --git a/quiche/quic/moqt/moqt_parser.cc b/quiche/quic/moqt/moqt_parser.cc
index d1ab1f2..66a8ad2 100644
--- a/quiche/quic/moqt/moqt_parser.cc
+++ b/quiche/quic/moqt/moqt_parser.cc
@@ -218,6 +218,9 @@
     case MoqtMessageType::kFetchError:
       bytes_read = ProcessFetchError(reader);
       break;
+    case MoqtMessageType::kSubscribesBlocked:
+      bytes_read = ProcessSubscribesBlocked(reader);
+      break;
     case moqt::MoqtMessageType::kObjectAck:
       bytes_read = ProcessObjectAck(reader);
       break;
@@ -796,6 +799,16 @@
   return reader.PreviouslyReadPayload().length();
 }
 
+size_t MoqtControlParser::ProcessSubscribesBlocked(
+    quic::QuicDataReader& reader) {
+  MoqtSubscribesBlocked subscribes_blocked;
+  if (!reader.ReadVarInt62(&subscribes_blocked.max_subscribe_id)) {
+    return 0;
+  }
+  visitor_.OnSubscribesBlockedMessage(subscribes_blocked);
+  return reader.PreviouslyReadPayload().length();
+}
+
 size_t MoqtControlParser::ProcessObjectAck(quic::QuicDataReader& reader) {
   MoqtObjectAck object_ack;
   uint64_t raw_delta;
diff --git a/quiche/quic/moqt/moqt_parser.h b/quiche/quic/moqt/moqt_parser.h
index b05fdcc..591350d 100644
--- a/quiche/quic/moqt/moqt_parser.h
+++ b/quiche/quic/moqt/moqt_parser.h
@@ -62,6 +62,8 @@
   virtual void OnFetchCancelMessage(const MoqtFetchCancel& message) = 0;
   virtual void OnFetchOkMessage(const MoqtFetchOk& message) = 0;
   virtual void OnFetchErrorMessage(const MoqtFetchError& message) = 0;
+  virtual void OnSubscribesBlockedMessage(
+      const MoqtSubscribesBlocked& message) = 0;
   virtual void OnObjectAckMessage(const MoqtObjectAck& message) = 0;
 
   virtual void OnParsingError(MoqtError code, absl::string_view reason) = 0;
@@ -135,6 +137,7 @@
   size_t ProcessFetchCancel(quic::QuicDataReader& reader);
   size_t ProcessFetchOk(quic::QuicDataReader& reader);
   size_t ProcessFetchError(quic::QuicDataReader& reader);
+  size_t ProcessSubscribesBlocked(quic::QuicDataReader& reader);
   size_t ProcessObjectAck(quic::QuicDataReader& reader);
 
   // If |error| is not provided, assumes kProtocolViolation.
diff --git a/quiche/quic/moqt/moqt_parser_test.cc b/quiche/quic/moqt/moqt_parser_test.cc
index dc2b0c2..c7f4fdd 100644
--- a/quiche/quic/moqt/moqt_parser_test.cc
+++ b/quiche/quic/moqt/moqt_parser_test.cc
@@ -57,6 +57,7 @@
     MoqtMessageType::kFetchCancel,
     MoqtMessageType::kFetchOk,
     MoqtMessageType::kFetchError,
+    MoqtMessageType::kSubscribesBlocked,
     MoqtMessageType::kObjectAck,
 };
 constexpr std::array kDataStreamTypes{
@@ -212,6 +213,10 @@
   void OnFetchErrorMessage(const MoqtFetchError& message) override {
     OnControlMessage(message);
   }
+  void OnSubscribesBlockedMessage(
+      const MoqtSubscribesBlocked& message) override {
+    OnControlMessage(message);
+  }
   void OnObjectAckMessage(const MoqtObjectAck& message) override {
     OnControlMessage(message);
   }
diff --git a/quiche/quic/moqt/moqt_session.cc b/quiche/quic/moqt/moqt_session.cc
index 6d5e4d7..91bd0dc 100644
--- a/quiche/quic/moqt/moqt_session.cc
+++ b/quiche/quic/moqt/moqt_session.cc
@@ -524,6 +524,14 @@
                             std::optional<uint64_t> provided_track_alias) {
   // TODO(martinduke): support authorization info
   if (next_subscribe_id_ >= peer_max_subscribe_id_) {
+    if (!last_subscribes_blocked_sent_.has_value() ||
+        peer_max_subscribe_id_ > *last_subscribes_blocked_sent_) {
+      MoqtSubscribesBlocked subscribes_blocked;
+      subscribes_blocked.max_subscribe_id = peer_max_subscribe_id_;
+      SendControlMessage(
+          framer_.SerializeSubscribesBlocked(subscribes_blocked));
+      last_subscribes_blocked_sent_ = peer_max_subscribe_id_;
+    }
     QUIC_DLOG(INFO) << ENDPOINT << "Tried to send SUBSCRIBE with ID "
                     << next_subscribe_id_
                     << " which is greater than the maximum ID "
@@ -1276,6 +1284,11 @@
   session_->upstream_by_id_.erase(message.subscribe_id);
 }
 
+void MoqtSession::ControlStream::OnSubscribesBlockedMessage(
+    const MoqtSubscribesBlocked& message) {
+  // TODO(martinduke): Derive logic for granting more subscribes.
+}
+
 void MoqtSession::ControlStream::OnParsingError(MoqtError error_code,
                                                 absl::string_view reason) {
   session_->Error(error_code, absl::StrCat("Parse error: ", reason));
diff --git a/quiche/quic/moqt/moqt_session.h b/quiche/quic/moqt/moqt_session.h
index be9800c..d185138 100644
--- a/quiche/quic/moqt/moqt_session.h
+++ b/quiche/quic/moqt/moqt_session.h
@@ -275,6 +275,8 @@
     void OnFetchCancelMessage(const MoqtFetchCancel& message) override {}
     void OnFetchOkMessage(const MoqtFetchOk& message) override;
     void OnFetchErrorMessage(const MoqtFetchError& message) override;
+    void OnSubscribesBlockedMessage(
+        const MoqtSubscribesBlocked& message) override;
     void OnObjectAckMessage(const MoqtObjectAck& message) override {
       auto subscription_it =
           session_->published_subscriptions_.find(message.subscribe_id);
@@ -632,6 +634,7 @@
   uint64_t next_subscribe_id_ = 0;
   // The local endpoint can send subscribe IDs less than this value.
   uint64_t peer_max_subscribe_id_ = 0;
+  std::optional<uint64_t> last_subscribes_blocked_sent_;
 
   // All open incoming subscriptions, indexed by track name, used to check for
   // duplicates.
diff --git a/quiche/quic/moqt/moqt_session_test.cc b/quiche/quic/moqt/moqt_session_test.cc
index 5230311..21cb88f 100644
--- a/quiche/quic/moqt/moqt_session_test.cc
+++ b/quiche/quic/moqt/moqt_session_test.cc
@@ -513,11 +513,19 @@
   webtransport::test::MockStream mock_stream;
   std::unique_ptr<MoqtControlParserVisitor> stream_input =
       MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
-  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream));
+  EXPECT_CALL(mock_session_, GetStreamById(_))
+      .WillRepeatedly(Return(&mock_stream));
   EXPECT_CALL(mock_stream,
               Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
   EXPECT_TRUE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
                                              &remote_track_visitor));
+  EXPECT_CALL(
+      mock_stream,
+      Writev(ControlMessageOfType(MoqtMessageType::kSubscribesBlocked), _))
+      .Times(1);
+  EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo2", "bar2"),
+                                              &remote_track_visitor));
+  // Second time does not send SUBSCRIBES_BLOCKED.
   EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo2", "bar2"),
                                               &remote_track_visitor));
 }
@@ -566,16 +574,21 @@
   MoqtSessionPeer::set_next_subscribe_id(&session_,
                                          kDefaultInitialMaxSubscribeId);
   MockSubscribeRemoteTrackVisitor remote_track_visitor;
+  webtransport::test::MockStream mock_stream;
+  std::unique_ptr<MoqtControlParserVisitor> stream_input =
+      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
+  EXPECT_CALL(mock_session_, GetStreamById(_))
+      .WillRepeatedly(Return(&mock_stream));
+  EXPECT_CALL(
+      mock_stream,
+      Writev(ControlMessageOfType(MoqtMessageType::kSubscribesBlocked), _));
   EXPECT_FALSE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
                                               &remote_track_visitor));
   MoqtMaxSubscribeId max_subscribe_id = {
       /*max_subscribe_id=*/kDefaultInitialMaxSubscribeId + 1,
   };
-  webtransport::test::MockStream mock_stream;
-  std::unique_ptr<MoqtControlParserVisitor> stream_input =
-      MoqtSessionPeer::CreateControlStream(&session_, &mock_stream);
   stream_input->OnMaxSubscribeIdMessage(max_subscribe_id);
-  EXPECT_CALL(mock_session_, GetStreamById(_)).WillOnce(Return(&mock_stream));
+
   EXPECT_CALL(mock_stream,
               Writev(ControlMessageOfType(MoqtMessageType::kSubscribe), _));
   EXPECT_TRUE(session_.SubscribeCurrentGroup(FullTrackName("foo", "bar"),
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
index 8005d4e..e991830 100644
--- a/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
+++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.cc
@@ -100,6 +100,9 @@
   MoqtMessageType operator()(const MoqtFetchError&) {
     return MoqtMessageType::kFetchError;
   }
+  MoqtMessageType operator()(const MoqtSubscribesBlocked&) {
+    return MoqtMessageType::kSubscribesBlocked;
+  }
   MoqtMessageType operator()(const MoqtObjectAck&) {
     return MoqtMessageType::kObjectAck;
   }
@@ -181,6 +184,9 @@
   quiche::QuicheBuffer operator()(const MoqtFetchError& message) {
     return framer.SerializeFetchError(message);
   }
+  quiche::QuicheBuffer operator()(const MoqtSubscribesBlocked& message) {
+    return framer.SerializeSubscribesBlocked(message);
+  }
   quiche::QuicheBuffer operator()(const MoqtObjectAck& message) {
     return framer.SerializeObjectAck(message);
   }
@@ -267,6 +273,9 @@
   void OnFetchErrorMessage(const MoqtFetchError& message) {
     frames_.push_back(message);
   }
+  void OnSubscribesBlockedMessage(const MoqtSubscribesBlocked& message) {
+    frames_.push_back(message);
+  }
   void OnObjectAckMessage(const MoqtObjectAck& message) {
     frames_.push_back(message);
   }
diff --git a/quiche/quic/moqt/test_tools/moqt_framer_utils.h b/quiche/quic/moqt/test_tools/moqt_framer_utils.h
index efbe5ca..944e50b 100644
--- a/quiche/quic/moqt/test_tools/moqt_framer_utils.h
+++ b/quiche/quic/moqt/test_tools/moqt_framer_utils.h
@@ -24,14 +24,16 @@
 
 // TODO: remove MoqtObject from TestMessageBase::MessageStructuredData and merge
 // those two types.
-using MoqtGenericFrame = absl::variant<
-    MoqtClientSetup, MoqtServerSetup, MoqtSubscribe, MoqtSubscribeOk,
-    MoqtSubscribeError, MoqtUnsubscribe, MoqtSubscribeDone, MoqtSubscribeUpdate,
-    MoqtAnnounce, MoqtAnnounceOk, MoqtAnnounceError, MoqtAnnounceCancel,
-    MoqtTrackStatusRequest, MoqtUnannounce, MoqtTrackStatus, MoqtGoAway,
-    MoqtSubscribeAnnounces, MoqtSubscribeAnnouncesOk,
-    MoqtSubscribeAnnouncesError, MoqtUnsubscribeAnnounces, MoqtMaxSubscribeId,
-    MoqtFetch, MoqtFetchCancel, MoqtFetchOk, MoqtFetchError, MoqtObjectAck>;
+using MoqtGenericFrame =
+    absl::variant<MoqtClientSetup, MoqtServerSetup, MoqtSubscribe,
+                  MoqtSubscribeOk, MoqtSubscribeError, MoqtUnsubscribe,
+                  MoqtSubscribeDone, MoqtSubscribeUpdate, MoqtAnnounce,
+                  MoqtAnnounceOk, MoqtAnnounceError, MoqtAnnounceCancel,
+                  MoqtTrackStatusRequest, MoqtUnannounce, MoqtTrackStatus,
+                  MoqtGoAway, MoqtSubscribeAnnounces, MoqtSubscribeAnnouncesOk,
+                  MoqtSubscribeAnnouncesError, MoqtUnsubscribeAnnounces,
+                  MoqtMaxSubscribeId, MoqtFetch, MoqtFetchCancel, MoqtFetchOk,
+                  MoqtFetchError, MoqtSubscribesBlocked, MoqtObjectAck>;
 
 MoqtMessageType MessageTypeForGenericMessage(const MoqtGenericFrame& frame);
 
diff --git a/quiche/quic/moqt/test_tools/moqt_test_message.h b/quiche/quic/moqt/test_tools/moqt_test_message.h
index 64732c4..1b8ad42 100644
--- a/quiche/quic/moqt/test_tools/moqt_test_message.h
+++ b/quiche/quic/moqt/test_tools/moqt_test_message.h
@@ -40,7 +40,7 @@
       MoqtTrackStatus, MoqtGoAway, MoqtSubscribeAnnounces,
       MoqtSubscribeAnnouncesOk, MoqtSubscribeAnnouncesError,
       MoqtUnsubscribeAnnounces, MoqtMaxSubscribeId, MoqtFetch, MoqtFetchCancel,
-      MoqtFetchOk, MoqtFetchError, MoqtObjectAck>;
+      MoqtFetchOk, MoqtFetchError, MoqtSubscribesBlocked, MoqtObjectAck>;
 
   // The total actual size of the message.
   size_t total_message_size() const { return wire_image_size_; }
@@ -1491,6 +1491,37 @@
   };
 };
 
+class QUICHE_NO_EXPORT SubscribesBlockedMessage : public TestMessageBase {
+ public:
+  SubscribesBlockedMessage() : TestMessageBase() {
+    SetWireImage(raw_packet_, sizeof(raw_packet_));
+  }
+  bool EqualFieldValues(MessageStructuredData& values) const override {
+    auto cast = std::get<MoqtSubscribesBlocked>(values);
+    if (cast.max_subscribe_id != subscribes_blocked_.max_subscribe_id) {
+      QUIC_LOG(INFO) << "SUBSCRIBES_BLOCKED max_subscribe_id mismatch";
+      return false;
+    }
+    return true;
+  }
+
+  void ExpandVarints() override { ExpandVarintsImpl("vvv"); }
+
+  MessageStructuredData structured_data() const override {
+    return TestMessageBase::MessageStructuredData(subscribes_blocked_);
+  }
+
+ private:
+  uint8_t raw_packet_[3] = {
+      0x1a, 0x01,
+      0x0b,  // max_subscribe_id = 11
+  };
+
+  MoqtSubscribesBlocked subscribes_blocked_ = {
+      /*max_subscribe_id=*/11,
+  };
+};
+
 class QUICHE_NO_EXPORT ObjectAckMessage : public TestMessageBase {
  public:
   ObjectAckMessage() : TestMessageBase() {
@@ -1589,6 +1620,8 @@
       return std::make_unique<FetchOkMessage>();
     case MoqtMessageType::kFetchError:
       return std::make_unique<FetchErrorMessage>();
+    case MoqtMessageType::kSubscribesBlocked:
+      return std::make_unique<SubscribesBlockedMessage>();
     case MoqtMessageType::kObjectAck:
       return std::make_unique<ObjectAckMessage>();
     case MoqtMessageType::kClientSetup: