Optimize QpackBlockingManager for CPU efficiency.

This CL optimizes QpackBlockingManager for CPU efficiency by:
1) Storing required insert count for each header block.
2) Storing blocked streams in a map.

Protected by FLAGS_quic_reloadable_flag_quic_optimize_qpack_blocking_manager.

PiperOrigin-RevId: 685776335
diff --git a/quiche/common/quiche_feature_flags_list.h b/quiche/common/quiche_feature_flags_list.h
index 2235d21..2f2946f 100755
--- a/quiche/common/quiche_feature_flags_list.h
+++ b/quiche/common/quiche_feature_flags_list.h
@@ -43,6 +43,7 @@
 QUICHE_FLAG(bool, quiche_reloadable_flag_quic_no_path_degrading_before_handshake_confirmed, true, true, "If true, an endpoint does not detect path degrading or blackholing until handshake gets confirmed.")
 QUICHE_FLAG(bool, quiche_reloadable_flag_quic_no_write_control_frame_upon_connection_close, false, true, "If trrue, early return before write control frame in OnCanWrite() if the connection is already closed.")
 QUICHE_FLAG(bool, quiche_reloadable_flag_quic_no_write_control_frame_upon_connection_close2, false, false, "If true, QuicSession will block outgoing control frames when the connection is closed.")
+QUICHE_FLAG(bool, quiche_reloadable_flag_quic_optimize_qpack_blocking_manager, false, false, "If true, optimize qpack_blocking_manager for CPU efficiency.")
 QUICHE_FLAG(bool, quiche_reloadable_flag_quic_pacing_remove_non_initial_burst, false, false, "If true, remove the non-initial burst in QUIC PacingSender.")
 QUICHE_FLAG(bool, quiche_reloadable_flag_quic_parse_cert_compression_algos_from_chlo, true, true, "If true, parse offered cert compression algorithms from received CHLOs.")
 QUICHE_FLAG(bool, quiche_reloadable_flag_quic_priority_respect_incremental, false, false, "If true, respect the incremental parameter of each stream in QuicWriteBlockedList.")
diff --git a/quiche/quic/core/qpack/qpack_blocking_manager.cc b/quiche/quic/core/qpack/qpack_blocking_manager.cc
index 17b6e9f..73589c8 100644
--- a/quiche/quic/core/qpack/qpack_blocking_manager.cc
+++ b/quiche/quic/core/qpack/qpack_blocking_manager.cc
@@ -7,6 +7,9 @@
 #include <limits>
 #include <utility>
 
