gfe-relnote: Move headers streams out of static stream map. Protected by
gfe_reloadable_quic_eliminate_static_stream_map.

Following up this CL, crypto stream will be moved out of static_stream_map too.

PiperOrigin-RevId: 244429772
Change-Id: I5186bc7ab4e6ee9c9f546a3c9456be489a4ddc26
diff --git a/quic/core/http/quic_spdy_client_session_base.cc b/quic/core/http/quic_spdy_client_session_base.cc
index d388d2e..4aa3348 100644
--- a/quic/core/http/quic_spdy_client_session_base.cc
+++ b/quic/core/http/quic_spdy_client_session_base.cc
@@ -95,6 +95,13 @@
     // It's quite possible to receive headers after a stream has been reset.
     return;
   }
+  if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) &&
+      stream->is_static()) {
+    connection()->CloseConnection(
+        QUIC_INVALID_HEADERS_STREAM_DATA, "stream is static",
+        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+    return;
+  }
   stream->OnPromiseHeaderList(promised_stream_id, frame_len, header_list);
 }
 
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc
index bda2e17..d7a5473 100644
--- a/quic/core/http/quic_spdy_session.cc
+++ b/quic/core/http/quic_spdy_session.cc
@@ -355,9 +355,15 @@
   headers_stream_ = QuicMakeUnique<QuicHeadersStream>((this));
   DCHECK_EQ(QuicUtils::GetHeadersStreamId(connection()->transport_version()),
             headers_stream_->id());
-  RegisterStaticStream(
-      QuicUtils::GetHeadersStreamId(connection()->transport_version()),
-      headers_stream_.get());
+  if (!GetQuicReloadableFlag(quic_eliminate_static_stream_map)) {
+    RegisterStaticStream(
+        QuicUtils::GetHeadersStreamId(connection()->transport_version()),
+        headers_stream_.get());
+  } else {
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 7, 9);
+    unowned_headers_stream_ = headers_stream_.get();
+    RegisterStaticStreamNew(std::move(headers_stream_));
+  }
 
   set_max_uncompressed_header_bytes(max_inbound_header_list_size_);
 
@@ -438,6 +444,14 @@
     // It's quite possible to receive headers after a stream has been reset.
     return;
   }
+  if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) &&
+      stream->is_static()) {
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 8, 9);
+    connection()->CloseConnection(
+        QUIC_INVALID_HEADERS_STREAM_DATA, "stream is static",
+        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+    return;
+  }
   stream->OnStreamHeaderList(fin, frame_len, header_list);
 }
 
@@ -478,7 +492,7 @@
   }
   SpdyPriorityIR priority_frame(id, parent_stream_id, weight, exclusive);
   SpdySerializedFrame frame(spdy_framer_.SerializeFrame(priority_frame));
-  headers_stream_->WriteOrBufferData(
+  headers_stream()->WriteOrBufferData(
       QuicStringPiece(frame.data(), frame.size()), false, nullptr);
   return frame.size();
 }
@@ -498,7 +512,7 @@
   push_promise.set_fin(false);
 
   SpdySerializedFrame frame(spdy_framer_.SerializeFrame(push_promise));
-  headers_stream_->WriteOrBufferData(
+  headers_stream()->WriteOrBufferData(
       QuicStringPiece(frame.data(), frame.size()), false, nullptr);
   return frame.size();
 }
@@ -508,7 +522,7 @@
   settings_frame.AddSetting(SETTINGS_MAX_HEADER_LIST_SIZE, value);
 
   SpdySerializedFrame frame(spdy_framer_.SerializeFrame(settings_frame));
-  headers_stream_->WriteOrBufferData(
+  headers_stream()->WriteOrBufferData(
       QuicStringPiece(frame.data(), frame.size()), false, nullptr);
   return frame.size();
 }
@@ -567,7 +581,7 @@
     headers_frame.set_exclusive(exclusive);
   }
   SpdySerializedFrame frame(spdy_framer_.SerializeFrame(headers_frame));
