gfe-relnote: Remove draining_streams_ from QuicSession. Protected by gfe2_reloadable_flag_quic_deprecate_draining_streams.

PiperOrigin-RevId: 306919260
Change-Id: I6d40df89133f450ff3e913280465aff328714408
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc
index 63fb18d..e09c9bc 100644
--- a/quic/core/http/quic_spdy_session_test.cc
+++ b/quic/core/http/quic_spdy_session_test.cc
@@ -1774,7 +1774,7 @@
     QuicStreamFrame data1(i, true, 0, quiche::QuicheStringPiece("HT"));
     session_.OnStreamFrame(data1);
     EXPECT_EQ(1u, session_.GetNumOpenIncomingStreams());
-    session_.StreamDraining(i);
+    session_.StreamDraining(i, /*unidirectional=*/false);
     EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
   }
 }
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index b57ddd7..4124575 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -93,6 +93,7 @@
                                 num_expected_unidirectional_static_streams),
       num_dynamic_incoming_streams_(0),
       num_draining_incoming_streams_(0),
+      num_draining_outgoing_streams_(0),
       num_outgoing_static_streams_(0),
       num_incoming_static_streams_(0),
       num_locally_closed_incoming_streams_highest_offset_(0),
@@ -121,7 +122,9 @@
           num_expected_unidirectional_static_streams),
       enable_round_robin_scheduling_(false),
       write_with_transmission_(
-          GetQuicReloadableFlag(quic_write_with_transmission)) {
+          GetQuicReloadableFlag(quic_write_with_transmission)),
+      deprecate_draining_streams_(
+          GetQuicReloadableFlag(quic_deprecate_draining_streams)) {
   closed_streams_clean_up_alarm_ =
       QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
           new ClosedStreamsCleanUpDelegate(this)));
@@ -685,7 +688,7 @@
 }
 
 uint64_t QuicSession::GetNumOpenDynamicStreams() const {
-  return stream_map_.size() - draining_streams_.size() +
+  return stream_map_.size() - GetNumDrainingStreams() +
          locally_closed_streams_highest_offset_.size() -
          num_incoming_static_streams_ - num_outgoing_static_streams_;
 }
@@ -904,16 +907,27 @@
     InsertLocallyClosedStreamsHighestOffset(
         stream_id, stream->flow_controller()->highest_received_byte_offset());
   }
+  bool stream_was_draining = false;
+  if (deprecate_draining_streams_) {
+    stream_was_draining = stream->was_draining();
+    QUIC_DVLOG_IF(1, stream_was_draining)
+        << ENDPOINT << "Stream " << stream_id << " was draining";
+  }
   stream_map_.erase(it);
   if (IsIncomingStream(stream_id)) {
     --num_dynamic_incoming_streams_;
   }
-
-  const bool stream_was_draining =
-      draining_streams_.find(stream_id) != draining_streams_.end();
+  if (!deprecate_draining_streams_) {
+    stream_was_draining =
+        draining_streams_.find(stream_id) != draining_streams_.end();
+  }
   if (stream_was_draining) {
     if (IsIncomingStream(stream_id)) {
+      QUIC_BUG_IF(num_draining_incoming_streams_ == 0);
       --num_draining_incoming_streams_;
+    } else if (deprecate_draining_streams_) {
+      QUIC_BUG_IF(num_draining_outgoing_streams_ == 0);
+      --num_draining_outgoing_streams_;
     }
     draining_streams_.erase(stream_id);
   } else if (VersionHasIetfQuicFrames(transport_version())) {
@@ -1593,8 +1607,22 @@
   return CreateIncomingStream(stream_id);
 }
 
