Introduce QuicSession::PerformActionsOnActiveStreams().

This prevents applications from knowing details of all the streams and only perform actions on active ones.

Protected by gfe2_reloadable_flag_quic_do_not_use_stream_map

PiperOrigin-RevId: 322884736
Change-Id: Idc0478732e2def72627071987ec881e173e8097c
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index d13166e..99bbb0e 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -101,7 +101,9 @@
       was_zero_rtt_rejected_(false),
       fix_gquic_stream_type_(GetQuicReloadableFlag(quic_fix_gquic_stream_type)),
       remove_streams_waiting_for_acks_(
-          GetQuicReloadableFlag(quic_remove_streams_waiting_for_acks)) {
+          GetQuicReloadableFlag(quic_remove_streams_waiting_for_acks)),
+      do_not_use_stream_map_(
+          GetQuicReloadableFlag(quic_do_not_use_stream_map)) {
   closed_streams_clean_up_alarm_ =
       QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
           new ClosedStreamsCleanUpDelegate(this)));
@@ -376,26 +378,46 @@
 
   GetMutableCryptoStream()->OnConnectionClosed(frame.quic_error_code, source);
 
-  // Copy all non static streams in a new map for the ease of deleting.
-  QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams;
-  for (const auto& it : stream_map_) {
-    if (!it.second->is_static()) {
-      non_static_streams[it.first] = it.second.get();
-    }
-  }
-  for (const auto& it : non_static_streams) {
-    QuicStreamId id = it.first;
-    it.second->OnConnectionClosed(frame.quic_error_code, source);
-    QUIC_RELOADABLE_FLAG_COUNT(
-        quic_do_not_close_stream_again_on_connection_close);
-    if (stream_map_.find(id) != stream_map_.end()) {
-      QUIC_BUG << ENDPOINT << "Stream " << id
-               << " failed to close under OnConnectionClosed";
-      if (!GetQuicReloadableFlag(
-              quic_do_not_close_stream_again_on_connection_close)) {
-        CloseStream(id);
+  if (!do_not_use_stream_map_) {
+    // Copy all non static streams in a new map for the ease of deleting.
+    QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams;
+    for (const auto& it : stream_map_) {
+      if (!it.second->is_static()) {
+        non_static_streams[it.first] = it.second.get();
       }
     }
+
+    for (const auto& it : non_static_streams) {
+      QuicStreamId id = it.first;
+      it.second->OnConnectionClosed(frame.quic_error_code, source);
+      QUIC_RELOADABLE_FLAG_COUNT(
+          quic_do_not_close_stream_again_on_connection_close);
+      if (stream_map_.find(id) != stream_map_.end()) {
+        QUIC_BUG << ENDPOINT << "Stream " << id
+                 << " failed to close under OnConnectionClosed";
+        if (!GetQuicReloadableFlag(
+                quic_do_not_close_stream_again_on_connection_close)) {
+          CloseStream(id);
+        }
+      }
+    }
+  } else {
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_do_not_use_stream_map, 1, 2);
+    PerformActionOnActiveStreams([this, frame, source](QuicStream* stream) {
+      QuicStreamId id = stream->id();
+      stream->OnConnectionClosed(frame.quic_error_code, source);
+      QUIC_RELOADABLE_FLAG_COUNT(
+          quic_do_not_close_stream_again_on_connection_close);
+      if (stream_map_.find(id) != stream_map_.end()) {
+        QUIC_BUG << ENDPOINT << "Stream " << id
+                 << " failed to close under OnConnectionClosed";
+        if (!GetQuicReloadableFlag(
+                quic_do_not_close_stream_again_on_connection_close)) {
+          CloseStream(id);
+        }
+      }
+      return true;
+    });
   }
 
   // Cleanup zombie stream map on connection close.
@@ -2451,5 +2473,30 @@
   GetMutableCryptoStream()->NeuterStreamDataOfEncryptionLevel(level);
 }
 
+void QuicSession::PerformActionOnActiveStreams(
+    std::function<bool(QuicStream*)> action) {
+  QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams;
+  for (const auto& it : stream_map_) {
+    if (!it.second->is_static()) {
+      non_static_streams[it.first] = it.second.get();
+    }
+  }
+
+  for (const auto& it : non_static_streams) {
+    if (!action(it.second)) {
+      return;
+    }
+  }
+}
+
+void QuicSession::PerformActionOnActiveStreams(
+    std::function<bool(QuicStream*)> action) const {
+  for (const auto& it : stream_map_) {
+    if (!it.second->is_static() && !action(it.second.get())) {
+      return;
+    }
+  }
+}
+
 #undef ENDPOINT  // undef for jumbo builds
 }  // namespace quic
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index 635ffa9..09ce2fd 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -617,6 +617,8 @@
 
   bool was_zero_rtt_rejected() const { return was_zero_rtt_rejected_; }
 
+  bool do_not_use_stream_map() const { return do_not_use_stream_map_; }
+
   size_t num_outgoing_draining_streams() const {
     return num_outgoing_draining_streams_;
   }
@@ -628,6 +630,12 @@
     return false;
   }
 
+  // Called by applications to perform |action| on active streams.
+  // Stream iteration will be stopped if action returns false.
+  void PerformActionOnActiveStreams(std::function<bool(QuicStream*)> action);
+  void PerformActionOnActiveStreams(
+      std::function<bool(QuicStream*)> action) const;
+
   // Return the largest peer created stream id depending on directionality
   // indicated by |unidirectional|.
   QuicStreamId GetLargestPeerCreatedStreamId(bool unidirectional) const;
@@ -842,6 +850,9 @@
 
   // Latched value of flag quic_remove_streams_waiting_for_acks.
   const bool remove_streams_waiting_for_acks_;
+
+  // Latched value of flag quic_do_not_use_stream_map.
+  const bool do_not_use_stream_map_;
 };
 
 }  // namespace quic