-  headers_stream_->WriteOrBufferData(
+  headers_stream()->WriteOrBufferData(
       QuicStringPiece(frame.data(), frame.size()), false,
       std::move(ack_listener));
   return frame.size();
@@ -695,8 +709,20 @@
 }
 
 bool QuicSpdySession::HasActiveRequestStreams() const {
-  // TODO(renjietang): Exclude static streams.
-  return !dynamic_streams().empty();
+  if (!GetQuicReloadableFlag(quic_eliminate_static_stream_map)) {
+    return !dynamic_streams().empty();
+  }
+  // In the case where session is destructed by calling
+  // dynamic_streams().clear(), we will have incorrect accounting here.
+  // TODO(renjietang): Modify destructors and make this a DCHECK.
+  QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 9, 9);
+  if (static_cast<size_t>(dynamic_streams().size()) >
+      num_incoming_static_streams() + num_outgoing_static_streams()) {
+    return dynamic_streams().size() - num_incoming_static_streams() -
+               num_outgoing_static_streams() >
+           0;
+  }
+  return false;
 }
 
 }  // namespace quic
diff --git a/quic/core/http/quic_spdy_session.h b/quic/core/http/quic_spdy_session.h
index 0ff1150..9248fa5 100644
--- a/quic/core/http/quic_spdy_session.h
+++ b/quic/core/http/quic_spdy_session.h
@@ -139,7 +139,17 @@
 
   QpackEncoder* qpack_encoder();
   QpackDecoder* qpack_decoder();
-  QuicHeadersStream* headers_stream() { return headers_stream_.get(); }
+  QuicHeadersStream* headers_stream() {
+    return GetQuicReloadableFlag(quic_eliminate_static_stream_map)
+               ? unowned_headers_stream_
+               : headers_stream_.get();
+  }
+
+  const QuicHeadersStream* headers_stream() const {
+    return GetQuicReloadableFlag(quic_eliminate_static_stream_map)
+               ? unowned_headers_stream_
+               : headers_stream_.get();
+  }
 
   bool server_push_enabled() const { return server_push_enabled_; }
 
@@ -262,6 +272,13 @@
   // TODO(123528590): Remove this member.
   std::unique_ptr<QuicHeadersStream> headers_stream_;
 
+  // Unowned headers stream pointer that points to the stream
+  // in dynamic_stream_map.
+  // TODO(renjietang): Merge this with headers_stream_ and clean up other
+  // static_stream_map logic when flag eliminate_static_stream_map
+  // is deprecated.
+  QuicHeadersStream* unowned_headers_stream_;
+
   // The maximum size of a header block that will be accepted from the peer,
   // defined per spec as key + value + overhead per field (uncompressed).
   size_t max_inbound_header_list_size_;
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc
index 04605e1..5726573 100644
--- a/quic/core/http/quic_spdy_session_test.cc
+++ b/quic/core/http/quic_spdy_session_test.cc
@@ -896,9 +896,16 @@
   // connection flow control blocked.
   TestCryptoStream* crypto_stream = session_.GetMutableCryptoStream();
   EXPECT_CALL(*crypto_stream, OnCanWrite());
-  QuicSpdySessionPeer::SetHeadersStream(&session_, nullptr);
-  TestHeadersStream* headers_stream = new TestHeadersStream(&session_);
-  QuicSpdySessionPeer::SetHeadersStream(&session_, headers_stream);
+  TestHeadersStream* headers_stream;
+  if (!GetQuicReloadableFlag(quic_eliminate_static_stream_map)) {
+    QuicSpdySessionPeer::SetHeadersStream(&session_, nullptr);
+    headers_stream = new TestHeadersStream(&session_);
+    QuicSpdySessionPeer::SetHeadersStream(&session_, headers_stream);
+  } else {
+    QuicSpdySessionPeer::SetUnownedHeadersStream(&session_, nullptr);
+    headers_stream = new TestHeadersStream(&session_);
+    QuicSpdySessionPeer::SetUnownedHeadersStream(&session_, headers_stream);
+  }
   session_.MarkConnectionLevelWriteBlocked(
       QuicUtils::GetHeadersStreamId(connection_->transport_version()));
   EXPECT_CALL(*headers_stream, OnCanWrite());