-void QuicSession::StreamDraining(QuicStreamId stream_id) {
+void QuicSession::StreamDraining(QuicStreamId stream_id, bool unidirectional) {
   DCHECK(QuicContainsKey(stream_map_, stream_id));
+  if (deprecate_draining_streams_) {
+    QUIC_RELOADABLE_FLAG_COUNT(quic_deprecate_draining_streams);
+    QUIC_DVLOG(1) << ENDPOINT << "Stream " << stream_id << " is draining";
+    if (VersionHasIetfQuicFrames(transport_version())) {
+      v99_streamid_manager_.OnStreamClosed(stream_id);
+    }
+    if (IsIncomingStream(stream_id)) {
+      ++num_draining_incoming_streams_;
+      return;
+    }
+    ++num_draining_outgoing_streams_;
+    OnCanCreateNewOutgoingStream(unidirectional);
+    return;
+  }
   if (!QuicContainsKey(draining_streams_, stream_id)) {
     draining_streams_.insert(stream_id);
     if (IsIncomingStream(stream_id)) {
@@ -1749,11 +1777,14 @@
 }
 
 size_t QuicSession::GetNumActiveStreams() const {
-  return stream_map_.size() - draining_streams_.size() -
+  return stream_map_.size() - GetNumDrainingStreams() -
          num_incoming_static_streams_ - num_outgoing_static_streams_;
 }
 
 size_t QuicSession::GetNumDrainingStreams() const {
+  if (deprecate_draining_streams_) {
+    return num_draining_incoming_streams_ + num_draining_outgoing_streams_;
+  }
   return draining_streams_.size();
 }
 
@@ -1796,6 +1827,9 @@
 }
 
 size_t QuicSession::GetNumDrainingOutgoingStreams() const {
+  if (deprecate_draining_streams_) {
+    return num_draining_outgoing_streams_;
+  }
   DCHECK_GE(draining_streams_.size(), num_draining_incoming_streams_);
   return draining_streams_.size() - num_draining_incoming_streams_;
 }
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index 4c4626b..d1720a0 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -400,7 +400,7 @@
   QuicStream* GetOrCreateStream(const QuicStreamId stream_id);
 
   // Mark a stream as draining.
-  virtual void StreamDraining(QuicStreamId id);
+  void StreamDraining(QuicStreamId id, bool unidirectional);
 
   // Returns true if this stream should yield writes to another blocked stream.
   virtual bool ShouldYield(QuicStreamId stream_id);
@@ -483,6 +483,10 @@
 
   bool write_with_transmission() const { return write_with_transmission_; }
 
+  bool deprecate_draining_streams() const {
+    return deprecate_draining_streams_;
+  }
+
  protected:
   using StreamMap = QuicSmallMap<QuicStreamId, std::unique_ptr<QuicStream>, 10>;
 
@@ -735,6 +739,8 @@
   // Set of stream ids that are "draining" -- a FIN has been sent and received,
   // but the stream object still exists because not all the received data has
   // been consumed.
+  // TODO(fayang): Remove draining_streams_ when deprecating
+  // quic_deprecate_draining_streams.
   QuicUnorderedSet<QuicStreamId> draining_streams_;
 
   // Set of stream ids that are waiting for acks excluding crypto stream id.
@@ -751,9 +757,15 @@
   // A counter for peer initiated dynamic streams which are in the stream_map_.
   size_t num_dynamic_incoming_streams_;
 
-  // A counter for peer initiated streams which are in the draining_streams_.
+  // A counter for peer initiated streams which have sent and received FIN but
+  // are waiting for the application to consume data.
   size_t num_draining_incoming_streams_;
 
+  // A counter for self initiated streams which have sent and received FIN but
+  // are waiting for the application to consume data. Only used when
+  // deprecate_draining_streams_ is true.
+  size_t num_draining_outgoing_streams_;
+
   // A counter for self initiated static streams which are in
   // stream_map_.
   size_t num_outgoing_static_streams_;
@@ -818,6 +830,9 @@
 
   // Latched value of gfe2_reloadable_flag_quic_write_with_transmission.
   const bool write_with_transmission_;
+
+  // Latched value of quic_deprecate_draining_streams.
+  const bool deprecate_draining_streams_;
 };
 
 }  // namespace quic
