Refactor QuicSession to allow subclasses to handle pending streams in their own way.

gfe-relnote: code refactor and v99 only. Not used in production.
PiperOrigin-RevId: 248015584
Change-Id: Ie0ac855070b304bd90a8e68392cff3cf4c5ac52a
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index 8bdf3b5..0545fa6 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -149,6 +149,24 @@
   }
 }
 
+void QuicSession::PendingStreamOnStreamFrame(const QuicStreamFrame& frame) {
+  DCHECK(VersionHasControlStreams(connection()->transport_version()));
+  QuicStreamId stream_id = frame.stream_id;
+  // Fetch the pending stream for this id (created on first use); may be null.
+  PendingStream* pending = GetOrCreatePendingStream(stream_id);
+  // No pending stream: just report the final byte offset if the frame has FIN.
+  if (!pending) {
+    if (frame.fin) {
+      QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
+      OnFinalByteOffsetReceived(stream_id, final_byte_offset);
+    }
+    return;
+  }
+  // Deliver the frame to the pending stream, then run ProcessPendingStream().
+  pending->OnStreamFrame(frame);
+  ProcessPendingStream(pending);
+}
+
 void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
   // TODO(rch) deal with the error case of stream id 0.
   QuicStreamId stream_id = frame.stream_id;
@@ -167,12 +185,18 @@
     return;
   }
 