@@ -1597,9 +1604,16 @@
 }
 
 TEST_P(QuicSpdySessionTestClient, WritePriority) {
-  QuicSpdySessionPeer::SetHeadersStream(&session_, nullptr);
-  TestHeadersStream* headers_stream = new TestHeadersStream(&session_);
-  QuicSpdySessionPeer::SetHeadersStream(&session_, headers_stream);
+  TestHeadersStream* headers_stream;
+  if (!GetQuicReloadableFlag(quic_eliminate_static_stream_map)) {
+    QuicSpdySessionPeer::SetHeadersStream(&session_, nullptr);
+    headers_stream = new TestHeadersStream(&session_);
+    QuicSpdySessionPeer::SetHeadersStream(&session_, headers_stream);
+  } else {
+    QuicSpdySessionPeer::SetUnownedHeadersStream(&session_, nullptr);
+    headers_stream = new TestHeadersStream(&session_);
+    QuicSpdySessionPeer::SetUnownedHeadersStream(&session_, headers_stream);
+  }
 
   // Make packet writer blocked so |headers_stream| will buffer its write data.
   MockPacketWriter* writer = static_cast<MockPacketWriter*>(
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index 36070a4..91d323b 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -61,6 +61,8 @@
                             config_.GetMaxIncomingDynamicStreamsToSend()),
       num_dynamic_incoming_streams_(0),
       num_draining_incoming_streams_(0),
+      num_outgoing_static_streams_(0),
+      num_incoming_static_streams_(0),
       num_locally_closed_incoming_streams_highest_offset_(0),
       error_(QUIC_NO_ERROR),
       flow_controller_(
@@ -118,6 +120,20 @@
   }
 }
 
+void QuicSession::RegisterStaticStreamNew(std::unique_ptr<QuicStream> stream) {
+  DCHECK(GetQuicReloadableFlag(quic_eliminate_static_stream_map));
+  QuicStreamId stream_id = stream->id();
+  dynamic_stream_map_[stream_id] = std::move(stream);
+  if (connection_->transport_version() == QUIC_VERSION_99) {
+    v99_streamid_manager_.RegisterStaticStream(stream_id);
+  }
+  if (IsIncomingStream(stream_id)) {
+    ++num_incoming_static_streams_;
+  } else {
+    ++num_outgoing_static_streams_;
+  }
+}
+
 void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
   // TODO(rch) deal with the error case of stream id 0.
   QuicStreamId stream_id = frame.stream_id;
@@ -152,6 +168,14 @@
     }
     return;
   }
+  if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) && frame.fin &&
+      handler.stream->is_static()) {
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 1, 9);
+    connection()->CloseConnection(
+        QUIC_INVALID_STREAM_ID, "Attempt to close a static stream",
+        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+    return;
+  }
   handler.stream->OnStreamFrame(frame);
 }
 
@@ -230,6 +254,19 @@
                   << stream_id << ". Ignoring.";
     return true;
   }
+
+  if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) &&
+      stream->is_static()) {
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 2, 9);
+    QUIC_DVLOG(1) << ENDPOINT
+                  << "Received STOP_SENDING for a static stream, id: "
+                  << stream_id << " Closing connection";
+    connection()->CloseConnection(
+        QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for a static stream",
+        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+    return false;
+  }
+
   stream->OnStopSending(frame.application_error_code);
 
   stream->set_stream_error(
@@ -277,6 +314,14 @@
     HandleRstOnValidNonexistentStream(frame);
     return;  // Errors are handled by GetOrCreateStream.
   }
+  if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) &&
+      handler.stream->is_static()) {
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 3, 9);
+    connection()->CloseConnection(
+        QUIC_INVALID_STREAM_ID, "Attempt to reset a static stream",
+        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+    return;
+  }
   handler.stream->OnStreamReset(frame);
 }
 
@@ -317,14 +362,35 @@
     error_ = error;
   }
 
