Introduce QuicSession::PerformActionsOnActiveStreams(). This prevents applications from knowing details of all the streams and only perform actions on active ones. Protected by gfe2_reloadable_flag_quic_do_not_use_stream_map PiperOrigin-RevId: 322884736 Change-Id: Idc0478732e2def72627071987ec881e173e8097c
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc index d13166e..99bbb0e 100644 --- a/quic/core/quic_session.cc +++ b/quic/core/quic_session.cc
@@ -101,7 +101,9 @@ was_zero_rtt_rejected_(false), fix_gquic_stream_type_(GetQuicReloadableFlag(quic_fix_gquic_stream_type)), remove_streams_waiting_for_acks_( - GetQuicReloadableFlag(quic_remove_streams_waiting_for_acks)) { + GetQuicReloadableFlag(quic_remove_streams_waiting_for_acks)), + do_not_use_stream_map_( + GetQuicReloadableFlag(quic_do_not_use_stream_map)) { closed_streams_clean_up_alarm_ = QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm( new ClosedStreamsCleanUpDelegate(this))); @@ -376,26 +378,46 @@ GetMutableCryptoStream()->OnConnectionClosed(frame.quic_error_code, source); - // Copy all non static streams in a new map for the ease of deleting. - QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams; - for (const auto& it : stream_map_) { - if (!it.second->is_static()) { - non_static_streams[it.first] = it.second.get(); - } - } - for (const auto& it : non_static_streams) { - QuicStreamId id = it.first; - it.second->OnConnectionClosed(frame.quic_error_code, source); - QUIC_RELOADABLE_FLAG_COUNT( - quic_do_not_close_stream_again_on_connection_close); - if (stream_map_.find(id) != stream_map_.end()) { - QUIC_BUG << ENDPOINT << "Stream " << id - << " failed to close under OnConnectionClosed"; - if (!GetQuicReloadableFlag( - quic_do_not_close_stream_again_on_connection_close)) { - CloseStream(id); + if (!do_not_use_stream_map_) { + // Copy all non static streams in a new map for the ease of deleting. + QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams; + for (const auto& it : stream_map_) { + if (!it.second->is_static()) { + non_static_streams[it.first] = it.second.get(); } } + + for (const auto& it : non_static_streams) { + QuicStreamId id = it.first; + it.second->OnConnectionClosed(frame.quic_error_code, source); + QUIC_RELOADABLE_FLAG_COUNT( + quic_do_not_close_stream_again_on_connection_close); + if (stream_map_.find(id) != stream_map_.end()) { + QUIC_BUG << ENDPOINT << "Stream " << id + << " failed to close under OnConnectionClosed"; + if (!GetQuicReloadableFlag( + quic_do_not_close_stream_again_on_connection_close)) { + CloseStream(id); + } + } + } + } else { + QUIC_RELOADABLE_FLAG_COUNT_N(quic_do_not_use_stream_map, 1, 2); + PerformActionOnActiveStreams([this, frame, source](QuicStream* stream) { + QuicStreamId id = stream->id(); + stream->OnConnectionClosed(frame.quic_error_code, source); + QUIC_RELOADABLE_FLAG_COUNT( + quic_do_not_close_stream_again_on_connection_close); + if (stream_map_.find(id) != stream_map_.end()) { + QUIC_BUG << ENDPOINT << "Stream " << id + << " failed to close under OnConnectionClosed"; + if (!GetQuicReloadableFlag( + quic_do_not_close_stream_again_on_connection_close)) { + CloseStream(id); + } + } + return true; + }); } // Cleanup zombie stream map on connection close. @@ -2451,5 +2473,30 @@ GetMutableCryptoStream()->NeuterStreamDataOfEncryptionLevel(level); } +void QuicSession::PerformActionOnActiveStreams( + std::function<bool(QuicStream*)> action) { + QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams; + for (const auto& it : stream_map_) { + if (!it.second->is_static()) { + non_static_streams[it.first] = it.second.get(); + } + } + + for (const auto& it : non_static_streams) { + if (!action(it.second)) { + return; + } + } +} + +void QuicSession::PerformActionOnActiveStreams( + std::function<bool(QuicStream*)> action) const { + for (const auto& it : stream_map_) { + if (!it.second->is_static() && !action(it.second.get())) { + return; + } + } +} + #undef ENDPOINT // undef for jumbo builds } // namespace quic
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h index 635ffa9..09ce2fd 100644 --- a/quic/core/quic_session.h +++ b/quic/core/quic_session.h
@@ -617,6 +617,8 @@ bool was_zero_rtt_rejected() const { return was_zero_rtt_rejected_; } + bool do_not_use_stream_map() const { return do_not_use_stream_map_; } + size_t num_outgoing_draining_streams() const { return num_outgoing_draining_streams_; } @@ -628,6 +630,12 @@ return false; } + // Called by applications to perform |action| on active streams. + // Stream iteration will be stopped if action returns false. + void PerformActionOnActiveStreams(std::function<bool(QuicStream*)> action); + void PerformActionOnActiveStreams( + std::function<bool(QuicStream*)> action) const; + // Return the largest peer created stream id depending on directionality // indicated by |unidirectional|. QuicStreamId GetLargestPeerCreatedStreamId(bool unidirectional) const; @@ -842,6 +850,9 @@ // Latched value of flag quic_remove_streams_waiting_for_acks. const bool remove_streams_waiting_for_acks_; + + // Latched value of flag quic_do_not_use_stream_map. + const bool do_not_use_stream_map_; }; } // namespace quic