-  StreamHandler handler = GetOrCreateStreamImpl(stream_id, frame.offset != 0);
-  if (handler.is_pending) {
-    handler.pending->OnStreamFrame(frame);
+  if (VersionHasControlStreams(connection()->transport_version()) &&
+      UsesPendingStreams() &&
+      QuicUtils::GetStreamType(stream_id, perspective(),
+                               IsIncomingStream(stream_id)) ==
+          READ_UNIDIRECTIONAL &&
+      dynamic_stream_map_.find(stream_id) == dynamic_stream_map_.end()) {
+    PendingStreamOnStreamFrame(frame);
     return;
   }
 
+  StreamHandler handler = GetOrCreateStreamImpl(stream_id);
+
   if (!handler.stream) {
     // The stream no longer exists, but we may still be interested in the
     // final stream byte offset sent by the peer. A frame with a FIN can give
@@ -296,6 +320,21 @@
   return true;
 }
 
+void QuicSession::PendingStreamOnRstStream(const QuicRstStreamFrame& frame) {
+  DCHECK(VersionHasControlStreams(connection()->transport_version()));
+  QuicStreamId stream_id = frame.stream_id;
+  // Fetch the pending stream for this id (created on first use); may be null.
+  PendingStream* pending = GetOrCreatePendingStream(stream_id);
+  // No pending stream: handle the reset as one aimed at a nonexistent stream.
+  if (!pending) {
+    HandleRstOnValidNonexistentStream(frame);
+    return;
+  }
+  // Deliver the reset to the pending stream, then close that pending stream.
+  pending->OnRstStreamFrame(frame);
+  ClosePendingStream(stream_id);
+}
+
 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
   QuicStreamId stream_id = frame.stream_id;
   if (stream_id ==
@@ -317,15 +356,18 @@
     visitor_->OnRstStreamReceived(frame);
   }
 
-  // may_buffer is true here to allow subclasses to buffer streams until the
-  // first byte of payload arrives which would allow sessions to delay
-  // creation of the stream until the type is known.
-  StreamHandler handler = GetOrCreateStreamImpl(stream_id, /*may_buffer=*/true);
-  if (handler.is_pending) {
-    handler.pending->OnRstStreamFrame(frame);
-    ClosePendingStream(stream_id);
+  if (VersionHasControlStreams(connection()->transport_version()) &&
+      UsesPendingStreams() &&
+      QuicUtils::GetStreamType(stream_id, perspective(),
+                               IsIncomingStream(stream_id)) ==
+          READ_UNIDIRECTIONAL &&
+      dynamic_stream_map_.find(stream_id) == dynamic_stream_map_.end()) {
+    PendingStreamOnRstStream(frame);
     return;
   }
+
+  StreamHandler handler = GetOrCreateStreamImpl(stream_id);
+
   if (!handler.stream) {
     HandleRstOnValidNonexistentStream(frame);
     return;  // Errors are handled by GetOrCreateStream.
@@ -1200,15 +1242,13 @@
 }
 
 QuicStream* QuicSession::GetOrCreateStream(const QuicStreamId stream_id) {
-  StreamHandler handler =
-      GetOrCreateStreamImpl(stream_id, /*may_buffer=*/false);
+  StreamHandler handler = GetOrCreateStreamImpl(stream_id);
   DCHECK(!handler.is_pending);
   return handler.stream;
 }
 
 QuicSession::StreamHandler QuicSession::GetOrCreateStreamImpl(
-    QuicStreamId stream_id,
-    bool may_buffer) {
+    QuicStreamId stream_id) {
   if (eliminate_static_stream_map_ &&
       stream_id ==
           QuicUtils::GetCryptoStreamId(connection_->transport_version())) {
@@ -1219,7 +1259,7 @@
   if (it != static_stream_map_.end()) {
     return StreamHandler(it->second);
   }
-  return GetOrCreateDynamicStreamImpl(stream_id, may_buffer);
+  return GetOrCreateDynamicStreamImpl(stream_id);
 }
 
 void QuicSession::StreamDraining(QuicStreamId stream_id) {
@@ -1254,17 +1294,32 @@
   return write_blocked_streams()->ShouldYield(stream_id);
 }
 
+PendingStream* QuicSession::GetOrCreatePendingStream(QuicStreamId stream_id) {
+  auto it = pending_stream_map_.find(stream_id);
+  if (it != pending_stream_map_.end()) {
+    return it->second.get();  // Already pending; reuse the existing entry.
+  }
+  // No entry yet: return nullptr if the stream is closed or the id check fails.
+  if (IsClosedStream(stream_id) ||
+      !MaybeIncreaseLargestPeerStreamId(stream_id)) {
+    return nullptr;
+  }
+  // Create the pending stream; the map owns it and a raw pointer is returned.
+  auto pending = QuicMakeUnique<PendingStream>(stream_id, this);
+  PendingStream* unowned_pending = pending.get();
+  pending_stream_map_[stream_id] = std::move(pending);
+  return unowned_pending;
+}
+
 QuicStream* QuicSession::GetOrCreateDynamicStream(
     const QuicStreamId stream_id) {
-  StreamHandler handler =
-      GetOrCreateDynamicStreamImpl(stream_id, /*may_buffer=*/false);
+  StreamHandler handler = GetOrCreateDynamicStreamImpl(stream_id);
   DCHECK(!handler.is_pending);
   return handler.stream;
 }
 
 QuicSession::StreamHandler QuicSession::GetOrCreateDynamicStreamImpl(
-    QuicStreamId stream_id,
-    bool may_buffer) {
+    QuicStreamId stream_id) {
   DCHECK(!QuicContainsKey(static_stream_map_, stream_id))
       << "Attempt to call GetOrCreateDynamicStream for a static stream";
 
@@ -1282,21 +1337,6 @@
     return StreamHandler();
   }
 
-  auto pending_it = pending_stream_map_.find(stream_id);
-  if (pending_it != pending_stream_map_.end()) {
-    DCHECK_EQ(QUIC_VERSION_99, connection_->transport_version());
-    if (may_buffer) {
-      return StreamHandler(pending_it->second.get());
-    }
-    // The stream limit accounting has already been taken care of
-    // when the PendingStream was created, so there is no need to
-    // do so here. Now we can create the actual stream from the
-    // PendingStream.
-    StreamHandler handler(CreateIncomingStream(std::move(*pending_it->second)));
-    pending_stream_map_.erase(pending_it);
-    return handler;
-  }
-
   // TODO(fkastenholz): If we are creating a new stream and we have
   // sent a goaway, we should ignore the stream creation. Need to
   // add code to A) test if goaway was sent ("if (goaway_sent_)") and
@@ -1317,18 +1357,6 @@
     }
   }
 
-  if (connection_->transport_version() == QUIC_VERSION_99 && may_buffer &&
-      ShouldBufferIncomingStream(stream_id)) {
-    ++num_dynamic_incoming_streams_;
-    // Since STREAM frames may arrive out of order, delay creating the
-    // stream object until the first byte arrives. Buffer the frames and
-    // handle flow control accounting in the PendingStream.
-    auto pending = QuicMakeUnique<PendingStream>(stream_id, this);
-    StreamHandler handler(pending.get());
-    pending_stream_map_[stream_id] = std::move(pending);
-    return handler;
-  }
-
   return StreamHandler(CreateIncomingStream(stream_id));
 }