-  while (!dynamic_stream_map_.empty()) {
-    DynamicStreamMap::iterator it = dynamic_stream_map_.begin();
-    QuicStreamId id = it->first;
-    it->second->OnConnectionClosed(error, source);
-    // The stream should call CloseStream as part of OnConnectionClosed.
-    if (dynamic_stream_map_.find(id) != dynamic_stream_map_.end()) {
-      QUIC_BUG << ENDPOINT << "Stream failed to close under OnConnectionClosed";
-      CloseStream(id);
+  if (!GetQuicReloadableFlag(quic_eliminate_static_stream_map)) {
+    while (!dynamic_stream_map_.empty()) {
+      DynamicStreamMap::iterator it = dynamic_stream_map_.begin();
+      QuicStreamId id = it->first;
+      it->second->OnConnectionClosed(error, source);
+      // The stream should call CloseStream as part of OnConnectionClosed.
+      if (dynamic_stream_map_.find(id) != dynamic_stream_map_.end()) {
+        QUIC_BUG << ENDPOINT << "Stream " << id
+                 << " failed to close under OnConnectionClosed";
+        CloseStream(id);
+      }
+    }
+  } else {
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 4, 9);
+    // Copy all non static streams in a new map for the ease of deleting.
+    QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams;
+    for (const auto& it : dynamic_stream_map_) {
+      if (!it.second->is_static()) {
+        non_static_streams[it.first] = it.second.get();
+      }
+    }
+    for (const auto& it : non_static_streams) {
+      QuicStreamId id = it.first;
+      it.second->OnConnectionClosed(error, source);
+      if (dynamic_stream_map_.find(id) != dynamic_stream_map_.end()) {
+        QUIC_BUG << ENDPOINT << "Stream " << id
+                 << " failed to close under OnConnectionClosed";
+        CloseStream(id);
+      }
     }
   }
 
@@ -533,7 +599,8 @@
 
 uint64_t QuicSession::GetNumOpenDynamicStreams() const {
   return dynamic_stream_map_.size() - draining_streams_.size() +
-         locally_closed_streams_highest_offset_.size();
+         locally_closed_streams_highest_offset_.size() -
+         num_incoming_static_streams_ - num_outgoing_static_streams_;
 }
 
 void QuicSession::ProcessUdpPacket(const QuicSocketAddress& self_address,
@@ -625,6 +692,17 @@
 
   DynamicStreamMap::iterator it = dynamic_stream_map_.find(id);
   if (it != dynamic_stream_map_.end()) {
+    if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) &&
+        it->second->is_static()) {
+      QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 5, 9);
+      QUIC_DVLOG(1) << ENDPOINT
+                    << "Try to send rst for a static stream, id: " << id
+                    << " Closing connection";
+      connection()->CloseConnection(
+          QUIC_INVALID_STREAM_ID, "Sending rst for a static stream",
+          ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+      return;
+    }
     QuicStream* stream = it->second.get();
     if (stream) {
       stream->set_rst_sent(true);
@@ -687,6 +765,17 @@
     return;
   }
   QuicStream* stream = it->second.get();
+  if (GetQuicReloadableFlag(quic_eliminate_static_stream_map) &&
+      stream->is_static()) {
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_eliminate_static_stream_map, 6, 9);
+    QUIC_DVLOG(1) << ENDPOINT
+                  << "Try to close a static stream, id: " << stream_id
+                  << " Closing connection";
+    connection()->CloseConnection(
+        QUIC_INVALID_STREAM_ID, "Try to close a static stream",
+        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+    return;
+  }
 
   // Tell the stream that a RST has been sent.
   if (locally_reset) {
@@ -1024,6 +1113,7 @@
 }
 
 void QuicSession::ActivateStream(std::unique_ptr<QuicStream> stream) {
+  DCHECK(!stream->is_static());
   QuicStreamId stream_id = stream->id();
   QUIC_DVLOG(1) << ENDPOINT << "num_streams: " << dynamic_stream_map_.size()
                 << ". activating " << stream_id;
@@ -1245,7 +1335,8 @@
 }
 
 size_t QuicSession::GetNumActiveStreams() const {
-  return dynamic_stream_map_.size() - draining_streams_.size();
+  return dynamic_stream_map_.size() - draining_streams_.size() -
+         num_incoming_static_streams_ - num_outgoing_static_streams_;
 }
 
 size_t QuicSession::GetNumDrainingStreams() const {
@@ -1280,9 +1371,11 @@
 size_t QuicSession::GetNumDynamicOutgoingStreams() const {
   DCHECK_GE(static_cast<size_t>(dynamic_stream_map_.size() +
                                 pending_stream_map_.size()),
-            num_dynamic_incoming_streams_);
+            num_dynamic_incoming_streams_ + num_outgoing_static_streams_ +
+                num_incoming_static_streams_);
   return dynamic_stream_map_.size() + pending_stream_map_.size() -
-         num_dynamic_incoming_streams_;
+         num_dynamic_incoming_streams_ - num_outgoing_static_streams_ -
+         num_incoming_static_streams_;
 }
 
 size_t QuicSession::GetNumDrainingOutgoingStreams() const {
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index a9fb992..d620502 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -300,6 +300,16 @@
   // reserved headers and crypto streams.
   size_t GetNumOpenOutgoingStreams() const;
 
+  // Returns the number of open peer initiated static streams.
+  size_t num_incoming_static_streams() const {
+    return num_incoming_static_streams_;
+  }
+
+  // Returns the number of open self initiated static streams.
+  size_t num_outgoing_static_streams() const {
+    return num_outgoing_static_streams_;
+  }
+
   // Add the stream to the session's write-blocked list because it is blocked by
   // connection-level flow control but not by its own stream-level flow control.
   // The stream will be given a chance to write when a connection-level
@@ -481,6 +491,9 @@
   // Register (|id|, |stream|) with the static stream map. Override previous
   // registrations with the same id.
   void RegisterStaticStream(QuicStreamId id, QuicStream* stream);
+  // TODO(renjietang): Replace the original Register method with the new one
+  // once flag is deprecated.
+  void RegisterStaticStreamNew(std::unique_ptr<QuicStream> stream);
   const StaticStreamMap& static_streams() const { return static_stream_map_; }
 
   DynamicStreamMap& dynamic_streams() { return dynamic_stream_map_; }
@@ -653,6 +666,14 @@
   // A counter for peer initiated streams which are in the draining_streams_.
   size_t num_draining_incoming_streams_;
 
+  // A counter for self initiated static streams which are in
+  // dynamic_stream_map_.
+  size_t num_outgoing_static_streams_;
+
+  // A counter for peer initiated static streams which are in
+  // dynamic_stream_map_.
+  size_t num_incoming_static_streams_;
+
   // A counter for peer initiated streams which are in the
   // locally_closed_streams_highest_offset_.
   size_t num_locally_closed_incoming_streams_highest_offset_;
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h
index 40bed1c..0cacbe3 100644
--- a/quic/core/quic_stream.h
+++ b/quic/core/quic_stream.h
@@ -343,6 +343,9 @@
   // Does not send a FIN.  May cause the stream to be closed.
   virtual void CloseWriteSide();
 
+  // Returns true if the stream is static.
+  bool is_static() const { return is_static_; }
+
  protected:
   // Sends as many bytes in the first |count| buffers of |iov| to the connection
   // as the connection will consume. If FIN is consumed, the write side is
diff --git a/quic/test_tools/quic_spdy_session_peer.cc b/quic/test_tools/quic_spdy_session_peer.cc
index dcba12c..956cd88 100644
--- a/quic/test_tools/quic_spdy_session_peer.cc
+++ b/quic/test_tools/quic_spdy_session_peer.cc
@@ -13,7 +13,7 @@
 // static
 QuicHeadersStream* QuicSpdySessionPeer::GetHeadersStream(
     QuicSpdySession* session) {
-  return session->headers_stream_.get();
+  return session->headers_stream();
 }
 
 // static
@@ -25,6 +25,20 @@
   }
 }
 
