Remove zombie stream map in QuicSession.

Those streams will live in stream_map_ instead.

This CL also removes some duplicated tests.

Protected by gfe2_reloadable_flag_quic_remove_zombie_streams

PiperOrigin-RevId: 324877471
Change-Id: I6d3df84048c35e7ca52ec030e019bbc0def4d3ce
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc
index e025706..90304c0 100644
--- a/quic/core/http/quic_spdy_session.cc
+++ b/quic/core/http/quic_spdy_session.cc
@@ -420,6 +420,7 @@
   for (auto& stream : *closed_streams()) {
     static_cast<QuicSpdyStream*>(stream.get())->ClearSession();
   }
+  DCHECK(!remove_zombie_streams() || zombie_streams().empty());
   for (auto const& kv : zombie_streams()) {
     static_cast<QuicSpdyStream*>(kv.second.get())->ClearSession();
   }
@@ -1178,8 +1179,9 @@
 }
 
 bool QuicSpdySession::HasActiveRequestStreams() const {
-  DCHECK_GE(static_cast<size_t>(stream_map_size()), num_static_streams());
-  return stream_map_size() - num_static_streams() > 0;
+  DCHECK_GE(static_cast<size_t>(stream_map_size()),
+            num_static_streams() + num_zombie_streams());
+  return stream_map_size() - num_static_streams() - num_zombie_streams() > 0;
 }
 
 bool QuicSpdySession::ProcessPendingStream(PendingStream* pending) {
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc
index 18b3951..ede4d2c 100644
--- a/quic/core/http/quic_spdy_session_test.cc
+++ b/quic/core/http/quic_spdy_session_test.cc
@@ -331,7 +331,6 @@
 
   using QuicSession::closed_streams;
   using QuicSession::ShouldKeepConnectionAlive;
-  using QuicSession::zombie_streams;
   using QuicSpdySession::ProcessPendingStream;
   using QuicSpdySession::UsesPendingStreams;
 
@@ -2096,21 +2095,6 @@
   EXPECT_EQ(3u, stream->flow_controller()->highest_received_byte_offset());
 }
 
-TEST_P(QuicSpdySessionTestServer, ZombieStreams) {
-  TestStream* stream2 = session_.CreateOutgoingBidirectionalStream();
-  QuicStreamPeer::SetStreamBytesWritten(3, stream2);
-  EXPECT_TRUE(stream2->IsWaitingForAcks());
-
-  CloseStream(stream2->id());
-  EXPECT_FALSE(QuicContainsKey(session_.zombie_streams(), stream2->id()));
-  ASSERT_EQ(1u, session_.closed_streams()->size());
-  EXPECT_EQ(stream2->id(), session_.closed_streams()->front()->id());
-  session_.OnStreamDoneWaitingForAcks(2);
-  EXPECT_FALSE(QuicContainsKey(session_.zombie_streams(), stream2->id()));
-  EXPECT_EQ(1u, session_.closed_streams()->size());
-  EXPECT_EQ(stream2->id(), session_.closed_streams()->front()->id());
-}
-
 TEST_P(QuicSpdySessionTestServer, OnStreamFrameLost) {
   InSequence s;
 
diff --git a/quic/core/http/quic_spdy_stream_test.cc b/quic/core/http/quic_spdy_stream_test.cc
index bc09fc0..b556367 100644
--- a/quic/core/http/quic_spdy_stream_test.cc
+++ b/quic/core/http/quic_spdy_stream_test.cc
@@ -1730,18 +1730,6 @@
                                      QuicTime::Zero()));
 }
 