+#include "quiche/quic/platform/api/quic_flag_utils.h"
+#include "quiche/quic/platform/api/quic_flags.h"
+
 namespace quic {
 
 QpackBlockingManager::QpackBlockingManager() : known_received_count_(0) {}
@@ -19,15 +22,17 @@
 
   QUICHE_DCHECK(!it->second.empty());
 
-  const IndexSet& indices = it->second.front();
-  QUICHE_DCHECK(!indices.empty());
+  const HeaderBlock& header_block = it->second.front();
+  QUICHE_DCHECK(!header_block.indices.empty());
 
-  const uint64_t required_index_count = RequiredInsertCount(indices);
-  if (known_received_count_ < required_index_count) {
-    known_received_count_ = required_index_count;
+  if (known_received_count_ < header_block.required_insert_count) {
+    known_received_count_ = header_block.required_insert_count;
+    if (optimize_qpack_blocking_manager_) {
+      OnKnownReceivedCountIncreased();
+    }
   }
 
-  DecreaseReferenceCounts(indices);
+  DecreaseReferenceCounts(header_block.indices);
 
   it->second.pop_front();
   if (it->second.empty()) {
@@ -43,11 +48,15 @@
     return;
   }
 
-  for (const IndexSet& indices : it->second) {
-    DecreaseReferenceCounts(indices);
+  for (const HeaderBlock& header_block : it->second) {
+    DecreaseReferenceCounts(header_block.indices);
   }
 
   header_blocks_.erase(it);
+  if (optimize_qpack_blocking_manager_) {
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_optimize_qpack_blocking_manager, 1, 5);
+    blocked_streams_.erase(stream_id);
+  }
 }
 
 bool QpackBlockingManager::OnInsertCountIncrement(uint64_t increment) {
@@ -57,19 +66,43 @@
   }
 
   known_received_count_ += increment;
+  if (optimize_qpack_blocking_manager_) {
+    OnKnownReceivedCountIncreased();
+  }
   return true;
 }
 
 void QpackBlockingManager::OnHeaderBlockSent(QuicStreamId stream_id,
-                                             IndexSet indices) {
+                                             IndexSet indices,
+                                             uint64_t required_insert_count) {
   QUICHE_DCHECK(!indices.empty());
 
   IncreaseReferenceCounts(indices);
-  header_blocks_[stream_id].push_back(std::move(indices));
+  header_blocks_[stream_id].push_back(
+      {std::move(indices), required_insert_count});
+  if (optimize_qpack_blocking_manager_ &&
+      required_insert_count > known_received_count_) {
+    auto it = blocked_streams_.find(stream_id);
+    if (it != blocked_streams_.end()) {
+      QUIC_RELOADABLE_FLAG_COUNT_N(quic_optimize_qpack_blocking_manager, 2, 5);
+      it->second = std::max(it->second, required_insert_count);
+    } else {
+      QUIC_RELOADABLE_FLAG_COUNT_N(quic_optimize_qpack_blocking_manager, 3, 5);
+      blocked_streams_[stream_id] = required_insert_count;
+    }
+  }
 }
 
 bool QpackBlockingManager::blocking_allowed_on_stream(
     QuicStreamId stream_id, uint64_t maximum_blocked_streams) const {
+  if (optimize_qpack_blocking_manager_) {
+    // Sending blocked reference is allowed if:
+    // 1) Stream |stream_id| is already blocked, or
+    // 2) The number of blocked streams is less than the limit.
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_optimize_qpack_blocking_manager, 4, 5);
+    return blocked_streams_.contains(stream_id) ||
+           blocked_streams_.size() < maximum_blocked_streams;
+  }
   // This should be the most common case: the limit is larger than the number of
   // streams that have unacknowledged header blocks (regardless of whether they
   // are blocked or not) plus one for stream |stream_id|.
@@ -84,8 +117,8 @@
 
   uint64_t blocked_stream_count = 0;
   for (const auto& header_blocks_for_stream : header_blocks_) {
-    for (const IndexSet& indices : header_blocks_for_stream.second) {
-      if (RequiredInsertCount(indices) > known_received_count_) {
+    for (const HeaderBlock& header_block : header_blocks_for_stream.second) {
+      if (header_block.required_insert_count > known_received_count_) {
         if (header_blocks_for_stream.first == stream_id) {
           // Sending blocking references is allowed if stream |stream_id| is
           // already blocked.
@@ -155,4 +188,17 @@
   }
 }
 
+void QpackBlockingManager::OnKnownReceivedCountIncreased() {
+  QUICHE_DCHECK(optimize_qpack_blocking_manager_);
+  for (auto blocked_it = blocked_streams_.begin();
+       blocked_it != blocked_streams_.end();) {
+    if (blocked_it->second > known_received_count_) {
+      ++blocked_it;
+      continue;
+    }
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_optimize_qpack_blocking_manager, 5, 5);
+    blocked_streams_.erase(blocked_it++);
+  }
+}
+
 }  // namespace quic
diff --git a/quiche/quic/core/qpack/qpack_blocking_manager.h b/quiche/quic/core/qpack/qpack_blocking_manager.h
index bad487e..8e9e316 100644
--- a/quiche/quic/core/qpack/qpack_blocking_manager.h
+++ b/quiche/quic/core/qpack/qpack_blocking_manager.h
@@ -48,7 +48,8 @@
 
   // Called when sending a header block containing references to dynamic table
   // entries with |indices|.  |indices| must not be empty.
-  void OnHeaderBlockSent(QuicStreamId stream_id, IndexSet indices);
+  void OnHeaderBlockSent(QuicStreamId stream_id, IndexSet indices,
+                         uint64_t required_insert_count);
 
   // Returns true if sending blocking references on stream |stream_id| would not
   // increase the total number of blocked streams above
@@ -68,6 +69,8 @@
   uint64_t known_received_count() const { return known_received_count_; }
 
   // Required Insert Count for set of indices.
+  // TODO(fayang): move this method to qpack_encoder once deprecating
+  // optimize_qpack_blocking_manager flag.
   static uint64_t RequiredInsertCount(const IndexSet& indices);
 
  private:
@@ -78,13 +81,20 @@
   // on a single stream, they might not be blocked at the same time. Use
   // std::list instead of quiche::QuicheCircularDeque because it has lower
   // memory footprint when holding few elements.
-  using HeaderBlocksForStream = std::list<IndexSet>;
-  using HeaderBlocks = absl::flat_hash_map<QuicStreamId, HeaderBlocksForStream>;
+  struct HeaderBlock {
+    IndexSet indices;
+    uint64_t required_insert_count = 0;
+  };
+  using HeaderBlocks =
+      absl::flat_hash_map<QuicStreamId, std::list<const HeaderBlock>>;
 
   // Increase or decrease the reference count for each index in |indices|.
   void IncreaseReferenceCounts(const IndexSet& indices);
   void DecreaseReferenceCounts(const IndexSet& indices);
 
+  // Called to cleanup blocked_streams_ when known_received_count is increased.
+  void OnKnownReceivedCountIncreased();
+
   // Multiset of indices in each header block for each stream.
   // Must not contain a stream id with an empty queue.
   HeaderBlocks header_blocks_;
@@ -93,6 +103,13 @@
   absl::btree_map<uint64_t, uint64_t> entry_reference_counts_;
 
   uint64_t known_received_count_;
+
+  // Mapping from blocked streams to their required insert count (>
+  // known_received_count_).
+  absl::flat_hash_map<QuicStreamId, uint64_t> blocked_streams_;
+
+  const bool optimize_qpack_blocking_manager_ =
+      GetQuicReloadableFlag(quic_optimize_qpack_blocking_manager);
 };
 
 }  // namespace quic
