Remove zombie stream map in QuicSession.

Those streams will live in stream_map_ instead.

This CL also removes some duplicated tests.

Protected by gfe2_reloadable_flag_quic_remove_zombie_streams

PiperOrigin-RevId: 324877471
Change-Id: I6d3df84048c35e7ca52ec030e019bbc0def4d3ce
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index 6379861..ecfb2b2 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -76,6 +76,7 @@
       num_draining_streams_(0),
       num_outgoing_draining_streams_(0),
       num_static_streams_(0),
+      num_zombie_streams_(0),
       flow_controller_(
           this,
           QuicUtils::GetInvalidStreamId(connection->transport_version()),
@@ -102,8 +103,10 @@
       liveness_testing_in_progress_(false),
       remove_streams_waiting_for_acks_(
           GetQuicReloadableFlag(quic_remove_streams_waiting_for_acks)),
-      do_not_use_stream_map_(
-          GetQuicReloadableFlag(quic_do_not_use_stream_map)) {
+      do_not_use_stream_map_(GetQuicReloadableFlag(quic_do_not_use_stream_map)),
+      remove_zombie_streams_(
+          GetQuicReloadableFlag(quic_remove_zombie_streams) &&
+          do_not_use_stream_map_ && remove_streams_waiting_for_acks_) {
   closed_streams_clean_up_alarm_ =
       QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
           new ClosedStreamsCleanUpDelegate(this)));
@@ -146,7 +149,13 @@
 }
 
 QuicSession::~QuicSession() {
-  QUIC_LOG_IF(WARNING, !zombie_streams_.empty()) << "Still have zombie streams";
+  if (!remove_zombie_streams_) {
+    QUIC_LOG_IF(WARNING, !zombie_streams_.empty())
+        << "Still have zombie streams";
+  } else {
+    QUIC_LOG_IF(WARNING, num_zombie_streams_ > 0)
+        << "Still have zombie streams";
+  }
 }
 
 void QuicSession::PendingStreamOnStreamFrame(const QuicStreamFrame& frame) {
@@ -412,9 +421,16 @@
       stream->OnConnectionClosed(frame.quic_error_code, source);
       QUIC_RELOADABLE_FLAG_COUNT(
           quic_do_not_close_stream_again_on_connection_close);
-      if (stream_map_.find(id) != stream_map_.end()) {
-        QUIC_BUG << ENDPOINT << "Stream " << id
-                 << " failed to close under OnConnectionClosed";
+      auto it = stream_map_.find(id);
+      if (it != stream_map_.end()) {
+        if (!remove_zombie_streams_) {
+          QUIC_BUG << ENDPOINT << "Stream " << id
+                   << " failed to close under OnConnectionClosed";
+        } else {
+          QUIC_BUG_IF(!it->second->IsZombie())
+              << ENDPOINT << "Non-zombie stream " << id
+              << " failed to close under OnConnectionClosed";
+        }
         if (!GetQuicReloadableFlag(
                 quic_do_not_close_stream_again_on_connection_close)) {
           CloseStream(id);
@@ -424,11 +440,16 @@
     });
   }
 
-  // Cleanup zombie stream map on connection close.
-  while (!zombie_streams_.empty()) {
-    ZombieStreamMap::iterator it = zombie_streams_.begin();
-    closed_streams_.push_back(std::move(it->second));
-    zombie_streams_.erase(it);
+  if (!remove_zombie_streams_) {
+    // Cleanup zombie stream map on connection close.
+    while (!zombie_streams_.empty()) {
+      ZombieStreamMap::iterator it = zombie_streams_.begin();
+      closed_streams_.push_back(std::move(it->second));
+      zombie_streams_.erase(it);
+    }
+  } else {
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_zombie_streams, 1, 4);
+    DCHECK(zombie_streams_.empty());
   }
 
   closed_streams_clean_up_alarm_->Cancel();
@@ -900,8 +921,15 @@
   QuicStream* stream = it->second.get();
   StreamType type = stream->type();
 
-  if (stream->IsWaitingForAcks()) {
-    zombie_streams_[stream_id] = std::move(it->second);
+  const bool stream_waiting_for_acks = stream->IsWaitingForAcks();
+  if (stream_waiting_for_acks) {
+    if (remove_zombie_streams_) {
+      // The stream needs to be kept alive because it's waiting for acks.
+      QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_zombie_streams, 2, 4);
+      ++num_zombie_streams_;
+    } else {
+      zombie_streams_[stream_id] = std::move(it->second);
+    }
   } else {
     // Clean up the stream since it is no longer waiting for acks.
     if (remove_streams_waiting_for_acks_) {
@@ -910,6 +938,11 @@
       streams_waiting_for_acks_.erase(stream_id);
     }
     closed_streams_.push_back(std::move(it->second));
+    if (remove_zombie_streams_) {
+      // When zombie_streams_ is removed, stream is only erased from stream map
+      // if it's not zombie.
+      stream_map_.erase(it);
+    }
     // Do not retransmit data of a closed stream.
     streams_with_pending_retransmission_.erase(stream_id);
     if (!closed_streams_clean_up_alarm_->IsSet()) {
@@ -927,14 +960,18 @@
     DCHECK(!stream->was_draining());
     InsertLocallyClosedStreamsHighestOffset(
         stream_id, stream->flow_controller()->highest_received_byte_offset());
-    stream_map_.erase(it);
+    if (!remove_zombie_streams_) {
+      stream_map_.erase(it);
+    }
     return;
   }
 
   const bool stream_was_draining = stream->was_draining();
   QUIC_DVLOG_IF(1, stream_was_draining)
       << ENDPOINT << "Stream " << stream_id << " was draining";
-  stream_map_.erase(it);
+  if (!remove_zombie_streams_) {
+    stream_map_.erase(it);
+  }
   if (stream_was_draining) {
     QUIC_BUG_IF(num_draining_streams_ == 0);
     --num_draining_streams_;
@@ -1811,6 +1848,10 @@
 
   StreamMap::iterator it = stream_map_.find(stream_id);
   if (it != stream_map_.end()) {
+    if (remove_zombie_streams_ && it->second->IsZombie()) {
+      QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_zombie_streams, 3, 4);
+      return nullptr;
+    }
     return it->second.get();
   }
 
@@ -1955,8 +1996,11 @@
 
 bool QuicSession::IsOpenStream(QuicStreamId id) {
   DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
-  if (QuicContainsKey(stream_map_, id) ||
-      QuicContainsKey(pending_stream_map_, id) ||
+  const StreamMap::iterator it = stream_map_.find(id);
+  if (it != stream_map_.end()) {
+    return remove_zombie_streams_ ? !it->second->IsZombie() : true;
+  }
+  if (QuicContainsKey(pending_stream_map_, id) ||
       QuicUtils::IsCryptoStreamId(transport_version(), id)) {
     // Stream is active
     return true;
@@ -1981,8 +2025,9 @@
            locally_closed_streams_highest_offset_.size();
   }
   DCHECK_GE(static_cast<QuicStreamCount>(stream_map_.size()),
-            num_static_streams_ + num_draining_streams_);
-  return stream_map_.size() - num_draining_streams_ - num_static_streams_;
+            num_static_streams_ + num_draining_streams_ + num_zombie_streams_);
+  return stream_map_.size() - num_draining_streams_ - num_static_streams_ -
+         num_zombie_streams_;
 }
 
 void QuicSession::MarkConnectionLevelWriteBlocked(QuicStreamId id) {
@@ -2058,16 +2103,28 @@
     streams_waiting_for_acks_.erase(id);
   }
 
-  auto it = zombie_streams_.find(id);
-  if (it == zombie_streams_.end()) {
-    return;
+  if (!remove_zombie_streams_) {
+    auto it = zombie_streams_.find(id);
+    if (it == zombie_streams_.end()) {
+      return;
+    }
+
+    closed_streams_.push_back(std::move(it->second));
+    zombie_streams_.erase(it);
+  } else {
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_zombie_streams, 4, 4);
+    auto it = stream_map_.find(id);
+    if (it == stream_map_.end()) {
+      return;
+    }
+    --num_zombie_streams_;
+    closed_streams_.push_back(std::move(it->second));
+    stream_map_.erase(it);
   }
 
-  closed_streams_.push_back(std::move(it->second));
   if (!closed_streams_clean_up_alarm_->IsSet()) {
     closed_streams_clean_up_alarm_->Set(connection_->clock()->ApproximateNow());
   }
-  zombie_streams_.erase(it);
   // Do not retransmit data of a closed stream.
   streams_with_pending_retransmission_.erase(id);
 }
@@ -2086,6 +2143,7 @@
 
   // The number of the streams waiting for acks should not be larger than the
   // number of streams.
+  DCHECK(!remove_zombie_streams_ || zombie_streams_.empty());
   if (static_cast<size_t>(stream_map_.size() + zombie_streams_.size()) <
       streams_waiting_for_acks_.size()) {
     QUIC_BUG << "More streams are waiting for acks than the number of streams. "
@@ -2101,6 +2159,8 @@
   if (active_stream != stream_map_.end()) {
     return active_stream->second.get();
   }
+
+  DCHECK(!remove_zombie_streams_ || zombie_streams_.empty());
   auto zombie_stream = zombie_streams_.find(id);
   if (zombie_stream != zombie_streams_.end()) {
     return zombie_stream->second.get();
@@ -2250,6 +2310,7 @@
     return !streams_waiting_for_acks_.empty();
   }
   QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_streams_waiting_for_acks, 4, 4);
+  DCHECK(!remove_zombie_streams_ || zombie_streams_.empty());
   if (!zombie_streams().empty()) {
     return true;
   }
@@ -2497,14 +2558,15 @@
 
 void QuicSession::PerformActionOnActiveStreams(
     std::function<bool(QuicStream*)> action) {
-  QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams;
+  QuicSmallMap<QuicStreamId, QuicStream*, 10> active_streams;
   for (const auto& it : stream_map_) {
-    if (!it.second->is_static()) {
-      non_static_streams[it.first] = it.second.get();
+    if (!it.second->is_static() &&
+        (!remove_zombie_streams_ || !it.second->IsZombie())) {
+      active_streams[it.first] = it.second.get();
     }
   }
 
-  for (const auto& it : non_static_streams) {
+  for (const auto& it : active_streams) {
     if (!action(it.second)) {
       return;
     }
@@ -2514,7 +2576,9 @@
 void QuicSession::PerformActionOnActiveStreams(
     std::function<bool(QuicStream*)> action) const {
   for (const auto& it : stream_map_) {
-    if (!it.second->is_static() && !action(it.second.get())) {
+    if (!it.second->is_static() &&
+        (!remove_zombie_streams_ || !it.second->IsZombie()) &&
+        !action(it.second.get())) {
       return;
     }
   }