Introduce QuicSession::PerformActionsOnActiveStreams().
This prevents applications from knowing details of all the streams and only perform actions on active ones.
Protected by gfe2_reloadable_flag_quic_do_not_use_stream_map
PiperOrigin-RevId: 322884736
Change-Id: Idc0478732e2def72627071987ec881e173e8097c
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index d13166e..99bbb0e 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -101,7 +101,9 @@
was_zero_rtt_rejected_(false),
fix_gquic_stream_type_(GetQuicReloadableFlag(quic_fix_gquic_stream_type)),
remove_streams_waiting_for_acks_(
- GetQuicReloadableFlag(quic_remove_streams_waiting_for_acks)) {
+ GetQuicReloadableFlag(quic_remove_streams_waiting_for_acks)),
+ do_not_use_stream_map_(
+ GetQuicReloadableFlag(quic_do_not_use_stream_map)) {
closed_streams_clean_up_alarm_ =
QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
new ClosedStreamsCleanUpDelegate(this)));
@@ -376,26 +378,46 @@
GetMutableCryptoStream()->OnConnectionClosed(frame.quic_error_code, source);
- // Copy all non static streams in a new map for the ease of deleting.
- QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams;
- for (const auto& it : stream_map_) {
- if (!it.second->is_static()) {
- non_static_streams[it.first] = it.second.get();
- }
- }
- for (const auto& it : non_static_streams) {
- QuicStreamId id = it.first;
- it.second->OnConnectionClosed(frame.quic_error_code, source);
- QUIC_RELOADABLE_FLAG_COUNT(
- quic_do_not_close_stream_again_on_connection_close);
- if (stream_map_.find(id) != stream_map_.end()) {
- QUIC_BUG << ENDPOINT << "Stream " << id
- << " failed to close under OnConnectionClosed";
- if (!GetQuicReloadableFlag(
- quic_do_not_close_stream_again_on_connection_close)) {
- CloseStream(id);
+ if (!do_not_use_stream_map_) {
+ // Copy all non static streams in a new map for the ease of deleting.
+ QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams;
+ for (const auto& it : stream_map_) {
+ if (!it.second->is_static()) {
+ non_static_streams[it.first] = it.second.get();
}
}
+
+ for (const auto& it : non_static_streams) {
+ QuicStreamId id = it.first;
+ it.second->OnConnectionClosed(frame.quic_error_code, source);
+ QUIC_RELOADABLE_FLAG_COUNT(
+ quic_do_not_close_stream_again_on_connection_close);
+ if (stream_map_.find(id) != stream_map_.end()) {
+ QUIC_BUG << ENDPOINT << "Stream " << id
+ << " failed to close under OnConnectionClosed";
+ if (!GetQuicReloadableFlag(
+ quic_do_not_close_stream_again_on_connection_close)) {
+ CloseStream(id);
+ }
+ }
+ }
+ } else {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_do_not_use_stream_map, 1, 2);
+ PerformActionOnActiveStreams([this, frame, source](QuicStream* stream) {
+ QuicStreamId id = stream->id();
+ stream->OnConnectionClosed(frame.quic_error_code, source);
+ QUIC_RELOADABLE_FLAG_COUNT(
+ quic_do_not_close_stream_again_on_connection_close);
+ if (stream_map_.find(id) != stream_map_.end()) {
+ QUIC_BUG << ENDPOINT << "Stream " << id
+ << " failed to close under OnConnectionClosed";
+ if (!GetQuicReloadableFlag(
+ quic_do_not_close_stream_again_on_connection_close)) {
+ CloseStream(id);
+ }
+ }
+ return true;
+ });
}
// Cleanup zombie stream map on connection close.
@@ -2451,5 +2473,30 @@
GetMutableCryptoStream()->NeuterStreamDataOfEncryptionLevel(level);
}
+void QuicSession::PerformActionOnActiveStreams(
+ std::function<bool(QuicStream*)> action) {
+ QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams;
+ for (const auto& it : stream_map_) {
+ if (!it.second->is_static()) {
+ non_static_streams[it.first] = it.second.get();
+ }
+ }
+
+ for (const auto& it : non_static_streams) {
+ if (!action(it.second)) {
+ return;
+ }
+ }
+}
+
+void QuicSession::PerformActionOnActiveStreams(
+ std::function<bool(QuicStream*)> action) const {
+ for (const auto& it : stream_map_) {
+ if (!it.second->is_static() && !action(it.second.get())) {
+ return;
+ }
+ }
+}
+
#undef ENDPOINT // undef for jumbo builds
} // namespace quic
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index 635ffa9..09ce2fd 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -617,6 +617,8 @@
bool was_zero_rtt_rejected() const { return was_zero_rtt_rejected_; }
+ bool do_not_use_stream_map() const { return do_not_use_stream_map_; }
+
size_t num_outgoing_draining_streams() const {
return num_outgoing_draining_streams_;
}
@@ -628,6 +630,12 @@
return false;
}
+ // Called by applications to perform |action| on active streams.
+ // Stream iteration will be stopped if action returns false.
+ void PerformActionOnActiveStreams(std::function<bool(QuicStream*)> action);
+ void PerformActionOnActiveStreams(
+ std::function<bool(QuicStream*)> action) const;
+
// Return the largest peer created stream id depending on directionality
// indicated by |unidirectional|.
QuicStreamId GetLargestPeerCreatedStreamId(bool unidirectional) const;
@@ -842,6 +850,9 @@
// Latched value of flag quic_remove_streams_waiting_for_acks.
const bool remove_streams_waiting_for_acks_;
+
+ // Latched value of flag quic_do_not_use_stream_map.
+ const bool do_not_use_stream_map_;
};
} // namespace quic