diff --git a/quic/core/quic_session_test.cc b/quic/core/quic_session_test.cc
index d50931c..bf4bffc 100644
--- a/quic/core/quic_session_test.cc
+++ b/quic/core/quic_session_test.cc
@@ -1888,7 +1888,7 @@
   QuicStreamFrame data1(stream_id, true, 0, quiche::QuicheStringPiece("HT"));
   session_.OnStreamFrame(data1);
   EXPECT_CALL(session_, OnCanCreateNewOutgoingStream(false)).Times(1);
-  session_.StreamDraining(stream_id);
+  session_.StreamDraining(stream_id, /*unidirectional=*/false);
 }
 
 TEST_P(QuicSessionTestServer, NoPendingStreams) {
@@ -2019,7 +2019,7 @@
     QuicStreamFrame data1(i, true, 0, quiche::QuicheStringPiece("HT"));
     session_.OnStreamFrame(data1);
     EXPECT_EQ(1u, session_.GetNumOpenIncomingStreams());
-    session_.StreamDraining(i);
+    session_.StreamDraining(i, /*unidirectional=*/false);
     EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
   }
 }
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc
index 3b2484d..813ea2d 100644
--- a/quic/core/quic_stream.cc
+++ b/quic/core/quic_stream.cc
@@ -353,6 +353,7 @@
       buffered_data_threshold_(GetQuicFlag(FLAGS_quic_buffered_data_threshold)),
       is_static_(is_static),
       deadline_(QuicTime::Zero()),
+      was_draining_(false),
       type_(VersionHasIetfQuicFrames(session->transport_version()) &&
                     type != CRYPTO
                 ? QuicUtils::GetStreamType(id_,
@@ -431,9 +432,14 @@
   }
 
   if (frame.fin) {
-    fin_received_ = true;
-    if (fin_sent_) {
-      session_->StreamDraining(id_);
+    if (!session_->deprecate_draining_streams() || !fin_received_) {
+      fin_received_ = true;
+      if (fin_sent_) {
+        DCHECK(!was_draining_ || !session_->deprecate_draining_streams());
+        session_->StreamDraining(id_,
+                                 /*unidirectional=*/type_ != BIDIRECTIONAL);
+        was_draining_ = true;
+      }
     }
   }
 
@@ -1087,10 +1093,14 @@
       MaybeSendBlocked();
     }
     if (fin && consumed_data.fin_consumed) {
+      DCHECK(!fin_sent_);
       fin_sent_ = true;
       fin_outstanding_ = true;
       if (fin_received_) {
-        session_->StreamDraining(id_);
+        DCHECK(!was_draining_);
+        session_->StreamDraining(id_,
+                                 /*unidirectional=*/type_ != BIDIRECTIONAL);
+        was_draining_ = true;
       }
       CloseWriteSide();
     } else if (fin && !consumed_data.fin_consumed) {
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h
index bf12149..b8e940f 100644
--- a/quic/core/quic_stream.h
+++ b/quic/core/quic_stream.h
@@ -358,6 +358,8 @@
   // Returns true if the stream is static.
   bool is_static() const { return is_static_; }
 
+  bool was_draining() const { return was_draining_; }
+
   static spdy::SpdyStreamPrecedence CalculateDefaultPriority(
       const QuicSession* session);
 
@@ -534,6 +536,10 @@
   // If initialized, reset this stream at this deadline.
   QuicTime deadline_;
 
+  // True if this stream has entered draining state. Only used when
+  // quic_deprecate_draining_streams is true.
+  bool was_draining_;
+
   // Indicates whether this stream is bidirectional, read unidirectional or
   // write unidirectional.
   const StreamType type_;
diff --git a/quic/core/quic_stream_test.cc b/quic/core/quic_stream_test.cc
index eea7b15..100c05e 100644
--- a/quic/core/quic_stream_test.cc
+++ b/quic/core/quic_stream_test.cc
@@ -746,8 +746,7 @@
                              nullptr);
   EXPECT_TRUE(stream_->write_side_closed());
 
-  EXPECT_EQ(1u, QuicSessionPeer::GetDrainingStreams(session_.get())
-                    ->count(kTestStreamId));
+  EXPECT_EQ(1u, session_->GetNumDrainingStreams());
   EXPECT_EQ(0u, session_->GetNumOpenIncomingStreams());
 }
 