+void QuicSpdySessionPeer::SetUnownedHeadersStream(
+    QuicSpdySession* session,
+    QuicHeadersStream* headers_stream) {
+  for (auto& it : session->dynamic_streams()) {
+    if (it.first == QuicUtils::GetHeadersStreamId(
+                        session->connection()->transport_version())) {
+      it.second.reset(headers_stream);
+      session->unowned_headers_stream_ =
+          static_cast<QuicHeadersStream*>(it.second.get());
+      break;
+    }
+  }
+}
+
 // static
 const spdy::SpdyFramer& QuicSpdySessionPeer::GetSpdyFramer(
     QuicSpdySession* session) {
diff --git a/quic/test_tools/quic_spdy_session_peer.h b/quic/test_tools/quic_spdy_session_peer.h
index 83e8b0c..47b55e1 100644
--- a/quic/test_tools/quic_spdy_session_peer.h
+++ b/quic/test_tools/quic_spdy_session_peer.h
@@ -24,6 +24,8 @@
   static QuicHeadersStream* GetHeadersStream(QuicSpdySession* session);
   static void SetHeadersStream(QuicSpdySession* session,
                                QuicHeadersStream* headers_stream);
+  static void SetUnownedHeadersStream(QuicSpdySession* session,
+                                      QuicHeadersStream* headers_stream);
   static const spdy::SpdyFramer& GetSpdyFramer(QuicSpdySession* session);
   static void SetHpackEncoderDebugVisitor(
       QuicSpdySession* session,