-TEST_P(QuicSpdyStreamTest, StreamBecomesZombieWithWriteThatCloses) {
-  Initialize(kShouldProcessData);
-  EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(AtLeast(1));
-  QuicStreamPeer::CloseReadSide(stream_);
-  // This write causes stream to be closed.
-  stream_->WriteOrBufferBody("Test1", true);
-  // stream_ has unacked data and should become zombie.
-  EXPECT_TRUE(QuicContainsKey(QuicSessionPeer::zombie_streams(session_.get()),
-                              stream_->id()));
-  EXPECT_TRUE(QuicSessionPeer::closed_streams(session_.get()).empty());
-}
-
 TEST_P(QuicSpdyStreamTest, OnPriorityFrame) {
   Initialize(kShouldProcessData);
   stream_->OnPriorityFrame(spdy::SpdyStreamPrecedence(kV3HighestPriority));
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index 6379861..ecfb2b2 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -76,6 +76,7 @@
       num_draining_streams_(0),
       num_outgoing_draining_streams_(0),
       num_static_streams_(0),
+      num_zombie_streams_(0),
       flow_controller_(
           this,
           QuicUtils::GetInvalidStreamId(connection->transport_version()),
@@ -102,8 +103,10 @@
       liveness_testing_in_progress_(false),
       remove_streams_waiting_for_acks_(
           GetQuicReloadableFlag(quic_remove_streams_waiting_for_acks)),
-      do_not_use_stream_map_(
-          GetQuicReloadableFlag(quic_do_not_use_stream_map)) {
+      do_not_use_stream_map_(GetQuicReloadableFlag(quic_do_not_use_stream_map)),
+      remove_zombie_streams_(
+          GetQuicReloadableFlag(quic_remove_zombie_streams) &&
+          do_not_use_stream_map_ && remove_streams_waiting_for_acks_) {
   closed_streams_clean_up_alarm_ =
       QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
           new ClosedStreamsCleanUpDelegate(this)));
@@ -146,7 +149,13 @@
 }
 
 QuicSession::~QuicSession() {
-  QUIC_LOG_IF(WARNING, !zombie_streams_.empty()) << "Still have zombie streams";
+  if (!remove_zombie_streams_) {
+    QUIC_LOG_IF(WARNING, !zombie_streams_.empty())
+        << "Still have zombie streams";
+  } else {
+    QUIC_LOG_IF(WARNING, num_zombie_streams_ > 0)
+        << "Still have zombie streams";
+  }
 }
 
 void QuicSession::PendingStreamOnStreamFrame(const QuicStreamFrame& frame) {
@@ -412,9 +421,16 @@
       stream->OnConnectionClosed(frame.quic_error_code, source);
       QUIC_RELOADABLE_FLAG_COUNT(
           quic_do_not_close_stream_again_on_connection_close);
-      if (stream_map_.find(id) != stream_map_.end()) {
-        QUIC_BUG << ENDPOINT << "Stream " << id
-                 << " failed to close under OnConnectionClosed";
+      auto it = stream_map_.find(id);
+      if (it != stream_map_.end()) {
+        if (!remove_zombie_streams_) {
+          QUIC_BUG << ENDPOINT << "Stream " << id
+                   << " failed to close under OnConnectionClosed";
+        } else {
+          QUIC_BUG_IF(!it->second->IsZombie())
+              << ENDPOINT << "Non-zombie stream " << id
+              << " failed to close under OnConnectionClosed";
+        }
         if (!GetQuicReloadableFlag(
                 quic_do_not_close_stream_again_on_connection_close)) {
           CloseStream(id);
@@ -424,11 +440,16 @@
     });
   }
 
-  // Cleanup zombie stream map on connection close.
-  while (!zombie_streams_.empty()) {
-    ZombieStreamMap::iterator it = zombie_streams_.begin();
-    closed_streams_.push_back(std::move(it->second));
-    zombie_streams_.erase(it);
+  if (!remove_zombie_streams_) {
+    // Cleanup zombie stream map on connection close.
+    while (!zombie_streams_.empty()) {
+      ZombieStreamMap::iterator it = zombie_streams_.begin();
+      closed_streams_.push_back(std::move(it->second));
+      zombie_streams_.erase(it);
+    }
+  } else {
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_zombie_streams, 1, 4);
+    DCHECK(zombie_streams_.empty());
   }
 
   closed_streams_clean_up_alarm_->Cancel();
@@ -900,8 +921,15 @@
   QuicStream* stream = it->second.get();
   StreamType type = stream->type();
 
-  if (stream->IsWaitingForAcks()) {
-    zombie_streams_[stream_id] = std::move(it->second);
+  const bool stream_waiting_for_acks = stream->IsWaitingForAcks();
+  if (stream_waiting_for_acks) {
+    if (remove_zombie_streams_) {
+      // The stream needs to be kept alive because it's waiting for acks.
+      QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_zombie_streams, 2, 4);
+      ++num_zombie_streams_;
+    } else {
+      zombie_streams_[stream_id] = std::move(it->second);
+    }
   } else {
     // Clean up the stream since it is no longer waiting for acks.
     if (remove_streams_waiting_for_acks_) {
@@ -910,6 +938,11 @@
       streams_waiting_for_acks_.erase(stream_id);
     }
     closed_streams_.push_back(std::move(it->second));
+    if (remove_zombie_streams_) {
+      // When zombie_streams_ is removed, stream is only erased from stream map
+      // if it's not zombie.
+      stream_map_.erase(it);
+    }
     // Do not retransmit data of a closed stream.
     streams_with_pending_retransmission_.erase(stream_id);
     if (!closed_streams_clean_up_alarm_->IsSet()) {
@@ -927,14 +960,18 @@
     DCHECK(!stream->was_draining());
     InsertLocallyClosedStreamsHighestOffset(
         stream_id, stream->flow_controller()->highest_received_byte_offset());
-    stream_map_.erase(it);
+    if (!remove_zombie_streams_) {
+      stream_map_.erase(it);
+    }
     return;
   }
 
   const bool stream_was_draining = stream->was_draining();
   QUIC_DVLOG_IF(1, stream_was_draining)
       << ENDPOINT << "Stream " << stream_id << " was draining";
-  stream_map_.erase(it);
+  if (!remove_zombie_streams_) {
+    stream_map_.erase(it);
+  }
   if (stream_was_draining) {
     QUIC_BUG_IF(num_draining_streams_ == 0);
     --num_draining_streams_;
@@ -1811,6 +1848,10 @@
 
   StreamMap::iterator it = stream_map_.find(stream_id);
   if (it != stream_map_.end()) {
+    if (remove_zombie_streams_ && it->second->IsZombie()) {
+      QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_zombie_streams, 3, 4);
+      return nullptr;
+    }
     return it->second.get();
   }
 
@@ -1955,8 +1996,11 @@
 
 bool QuicSession::IsOpenStream(QuicStreamId id) {
   DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
-  if (QuicContainsKey(stream_map_, id) ||
-      QuicContainsKey(pending_stream_map_, id) ||
+  const StreamMap::iterator it = stream_map_.find(id);
+  if (it != stream_map_.end()) {
+    return remove_zombie_streams_ ? !it->second->IsZombie() : true;
+  }
+  if (QuicContainsKey(pending_stream_map_, id) ||
       QuicUtils::IsCryptoStreamId(transport_version(), id)) {
     // Stream is active
     return true;
@@ -1981,8 +2025,9 @@
            locally_closed_streams_highest_offset_.size();
   }
   DCHECK_GE(static_cast<QuicStreamCount>(stream_map_.size()),
-            num_static_streams_ + num_draining_streams_);
-  return stream_map_.size() - num_draining_streams_ - num_static_streams_;
+            num_static_streams_ + num_draining_streams_ + num_zombie_streams_);
+  return stream_map_.size() - num_draining_streams_ - num_static_streams_ -
+         num_zombie_streams_;
 }
 
 void QuicSession::MarkConnectionLevelWriteBlocked(QuicStreamId id) {
@@ -2058,16 +2103,28 @@
     streams_waiting_for_acks_.erase(id);
   }
 
-  auto it = zombie_streams_.find(id);
-  if (it == zombie_streams_.end()) {
-    return;
+  if (!remove_zombie_streams_) {
+    auto it = zombie_streams_.find(id);
+    if (it == zombie_streams_.end()) {
+      return;
+    }
+
+    closed_streams_.push_back(std::move(it->second));
+    zombie_streams_.erase(it);
+  } else {
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_zombie_streams, 4, 4);
+    auto it = stream_map_.find(id);
+    if (it == stream_map_.end()) {
+      return;
+    }
+    --num_zombie_streams_;
+    closed_streams_.push_back(std::move(it->second));
+    stream_map_.erase(it);
   }
 
-  closed_streams_.push_back(std::move(it->second));
   if (!closed_streams_clean_up_alarm_->IsSet()) {
     closed_streams_clean_up_alarm_->Set(connection_->clock()->ApproximateNow());
   }
-  zombie_streams_.erase(it);
   // Do not retransmit data of a closed stream.
   streams_with_pending_retransmission_.erase(id);
 }
@@ -2086,6 +2143,7 @@
 
   // The number of the streams waiting for acks should not be larger than the
   // number of streams.
+  DCHECK(!remove_zombie_streams_ || zombie_streams_.empty());
   if (static_cast<size_t>(stream_map_.size() + zombie_streams_.size()) <
       streams_waiting_for_acks_.size()) {
     QUIC_BUG << "More streams are waiting for acks than the number of streams. "
@@ -2101,6 +2159,8 @@
   if (active_stream != stream_map_.end()) {
     return active_stream->second.get();
   }
+
+  DCHECK(!remove_zombie_streams_ || zombie_streams_.empty());
   auto zombie_stream = zombie_streams_.find(id);
   if (zombie_stream != zombie_streams_.end()) {
     return zombie_stream->second.get();
@@ -2250,6 +2310,7 @@
     return !streams_waiting_for_acks_.empty();
   }
   QUIC_RELOADABLE_FLAG_COUNT_N(quic_remove_streams_waiting_for_acks, 4, 4);
+  DCHECK(!remove_zombie_streams_ || zombie_streams_.empty());
   if (!zombie_streams().empty()) {
     return true;
   }
@@ -2497,14 +2558,15 @@
 
 void QuicSession::PerformActionOnActiveStreams(
     std::function<bool(QuicStream*)> action) {
-  QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams;
+  QuicSmallMap<QuicStreamId, QuicStream*, 10> active_streams;
   for (const auto& it : stream_map_) {
-    if (!it.second->is_static()) {
-      non_static_streams[it.first] = it.second.get();
+    if (!it.second->is_static() &&
+        (!remove_zombie_streams_ || !it.second->IsZombie())) {
+      active_streams[it.first] = it.second.get();
     }
   }
 
-  for (const auto& it : non_static_streams) {
+  for (const auto& it : active_streams) {
     if (!action(it.second)) {
       return;
     }
@@ -2514,7 +2576,9 @@
 void QuicSession::PerformActionOnActiveStreams(
     std::function<bool(QuicStream*)> action) const {
   for (const auto& it : stream_map_) {
-    if (!it.second->is_static() && !action(it.second.get())) {
+    if (!it.second->is_static() &&
+        (!remove_zombie_streams_ || !it.second->IsZombie()) &&
+        !action(it.second.get())) {
       return;
     }
   }
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index 57b95eb..0f53f09 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -347,6 +347,7 @@
   // Called when stream |id| is done waiting for acks either because all data
   // gets acked or is not interested in data being acked (which happens when
   // a stream is reset because of an error).
+  // TODO(b/136274541): rename to CloseZombieStreams.
   void OnStreamDoneWaitingForAcks(QuicStreamId id);
 
   // TODO(b/136274541): Remove this once quic_remove_streams_waiting_for_acks is
@@ -498,6 +499,8 @@
     return liveness_testing_in_progress_;
   }
 
+  bool remove_zombie_streams() const { return remove_zombie_streams_; }
+
  protected:
   using StreamMap = QuicHashMap<QuicStreamId, std::unique_ptr<QuicStream>>;
 
@@ -621,6 +624,8 @@
 
   size_t num_static_streams() const { return num_static_streams_; }
 
+  size_t num_zombie_streams() const { return num_zombie_streams_; }
+
   bool was_zero_rtt_rejected() const { return was_zero_rtt_rejected_; }
 
   bool do_not_use_stream_map() const { return do_not_use_stream_map_; }
@@ -799,6 +804,10 @@
   // A counter for static streams which are in stream_map_.
   size_t num_static_streams_;
 
+  // A counter for streams which have done reading and writing, but are waiting
+  // for acks.
+  size_t num_zombie_streams_;
+
   // Received information for a connection close.
   QuicConnectionCloseFrame on_closed_frame_;
 
@@ -860,6 +869,9 @@
 
   // Latched value of flag quic_do_not_use_stream_map.
   const bool do_not_use_stream_map_;
+
+  // Latched value of flag quic_remove_zombie_streams.
+  const bool remove_zombie_streams_;
 };
 
 }  // namespace quic
diff --git a/quic/core/quic_session_test.cc b/quic/core/quic_session_test.cc
index a033573..1a5a290 100644
--- a/quic/core/quic_session_test.cc
+++ b/quic/core/quic_session_test.cc
@@ -364,6 +364,7 @@
   using QuicSession::closed_streams;
   using QuicSession::GetNextOutgoingBidirectionalStreamId;
   using QuicSession::GetNextOutgoingUnidirectionalStreamId;
+  using QuicSession::stream_map;
   using QuicSession::zombie_streams;
 
  private:
@@ -2572,7 +2573,13 @@
   stream2->WriteOrBufferData(body, true, nullptr);
   EXPECT_TRUE(stream2->IsWaitingForAcks());
   // Verify stream2 is a zombie streams.
-  EXPECT_TRUE(QuicContainsKey(session_.zombie_streams(), stream2->id()));
+  if (!session_.remove_zombie_streams()) {
+    EXPECT_TRUE(QuicContainsKey(session_.zombie_streams(), stream2->id()));
+  } else {
+    ASSERT_TRUE(QuicContainsKey(session_.stream_map(), stream2->id()));
+    auto* stream = session_.stream_map().find(stream2->id())->second.get();
+    EXPECT_TRUE(stream->IsZombie());
+  }
 
   QuicStreamFrame frame(stream2->id(), true, 0, 100);
   EXPECT_CALL(*stream2, HasPendingRetransmission())
@@ -2620,7 +2627,13 @@
   stream4->WriteOrBufferData(body, false, nullptr);
   EXPECT_FALSE(QuicContainsKey(session_.zombie_streams(), stream4->id()));
   stream4->WriteOrBufferData(body, true, nullptr);
-  EXPECT_TRUE(QuicContainsKey(session_.zombie_streams(), stream4->id()));
+  if (!session_.remove_zombie_streams()) {
+    EXPECT_TRUE(QuicContainsKey(session_.zombie_streams(), stream4->id()));
+  } else {
+    ASSERT_TRUE(QuicContainsKey(session_.stream_map(), stream4->id()));
+    auto* stream = session_.stream_map().find(stream4->id())->second.get();
+    EXPECT_TRUE(stream->IsZombie());
+  }
 }
 
 TEST_P(QuicSessionTestServer, ReceivedDataOnWriteUnidirectionalStream) {
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc
index 9e695d4..ee3f983 100644
--- a/quic/core/quic_stream.cc
+++ b/quic/core/quic_stream.cc
@@ -979,7 +979,8 @@
     fin_outstanding_ = false;
     fin_lost_ = false;
   }
-  if (!IsWaitingForAcks()) {
+  if (!IsWaitingForAcks() && (!session()->remove_zombie_streams() ||
+                              (read_side_closed_ && write_side_closed_))) {
     session_->OnStreamDoneWaitingForAcks(id_);
   }
   return new_data_acked;
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h
index 2a7d036..12b1192 100644
--- a/quic/core/quic_stream.h
+++ b/quic/core/quic_stream.h
@@ -203,6 +203,10 @@
   }
   bool write_side_closed() const { return write_side_closed_; }
 
+  bool IsZombie() const {
+    return read_side_closed_ && write_side_closed_ && IsWaitingForAcks();
+  }
+
   bool rst_received() const { return rst_received_; }
   bool rst_sent() const { return rst_sent_; }
   bool fin_received() const { return fin_received_; }
diff --git a/quic/qbone/qbone_client_session.cc b/quic/qbone/qbone_client_session.cc
index 2e5d205..f375e0a 100644
--- a/quic/qbone/qbone_client_session.cc
+++ b/quic/qbone/qbone_client_session.cc
@@ -94,7 +94,7 @@
     const ProofVerifyDetails& verify_details) {}
 
 bool QboneClientSession::HasActiveRequests() const {
-  return (stream_map_size() - num_static_streams()) > 0;
+  return (stream_map_size() - num_static_streams() - num_zombie_streams()) > 0;
 }
 
 }  // namespace quic
diff --git a/quic/quic_transport/quic_transport_client_session_test.cc b/quic/quic_transport/quic_transport_client_session_test.cc
index ba65ce2..9c2f138 100644
--- a/quic/quic_transport/quic_transport_client_session_test.cc
+++ b/quic/quic_transport/quic_transport_client_session_test.cc
@@ -108,9 +108,16 @@
   Connect();
   EXPECT_TRUE(session_->IsSessionReady());
 
-  QuicStream* client_indication_stream =
-      QuicSessionPeer::zombie_streams(session_.get())[ClientIndicationStream()]
-          .get();
+  QuicStream* client_indication_stream;
+  if (session_->remove_zombie_streams()) {
+    client_indication_stream =
+        QuicSessionPeer::stream_map(session_.get())[ClientIndicationStream()]
+            .get();
+  } else {
+    client_indication_stream = QuicSessionPeer::zombie_streams(
+                                   session_.get())[ClientIndicationStream()]
+                                   .get();
+  }
   ASSERT_TRUE(client_indication_stream != nullptr);
   const std::string client_indication = DataInStream(client_indication_stream);
   const std::string expected_client_indication{
@@ -133,9 +140,16 @@
   Connect();
   EXPECT_TRUE(session_->IsSessionReady());
 
-  QuicStream* client_indication_stream =
-      QuicSessionPeer::zombie_streams(session_.get())[ClientIndicationStream()]
-          .get();
+  QuicStream* client_indication_stream;
+  if (session_->remove_zombie_streams()) {
+    client_indication_stream =
+        QuicSessionPeer::stream_map(session_.get())[ClientIndicationStream()]
+            .get();
+  } else {
+    client_indication_stream = QuicSessionPeer::zombie_streams(
+                                   session_.get())[ClientIndicationStream()]
+                                   .get();
+  }
   ASSERT_TRUE(client_indication_stream != nullptr);
   const std::string client_indication = DataInStream(client_indication_stream);
   const std::string expected_client_indication{