Optimize QpackBlockingManager for CPU efficiency. This CL optimizes QpackBlockingManager for CPU efficiency by: 1) Storing required insert count for each header block. 2) Storing blocked streams in a map. Protected by FLAGS_quic_reloadable_flag_quic_optimize_qpack_blocking_manager. PiperOrigin-RevId: 685776335
diff --git a/quiche/common/quiche_feature_flags_list.h b/quiche/common/quiche_feature_flags_list.h index 2235d21..2f2946f 100755 --- a/quiche/common/quiche_feature_flags_list.h +++ b/quiche/common/quiche_feature_flags_list.h
@@ -43,6 +43,7 @@ QUICHE_FLAG(bool, quiche_reloadable_flag_quic_no_path_degrading_before_handshake_confirmed, true, true, "If true, an endpoint does not detect path degrading or blackholing until handshake gets confirmed.") QUICHE_FLAG(bool, quiche_reloadable_flag_quic_no_write_control_frame_upon_connection_close, false, true, "If trrue, early return before write control frame in OnCanWrite() if the connection is already closed.") QUICHE_FLAG(bool, quiche_reloadable_flag_quic_no_write_control_frame_upon_connection_close2, false, false, "If true, QuicSession will block outgoing control frames when the connection is closed.") +QUICHE_FLAG(bool, quiche_reloadable_flag_quic_optimize_qpack_blocking_manager, false, false, "If true, optimize qpack_blocking_manager for CPU efficiency.") QUICHE_FLAG(bool, quiche_reloadable_flag_quic_pacing_remove_non_initial_burst, false, false, "If true, remove the non-initial burst in QUIC PacingSender.") QUICHE_FLAG(bool, quiche_reloadable_flag_quic_parse_cert_compression_algos_from_chlo, true, true, "If true, parse offered cert compression algorithms from received CHLOs.") QUICHE_FLAG(bool, quiche_reloadable_flag_quic_priority_respect_incremental, false, false, "If true, respect the incremental parameter of each stream in QuicWriteBlockedList.")
diff --git a/quiche/quic/core/qpack/qpack_blocking_manager.cc b/quiche/quic/core/qpack/qpack_blocking_manager.cc index 17b6e9f..73589c8 100644 --- a/quiche/quic/core/qpack/qpack_blocking_manager.cc +++ b/quiche/quic/core/qpack/qpack_blocking_manager.cc
@@ -7,6 +7,9 @@ #include <limits> #include <utility> +#include "quiche/quic/platform/api/quic_flag_utils.h" +#include "quiche/quic/platform/api/quic_flags.h" + namespace quic { QpackBlockingManager::QpackBlockingManager() : known_received_count_(0) {} @@ -19,15 +22,17 @@ QUICHE_DCHECK(!it->second.empty()); - const IndexSet& indices = it->second.front(); - QUICHE_DCHECK(!indices.empty()); + const HeaderBlock& header_block = it->second.front(); + QUICHE_DCHECK(!header_block.indices.empty()); - const uint64_t required_index_count = RequiredInsertCount(indices); - if (known_received_count_ < required_index_count) { - known_received_count_ = required_index_count; + if (known_received_count_ < header_block.required_insert_count) { + known_received_count_ = header_block.required_insert_count; + if (optimize_qpack_blocking_manager_) { + OnKnownReceivedCountIncreased(); + } } - DecreaseReferenceCounts(indices); + DecreaseReferenceCounts(header_block.indices); it->second.pop_front(); if (it->second.empty()) { @@ -43,11 +48,15 @@ return; } - for (const IndexSet& indices : it->second) { - DecreaseReferenceCounts(indices); + for (const HeaderBlock& header_block : it->second) { + DecreaseReferenceCounts(header_block.indices); } header_blocks_.erase(it); + if (optimize_qpack_blocking_manager_) { + QUIC_RELOADABLE_FLAG_COUNT_N(quic_optimize_qpack_blocking_manager, 1, 5); + blocked_streams_.erase(stream_id); + } } bool QpackBlockingManager::OnInsertCountIncrement(uint64_t increment) { @@ -57,19 +66,43 @@ } known_received_count_ += increment; + if (optimize_qpack_blocking_manager_) { + OnKnownReceivedCountIncreased(); + } return true; } void QpackBlockingManager::OnHeaderBlockSent(QuicStreamId stream_id, - IndexSet indices) { + IndexSet indices, + uint64_t required_insert_count) { QUICHE_DCHECK(!indices.empty()); IncreaseReferenceCounts(indices); - header_blocks_[stream_id].push_back(std::move(indices)); + header_blocks_[stream_id].push_back( + {std::move(indices), required_insert_count}); + if (optimize_qpack_blocking_manager_ && + required_insert_count > known_received_count_) { + auto it = blocked_streams_.find(stream_id); + if (it != blocked_streams_.end()) { + QUIC_RELOADABLE_FLAG_COUNT_N(quic_optimize_qpack_blocking_manager, 2, 5); + it->second = std::max(it->second, required_insert_count); + } else { + QUIC_RELOADABLE_FLAG_COUNT_N(quic_optimize_qpack_blocking_manager, 3, 5); + blocked_streams_[stream_id] = required_insert_count; + } + } } bool QpackBlockingManager::blocking_allowed_on_stream( QuicStreamId stream_id, uint64_t maximum_blocked_streams) const { + if (optimize_qpack_blocking_manager_) { + // Sending blocked reference is allowed if: + // 1) Stream |stream_id| is already blocked, or + // 2) The number of blocked streams is less than the limit. + QUIC_RELOADABLE_FLAG_COUNT_N(quic_optimize_qpack_blocking_manager, 4, 5); + return blocked_streams_.contains(stream_id) || + blocked_streams_.size() < maximum_blocked_streams; + } // This should be the most common case: the limit is larger than the number of // streams that have unacknowledged header blocks (regardless of whether they // are blocked or not) plus one for stream |stream_id|. @@ -84,8 +117,8 @@ uint64_t blocked_stream_count = 0; for (const auto& header_blocks_for_stream : header_blocks_) { - for (const IndexSet& indices : header_blocks_for_stream.second) { - if (RequiredInsertCount(indices) > known_received_count_) { + for (const HeaderBlock& header_block : header_blocks_for_stream.second) { + if (header_block.required_insert_count > known_received_count_) { if (header_blocks_for_stream.first == stream_id) { // Sending blocking references is allowed if stream |stream_id| is // already blocked. @@ -155,4 +188,17 @@ } } +void QpackBlockingManager::OnKnownReceivedCountIncreased() { + QUICHE_DCHECK(optimize_qpack_blocking_manager_); + for (auto blocked_it = blocked_streams_.begin(); + blocked_it != blocked_streams_.end();) { + if (blocked_it->second > known_received_count_) { + ++blocked_it; + continue; + } + QUIC_RELOADABLE_FLAG_COUNT_N(quic_optimize_qpack_blocking_manager, 5, 5); + blocked_streams_.erase(blocked_it++); + } +} + } // namespace quic
diff --git a/quiche/quic/core/qpack/qpack_blocking_manager.h b/quiche/quic/core/qpack/qpack_blocking_manager.h index bad487e..8e9e316 100644 --- a/quiche/quic/core/qpack/qpack_blocking_manager.h +++ b/quiche/quic/core/qpack/qpack_blocking_manager.h
@@ -48,7 +48,8 @@ // Called when sending a header block containing references to dynamic table // entries with |indices|. |indices| must not be empty. - void OnHeaderBlockSent(QuicStreamId stream_id, IndexSet indices); + void OnHeaderBlockSent(QuicStreamId stream_id, IndexSet indices, + uint64_t required_insert_count); // Returns true if sending blocking references on stream |stream_id| would not // increase the total number of blocked streams above @@ -68,6 +69,8 @@ uint64_t known_received_count() const { return known_received_count_; } // Required Insert Count for set of indices. + // TODO(fayang): move this method to qpack_encoder once deprecating + // optimize_qpack_blocking_manager flag. static uint64_t RequiredInsertCount(const IndexSet& indices); private: @@ -78,13 +81,20 @@ // on a single stream, they might not be blocked at the same time. Use // std::list instead of quiche::QuicheCircularDeque because it has lower // memory footprint when holding few elements. - using HeaderBlocksForStream = std::list<IndexSet>; - using HeaderBlocks = absl::flat_hash_map<QuicStreamId, HeaderBlocksForStream>; + struct HeaderBlock { + IndexSet indices; + uint64_t required_insert_count = 0; + }; + using HeaderBlocks = + absl::flat_hash_map<QuicStreamId, std::list<const HeaderBlock>>; // Increase or decrease the reference count for each index in |indices|. void IncreaseReferenceCounts(const IndexSet& indices); void DecreaseReferenceCounts(const IndexSet& indices); + // Called to cleanup blocked_streams_ when known_received_count is increased. + void OnKnownReceivedCountIncreased(); + // Multiset of indices in each header block for each stream. // Must not contain a stream id with an empty queue. HeaderBlocks header_blocks_; @@ -93,6 +103,13 @@ absl::btree_map<uint64_t, uint64_t> entry_reference_counts_; uint64_t known_received_count_; + + // Mapping from blocked streams to their required insert count (> + // known_received_count_). + absl::flat_hash_map<QuicStreamId, uint64_t> blocked_streams_; + + const bool optimize_qpack_blocking_manager_ = + GetQuicReloadableFlag(quic_optimize_qpack_blocking_manager); }; } // namespace quic
diff --git a/quiche/quic/core/qpack/qpack_blocking_manager_test.cc b/quiche/quic/core/qpack/qpack_blocking_manager_test.cc index 670264e..8196146 100644 --- a/quiche/quic/core/qpack/qpack_blocking_manager_test.cc +++ b/quiche/quic/core/qpack/qpack_blocking_manager_test.cc
@@ -19,14 +19,22 @@ if (header_blocks_for_stream.first != stream_id) { continue; } - for (const auto& indices : header_blocks_for_stream.second) { - if (QpackBlockingManager::RequiredInsertCount(indices) > + for (const auto& header_block : header_blocks_for_stream.second) { + QUICHE_DCHECK_EQ( + header_block.required_insert_count, + QpackBlockingManager::RequiredInsertCount(header_block.indices)); + if (header_block.required_insert_count > manager->known_received_count_) { + if (GetQuicReloadableFlag(quic_optimize_qpack_blocking_manager)) { + QUICHE_DCHECK(manager->blocked_streams_.contains(stream_id)); + } return true; } } } - + if (GetQuicReloadableFlag(quic_optimize_qpack_blocking_manager)) { + QUICHE_DCHECK(!manager->blocked_streams_.contains(stream_id)); + } return false; } }; @@ -59,12 +67,12 @@ // Stream 0 is not blocked, because it only references entries that are // already acknowledged by an Insert Count Increment instruction. - manager_.OnHeaderBlockSent(0, {1, 0}); + manager_.OnHeaderBlockSent(0, {1, 0}, 2); EXPECT_FALSE(stream_is_blocked(0)); } TEST_F(QpackBlockingManagerTest, UnblockedByInsertCountIncrement) { - manager_.OnHeaderBlockSent(0, {1, 0}); + manager_.OnHeaderBlockSent(0, {1, 0}, 2); EXPECT_TRUE(stream_is_blocked(0)); EXPECT_TRUE(manager_.OnInsertCountIncrement(2)); @@ -72,7 +80,7 @@ } TEST_F(QpackBlockingManagerTest, NotBlockedByHeaderAcknowledgement) { - manager_.OnHeaderBlockSent(0, {2, 1, 1}); + manager_.OnHeaderBlockSent(0, {2, 1, 1}, 3); EXPECT_TRUE(stream_is_blocked(0)); EXPECT_TRUE(manager_.OnHeaderAcknowledgement(0)); @@ -80,13 +88,13 @@ // Stream 1 is not blocked, because it only references entries that are // already acknowledged by a Header Acknowledgement instruction. - manager_.OnHeaderBlockSent(1, {2, 2}); + manager_.OnHeaderBlockSent(1, {2, 2}, 3); EXPECT_FALSE(stream_is_blocked(1)); } TEST_F(QpackBlockingManagerTest, UnblockedByHeaderAcknowledgement) { - manager_.OnHeaderBlockSent(0, {2, 1, 1}); - manager_.OnHeaderBlockSent(1, {2, 2}); + manager_.OnHeaderBlockSent(0, {2, 1, 1}, 3); + manager_.OnHeaderBlockSent(1, {2, 2}, 3); EXPECT_TRUE(stream_is_blocked(0)); EXPECT_TRUE(stream_is_blocked(1)); @@ -99,17 +107,17 @@ EXPECT_EQ(0u, manager_.known_received_count()); // Sending a header block does not change Known Received Count. - manager_.OnHeaderBlockSent(0, {0}); + manager_.OnHeaderBlockSent(0, {0}, 1); EXPECT_EQ(0u, manager_.known_received_count()); - manager_.OnHeaderBlockSent(1, {1}); + manager_.OnHeaderBlockSent(1, {1}, 2); EXPECT_EQ(0u, manager_.known_received_count()); // Header Acknowledgement might increase Known Received Count. EXPECT_TRUE(manager_.OnHeaderAcknowledgement(0)); EXPECT_EQ(1u, manager_.known_received_count()); - manager_.OnHeaderBlockSent(2, {5}); + manager_.OnHeaderBlockSent(2, {5}, 6); EXPECT_EQ(1u, manager_.known_received_count()); EXPECT_TRUE(manager_.OnHeaderAcknowledgement(1)); @@ -128,7 +136,7 @@ // Header Acknowledgement of a block with smaller Required Insert Count does // not increase Known Received Count. - manager_.OnHeaderBlockSent(0, {3}); + manager_.OnHeaderBlockSent(0, {3}, 4); EXPECT_EQ(6u, manager_.known_received_count()); EXPECT_TRUE(manager_.OnHeaderAcknowledgement(0)); @@ -136,7 +144,7 @@ // Header Acknowledgement of a block with equal Required Insert Count does not // increase Known Received Count. - manager_.OnHeaderBlockSent(1, {5}); + manager_.OnHeaderBlockSent(1, {5}, 6); EXPECT_EQ(6u, manager_.known_received_count()); EXPECT_TRUE(manager_.OnHeaderAcknowledgement(1)); @@ -147,16 +155,16 @@ EXPECT_EQ(std::numeric_limits<uint64_t>::max(), manager_.smallest_blocking_index()); - manager_.OnHeaderBlockSent(0, {0}); + manager_.OnHeaderBlockSent(0, {0}, 1); EXPECT_EQ(0u, manager_.smallest_blocking_index()); - manager_.OnHeaderBlockSent(1, {2}); + manager_.OnHeaderBlockSent(1, {2}, 3); EXPECT_EQ(0u, manager_.smallest_blocking_index()); EXPECT_TRUE(manager_.OnHeaderAcknowledgement(0)); EXPECT_EQ(2u, manager_.smallest_blocking_index()); - manager_.OnHeaderBlockSent(1, {1}); + manager_.OnHeaderBlockSent(1, {1}, 2); EXPECT_EQ(1u, manager_.smallest_blocking_index()); EXPECT_TRUE(manager_.OnHeaderAcknowledgement(1)); @@ -176,12 +184,12 @@ EXPECT_EQ(std::numeric_limits<uint64_t>::max(), manager_.smallest_blocking_index()); - manager_.OnHeaderBlockSent(0, {2, 1, 1}); + manager_.OnHeaderBlockSent(0, {2, 1, 1}, 3); EXPECT_EQ(0u, manager_.known_received_count()); EXPECT_TRUE(stream_is_blocked(0)); EXPECT_EQ(1u, manager_.smallest_blocking_index()); - manager_.OnHeaderBlockSent(0, {1, 0}); + manager_.OnHeaderBlockSent(0, {1, 0}, 2); EXPECT_EQ(0u, manager_.known_received_count()); EXPECT_TRUE(stream_is_blocked(0)); EXPECT_EQ(0u, manager_.smallest_blocking_index()); @@ -191,7 +199,7 @@ EXPECT_FALSE(stream_is_blocked(0)); EXPECT_EQ(0u, manager_.smallest_blocking_index()); - manager_.OnHeaderBlockSent(0, {3}); + manager_.OnHeaderBlockSent(0, {3}, 4); EXPECT_EQ(3u, manager_.known_received_count()); EXPECT_TRUE(stream_is_blocked(0)); EXPECT_EQ(0u, manager_.smallest_blocking_index()); @@ -211,15 +219,15 @@ } TEST_F(QpackBlockingManagerTest, CancelStream) { - manager_.OnHeaderBlockSent(0, {3}); + manager_.OnHeaderBlockSent(0, {3}, 4); EXPECT_TRUE(stream_is_blocked(0)); EXPECT_EQ(3u, manager_.smallest_blocking_index()); - manager_.OnHeaderBlockSent(0, {2}); + manager_.OnHeaderBlockSent(0, {2}, 3); EXPECT_TRUE(stream_is_blocked(0)); EXPECT_EQ(2u, manager_.smallest_blocking_index()); - manager_.OnHeaderBlockSent(1, {4}); + manager_.OnHeaderBlockSent(1, {4}, 5); EXPECT_TRUE(stream_is_blocked(0)); EXPECT_TRUE(stream_is_blocked(1)); EXPECT_EQ(2u, manager_.smallest_blocking_index()); @@ -250,8 +258,8 @@ EXPECT_TRUE(manager_.blocking_allowed_on_stream(kStreamId2, 1)); // Doubly block first stream. - manager_.OnHeaderBlockSent(kStreamId1, {0}); - manager_.OnHeaderBlockSent(kStreamId1, {1}); + manager_.OnHeaderBlockSent(kStreamId1, {0}, 1); + manager_.OnHeaderBlockSent(kStreamId1, {1}, 2); // First stream is already blocked so it can carry more blocking references. EXPECT_TRUE(manager_.blocking_allowed_on_stream(kStreamId1, 1)); @@ -263,7 +271,7 @@ EXPECT_TRUE(manager_.blocking_allowed_on_stream(kStreamId2, 2)); // Block second stream. - manager_.OnHeaderBlockSent(kStreamId2, {2}); + manager_.OnHeaderBlockSent(kStreamId2, {2}, 3); // Streams are already blocked so either can carry more blocking references. EXPECT_TRUE(manager_.blocking_allowed_on_stream(kStreamId1, 2));
diff --git a/quiche/quic/core/qpack/qpack_encoder.cc b/quiche/quic/core/qpack/qpack_encoder.cc index 7b7bf23..2ed6290 100644 --- a/quiche/quic/core/qpack/qpack_encoder.cc +++ b/quiche/quic/core/qpack/qpack_encoder.cc
@@ -407,7 +407,8 @@ ? 0 : QpackBlockingManager::RequiredInsertCount(referred_indices); if (!referred_indices.empty()) { - blocking_manager_.OnHeaderBlockSent(stream_id, std::move(referred_indices)); + blocking_manager_.OnHeaderBlockSent(stream_id, std::move(referred_indices), + required_insert_count); } // Second pass.