diff --git a/quiche/quic/core/qpack/qpack_blocking_manager_test.cc b/quiche/quic/core/qpack/qpack_blocking_manager_test.cc
index 670264e..8196146 100644
--- a/quiche/quic/core/qpack/qpack_blocking_manager_test.cc
+++ b/quiche/quic/core/qpack/qpack_blocking_manager_test.cc
@@ -19,14 +19,22 @@
       if (header_blocks_for_stream.first != stream_id) {
         continue;
       }
-      for (const auto& indices : header_blocks_for_stream.second) {
-        if (QpackBlockingManager::RequiredInsertCount(indices) >
+      for (const auto& header_block : header_blocks_for_stream.second) {
+        QUICHE_DCHECK_EQ(
+            header_block.required_insert_count,
+            QpackBlockingManager::RequiredInsertCount(header_block.indices));
+        if (header_block.required_insert_count >
             manager->known_received_count_) {
+          if (GetQuicReloadableFlag(quic_optimize_qpack_blocking_manager)) {
+            QUICHE_DCHECK(manager->blocked_streams_.contains(stream_id));
+          }
           return true;
         }
       }
     }
-
+    if (GetQuicReloadableFlag(quic_optimize_qpack_blocking_manager)) {
+      QUICHE_DCHECK(!manager->blocked_streams_.contains(stream_id));
+    }
     return false;
   }
 };
@@ -59,12 +67,12 @@
 
   // Stream 0 is not blocked, because it only references entries that are
   // already acknowledged by an Insert Count Increment instruction.