@@ -775,8 +774,7 @@
   EXPECT_FALSE(QuicStreamPeer::read_side_closed(stream_));
   EXPECT_FALSE(stream_->reading_stopped());
 
-  EXPECT_EQ(1u, QuicSessionPeer::GetDrainingStreams(session_.get())
-                    ->count(kTestStreamId));
+  EXPECT_EQ(1u, session_->GetNumDrainingStreams());
   EXPECT_EQ(0u, session_->GetNumOpenIncomingStreams());
 }
 
diff --git a/quic/test_tools/quic_session_peer.cc b/quic/test_tools/quic_session_peer.cc
index cbaf5d6..c23bc88 100644
--- a/quic/test_tools/quic_session_peer.cc
+++ b/quic/test_tools/quic_session_peer.cc
@@ -143,12 +143,6 @@
 }
 
 // static
-QuicUnorderedSet<QuicStreamId>* QuicSessionPeer::GetDrainingStreams(
-    QuicSession* session) {
-  return &session->draining_streams_;
-}
-
-// static
 void QuicSessionPeer::ActivateStream(QuicSession* session,
                                      std::unique_ptr<QuicStream> stream) {
   return session->ActivateStream(std::move(stream));
diff --git a/quic/test_tools/quic_session_peer.h b/quic/test_tools/quic_session_peer.h
index 72caa91..5a58360 100644
--- a/quic/test_tools/quic_session_peer.h
+++ b/quic/test_tools/quic_session_peer.h
@@ -59,8 +59,6 @@
   static QuicSession::StreamMap& stream_map(QuicSession* session);
   static const QuicSession::ClosedStreams& closed_streams(QuicSession* session);
   static QuicSession::ZombieStreamMap& zombie_streams(QuicSession* session);
-  static QuicUnorderedSet<QuicStreamId>* GetDrainingStreams(
-      QuicSession* session);
   static void ActivateStream(QuicSession* session,
                              std::unique_ptr<QuicStream> stream);
 
diff --git a/quic/test_tools/quic_test_utils.h b/quic/test_tools/quic_test_utils.h
index 8147dcd..fafe8d0 100644
--- a/quic/test_tools/quic_test_utils.h
+++ b/quic/test_tools/quic_test_utils.h
@@ -666,6 +666,7 @@
   MOCK_METHOD1(OnAlpnSelected, void(quiche::QuicheStringPiece));
 
   using QuicSession::ActivateStream;
+  using QuicSession::GetNumDrainingStreams;
 
   // Returns a QuicConsumedData that indicates all of |write_length| (and |fin|
   // if set) has been consumed.
diff --git a/quic/tools/quic_simple_server_session_test.cc b/quic/tools/quic_simple_server_session_test.cc
index 1d5f3df..bdcedbc 100644
--- a/quic/tools/quic_simple_server_session_test.cc
+++ b/quic/tools/quic_simple_server_session_test.cc
@@ -825,9 +825,11 @@
   }
 
   if (VersionUsesHttp3(transport_version())) {
-    session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(3));
+    session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(3),
+                             /*unidirectional=*/true);
   } else {
-    session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(0));
+    session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(0),
+                             /*unidirectional=*/true);
   }
   // Number of open outgoing streams should still be the same, because a new
   // stream is opened. And the queue should be empty.
@@ -924,8 +926,10 @@
     session_->OnMaxStreamsFrame(
         QuicMaxStreamsFrame(0, num_resources + 3, /*unidirectional=*/true));
   }
-  session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(3));
-  session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(4));
+  session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(3),
+                           /*unidirectional=*/true);
+  session_->StreamDraining(GetNthServerInitiatedUnidirectionalId(4),
+                           /*unidirectional=*/true);
 }
 
 // Tests that closing a open outgoing stream can trigger a promised resource in