-  manager_.OnHeaderBlockSent(0, {1, 0});
+  manager_.OnHeaderBlockSent(0, {1, 0}, 2);
   EXPECT_FALSE(stream_is_blocked(0));
 }
 
 TEST_F(QpackBlockingManagerTest, UnblockedByInsertCountIncrement) {
-  manager_.OnHeaderBlockSent(0, {1, 0});
+  manager_.OnHeaderBlockSent(0, {1, 0}, 2);
   EXPECT_TRUE(stream_is_blocked(0));
 
   EXPECT_TRUE(manager_.OnInsertCountIncrement(2));
@@ -72,7 +80,7 @@
 }
 
 TEST_F(QpackBlockingManagerTest, NotBlockedByHeaderAcknowledgement) {
-  manager_.OnHeaderBlockSent(0, {2, 1, 1});
+  manager_.OnHeaderBlockSent(0, {2, 1, 1}, 3);
   EXPECT_TRUE(stream_is_blocked(0));
 
   EXPECT_TRUE(manager_.OnHeaderAcknowledgement(0));
@@ -80,13 +88,13 @@
 
   // Stream 1 is not blocked, because it only references entries that are
   // already acknowledged by a Header Acknowledgement instruction.
-  manager_.OnHeaderBlockSent(1, {2, 2});
+  manager_.OnHeaderBlockSent(1, {2, 2}, 3);
   EXPECT_FALSE(stream_is_blocked(1));
 }
 
 TEST_F(QpackBlockingManagerTest, UnblockedByHeaderAcknowledgement) {
-  manager_.OnHeaderBlockSent(0, {2, 1, 1});
-  manager_.OnHeaderBlockSent(1, {2, 2});
+  manager_.OnHeaderBlockSent(0, {2, 1, 1}, 3);
+  manager_.OnHeaderBlockSent(1, {2, 2}, 3);
   EXPECT_TRUE(stream_is_blocked(0));
   EXPECT_TRUE(stream_is_blocked(1));
 
@@ -99,17 +107,17 @@
   EXPECT_EQ(0u, manager_.known_received_count());
 
   // Sending a header block does not change Known Received Count.
-  manager_.OnHeaderBlockSent(0, {0});
+  manager_.OnHeaderBlockSent(0, {0}, 1);
   EXPECT_EQ(0u, manager_.known_received_count());
 
-  manager_.OnHeaderBlockSent(1, {1});
+  manager_.OnHeaderBlockSent(1, {1}, 2);
   EXPECT_EQ(0u, manager_.known_received_count());
 
   // Header Acknowledgement might increase Known Received Count.
   EXPECT_TRUE(manager_.OnHeaderAcknowledgement(0));
   EXPECT_EQ(1u, manager_.known_received_count());
 
-  manager_.OnHeaderBlockSent(2, {5});
+  manager_.OnHeaderBlockSent(2, {5}, 6);
   EXPECT_EQ(1u, manager_.known_received_count());
 
   EXPECT_TRUE(manager_.OnHeaderAcknowledgement(1));
@@ -128,7 +136,7 @@
 
   // Header Acknowledgement of a block with smaller Required Insert Count does
   // not increase Known Received Count.
-  manager_.OnHeaderBlockSent(0, {3});
+  manager_.OnHeaderBlockSent(0, {3}, 4);
   EXPECT_EQ(6u, manager_.known_received_count());
 
   EXPECT_TRUE(manager_.OnHeaderAcknowledgement(0));
@@ -136,7 +144,7 @@
 
   // Header Acknowledgement of a block with equal Required Insert Count does not
   // increase Known Received Count.
-  manager_.OnHeaderBlockSent(1, {5});
+  manager_.OnHeaderBlockSent(1, {5}, 6);
   EXPECT_EQ(6u, manager_.known_received_count());
 
   EXPECT_TRUE(manager_.OnHeaderAcknowledgement(1));
@@ -147,16 +155,16 @@
   EXPECT_EQ(std::numeric_limits<uint64_t>::max(),
             manager_.smallest_blocking_index());
 
-  manager_.OnHeaderBlockSent(0, {0});
+  manager_.OnHeaderBlockSent(0, {0}, 1);
   EXPECT_EQ(0u, manager_.smallest_blocking_index());
 
-  manager_.OnHeaderBlockSent(1, {2});
+  manager_.OnHeaderBlockSent(1, {2}, 3);
   EXPECT_EQ(0u, manager_.smallest_blocking_index());
 
   EXPECT_TRUE(manager_.OnHeaderAcknowledgement(0));
   EXPECT_EQ(2u, manager_.smallest_blocking_index());
 
-  manager_.OnHeaderBlockSent(1, {1});
+  manager_.OnHeaderBlockSent(1, {1}, 2);
   EXPECT_EQ(1u, manager_.smallest_blocking_index());
 
   EXPECT_TRUE(manager_.OnHeaderAcknowledgement(1));
@@ -176,12 +184,12 @@
   EXPECT_EQ(std::numeric_limits<uint64_t>::max(),
             manager_.smallest_blocking_index());
 
-  manager_.OnHeaderBlockSent(0, {2, 1, 1});
+  manager_.OnHeaderBlockSent(0, {2, 1, 1}, 3);
   EXPECT_EQ(0u, manager_.known_received_count());
   EXPECT_TRUE(stream_is_blocked(0));
   EXPECT_EQ(1u, manager_.smallest_blocking_index());
 
-  manager_.OnHeaderBlockSent(0, {1, 0});
+  manager_.OnHeaderBlockSent(0, {1, 0}, 2);
   EXPECT_EQ(0u, manager_.known_received_count());
   EXPECT_TRUE(stream_is_blocked(0));
   EXPECT_EQ(0u, manager_.smallest_blocking_index());
@@ -191,7 +199,7 @@
   EXPECT_FALSE(stream_is_blocked(0));
   EXPECT_EQ(0u, manager_.smallest_blocking_index());
 
-  manager_.OnHeaderBlockSent(0, {3});
+  manager_.OnHeaderBlockSent(0, {3}, 4);
   EXPECT_EQ(3u, manager_.known_received_count());
   EXPECT_TRUE(stream_is_blocked(0));
   EXPECT_EQ(0u, manager_.smallest_blocking_index());
@@ -211,15 +219,15 @@
 }
 
 TEST_F(QpackBlockingManagerTest, CancelStream) {
-  manager_.OnHeaderBlockSent(0, {3});
+  manager_.OnHeaderBlockSent(0, {3}, 4);
   EXPECT_TRUE(stream_is_blocked(0));
   EXPECT_EQ(3u, manager_.smallest_blocking_index());
 
-  manager_.OnHeaderBlockSent(0, {2});
+  manager_.OnHeaderBlockSent(0, {2}, 3);
   EXPECT_TRUE(stream_is_blocked(0));
   EXPECT_EQ(2u, manager_.smallest_blocking_index());
 
-  manager_.OnHeaderBlockSent(1, {4});
+  manager_.OnHeaderBlockSent(1, {4}, 5);
   EXPECT_TRUE(stream_is_blocked(0));
   EXPECT_TRUE(stream_is_blocked(1));
   EXPECT_EQ(2u, manager_.smallest_blocking_index());
@@ -250,8 +258,8 @@
   EXPECT_TRUE(manager_.blocking_allowed_on_stream(kStreamId2, 1));
 
   // Doubly block first stream.
-  manager_.OnHeaderBlockSent(kStreamId1, {0});
-  manager_.OnHeaderBlockSent(kStreamId1, {1});
+  manager_.OnHeaderBlockSent(kStreamId1, {0}, 1);
+  manager_.OnHeaderBlockSent(kStreamId1, {1}, 2);
 
   // First stream is already blocked so it can carry more blocking references.
   EXPECT_TRUE(manager_.blocking_allowed_on_stream(kStreamId1, 1));
@@ -263,7 +271,7 @@
   EXPECT_TRUE(manager_.blocking_allowed_on_stream(kStreamId2, 2));
 
   // Block second stream.
-  manager_.OnHeaderBlockSent(kStreamId2, {2});
+  manager_.OnHeaderBlockSent(kStreamId2, {2}, 3);
 
   // Streams are already blocked so either can carry more blocking references.
   EXPECT_TRUE(manager_.blocking_allowed_on_stream(kStreamId1, 2));
diff --git a/quiche/quic/core/qpack/qpack_encoder.cc b/quiche/quic/core/qpack/qpack_encoder.cc
index 7b7bf23..2ed6290 100644
--- a/quiche/quic/core/qpack/qpack_encoder.cc
+++ b/quiche/quic/core/qpack/qpack_encoder.cc
@@ -407,7 +407,8 @@
           ? 0
           : QpackBlockingManager::RequiredInsertCount(referred_indices);
   if (!referred_indices.empty()) {
-    blocking_manager_.OnHeaderBlockSent(stream_id, std::move(referred_indices));
+    blocking_manager_.OnHeaderBlockSent(stream_id, std::move(referred_indices),
+                                        required_insert_count);
   }
 
   // Second pass.