Separate (1) feeding frame to pending stream & (2) processing pending stream in QuicSession::PendingStreamOnStream to different functions controlled by different knobs (ShouldProcessFrameByPendingStream & ShouldProcessPendingStreamImmediately).

In followup, knobs in (1) & (2) can be used to buffer stream data until post-tls handshake completes.

PiperOrigin-RevId: 398791682
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc
index 4dcfe01..06531a5 100644
--- a/quic/core/http/quic_spdy_session.cc
+++ b/quic/core/http/quic_spdy_session.cc
@@ -890,11 +890,15 @@
   return GetNumActiveStreams() + pending_streams_size() > 0;
 }
 
-bool QuicSpdySession::UsesPendingStreams() const {
-  // QuicSpdySession supports PendingStreams, therefore this method should
-  // eventually just return true.  However, pending streams can only be used if
-  // unidirectional stream type is supported.
-  return VersionUsesHttp3(transport_version());
+bool QuicSpdySession::UsesPendingStreamForFrame(QuicFrameType type,
+                                                QuicStreamId stream_id) const {
+  // Pending streams can only be used to handle unidirectional stream with
+  // STREAM & RESET_STREAM frames in IETF QUIC.
+  return VersionUsesHttp3(transport_version()) &&
+         (type == STREAM_FRAME || type == RST_STREAM_FRAME) &&
+         QuicUtils::GetStreamType(stream_id, perspective(),
+                                  IsIncomingStream(stream_id),
+                                  version()) == READ_UNIDIRECTIONAL;
 }
 
 size_t QuicSpdySession::WriteHeadersOnHeadersStreamImpl(
diff --git a/quic/core/http/quic_spdy_session.h b/quic/core/http/quic_spdy_session.h
index 8085dd8..bb22f17 100644
--- a/quic/core/http/quic_spdy_session.h
+++ b/quic/core/http/quic_spdy_session.h
@@ -478,7 +478,8 @@
   bool ShouldKeepConnectionAlive() const override;
 
   // Overridden to buffer incoming unidirectional streams for version 99.
-  bool UsesPendingStreams() const override;
+  bool UsesPendingStreamForFrame(QuicFrameType type,
+                                 QuicStreamId stream_id) const override;
 
   // Processes incoming unidirectional streams; parses the stream type, and
   // creates a new stream of the corresponding type.  Returns the pointer to the
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc
index 2719735..80296d6 100644
--- a/quic/core/http/quic_spdy_session_test.cc
+++ b/quic/core/http/quic_spdy_session_test.cc
@@ -361,7 +361,7 @@
   using QuicSession::closed_streams;
   using QuicSession::ShouldKeepConnectionAlive;
   using QuicSpdySession::ProcessPendingStream;
-  using QuicSpdySession::UsesPendingStreams;
+  using QuicSpdySession::UsesPendingStreamForFrame;
 
  private:
   StrictMock<TestCryptoStream> crypto_stream_;
@@ -607,11 +607,25 @@
                          ::testing::ValuesIn(AllSupportedVersions()),
                          ::testing::PrintToStringParamName());
 
-TEST_P(QuicSpdySessionTestServer, UsesPendingStreams) {
+TEST_P(QuicSpdySessionTestServer, UsesPendingStreamsForFrame) {
   if (!VersionUsesHttp3(transport_version())) {
     return;
   }
-  EXPECT_TRUE(session_.UsesPendingStreams());
+  EXPECT_TRUE(session_.UsesPendingStreamForFrame(
+      STREAM_FRAME, QuicUtils::GetFirstUnidirectionalStreamId(
+                        transport_version(), Perspective::IS_CLIENT)));
+  EXPECT_TRUE(session_.UsesPendingStreamForFrame(
+      RST_STREAM_FRAME, QuicUtils::GetFirstUnidirectionalStreamId(
+                            transport_version(), Perspective::IS_CLIENT)));
+  EXPECT_FALSE(session_.UsesPendingStreamForFrame(
+      RST_STREAM_FRAME, QuicUtils::GetFirstUnidirectionalStreamId(
+                            transport_version(), Perspective::IS_SERVER)));
+  EXPECT_FALSE(session_.UsesPendingStreamForFrame(
+      STOP_SENDING_FRAME, QuicUtils::GetFirstUnidirectionalStreamId(
+                              transport_version(), Perspective::IS_CLIENT)));
+  EXPECT_FALSE(session_.UsesPendingStreamForFrame(
+      RST_STREAM_FRAME, QuicUtils::GetFirstBidirectionalStreamId(
+                            transport_version(), Perspective::IS_CLIENT)));
 }
 
 TEST_P(QuicSpdySessionTestServer, PeerAddress) {
@@ -1840,11 +1854,25 @@
                          ::testing::ValuesIn(AllSupportedVersions()),
                          ::testing::PrintToStringParamName());
 
-TEST_P(QuicSpdySessionTestClient, UsesPendingStreams) {
+TEST_P(QuicSpdySessionTestClient, UsesPendingStreamsForFrame) {
   if (!VersionUsesHttp3(transport_version())) {
     return;
   }
-  EXPECT_TRUE(session_.UsesPendingStreams());
+  EXPECT_TRUE(session_.UsesPendingStreamForFrame(
+      STREAM_FRAME, QuicUtils::GetFirstUnidirectionalStreamId(
+                        transport_version(), Perspective::IS_SERVER)));
+  EXPECT_TRUE(session_.UsesPendingStreamForFrame(
+      RST_STREAM_FRAME, QuicUtils::GetFirstUnidirectionalStreamId(
+                            transport_version(), Perspective::IS_SERVER)));
+  EXPECT_FALSE(session_.UsesPendingStreamForFrame(
+      RST_STREAM_FRAME, QuicUtils::GetFirstUnidirectionalStreamId(
+                            transport_version(), Perspective::IS_CLIENT)));
+  EXPECT_FALSE(session_.UsesPendingStreamForFrame(
+      STOP_SENDING_FRAME, QuicUtils::GetFirstUnidirectionalStreamId(
+                              transport_version(), Perspective::IS_SERVER)));
+  EXPECT_FALSE(session_.UsesPendingStreamForFrame(
+      RST_STREAM_FRAME, QuicUtils::GetFirstBidirectionalStreamId(
+                            transport_version(), Perspective::IS_SERVER)));
 }
 
 // Regression test for crbug.com/977581.
@@ -2546,10 +2574,10 @@
     return;
   }
   CompleteHandshake();
-  ASSERT_TRUE(session_.UsesPendingStreams());
 
   const QuicStreamId stream_id =
       GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0);
+  ASSERT_TRUE(session_.UsesPendingStreamForFrame(STREAM_FRAME, stream_id));
 
   // Payload consists of two bytes.  The first byte is an unknown unidirectional
   // stream type.  The second one would be the type of a push stream, but it
@@ -2593,10 +2621,10 @@
     return;
   }
   CompleteHandshake();
-  ASSERT_TRUE(session_.UsesPendingStreams());
 
   const QuicStreamId stream_id =
       GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0);
+  ASSERT_TRUE(session_.UsesPendingStreamForFrame(STREAM_FRAME, stream_id));
 
   // Payload consists of two bytes.  The first byte is an unknown unidirectional
   // stream type.  The second one would be the type of a push stream, but it
@@ -2632,10 +2660,10 @@
   if (!VersionUsesHttp3(transport_version())) {
     return;
   }
-  ASSERT_TRUE(session_.UsesPendingStreams());
 
   const QuicStreamId stream_id =
       GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0);
+  ASSERT_TRUE(session_.UsesPendingStreamForFrame(STREAM_FRAME, stream_id));
 
   // Payload is the first byte of a two byte varint encoding.
   std::string payload = absl::HexStringToBytes("40");
@@ -2660,10 +2688,10 @@
   if (!VersionUsesHttp3(transport_version())) {
     return;
   }
-  ASSERT_TRUE(session_.UsesPendingStreams());
 
   const QuicStreamId stream_id =
       GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0);
+  ASSERT_TRUE(session_.UsesPendingStreamForFrame(STREAM_FRAME, stream_id));
 
   // Payload is the first byte of a two byte varint encoding with a FIN.
   std::string payload = absl::HexStringToBytes("40");
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index e720221..e35e1a3 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -164,7 +164,8 @@
   }
 }
 
-void QuicSession::PendingStreamOnStreamFrame(const QuicStreamFrame& frame) {
+PendingStream* QuicSession::PendingStreamOnStreamFrame(
+    const QuicStreamFrame& frame) {
   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
   QuicStreamId stream_id = frame.stream_id;
 
@@ -175,14 +176,20 @@
       QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
       OnFinalByteOffsetReceived(stream_id, final_byte_offset);
     }
-    return;
+    return nullptr;
   }
 
   pending->OnStreamFrame(frame);
   if (!connection()->connected()) {
-    return;
+    return nullptr;
   }
+  return pending;
+}
+
+void QuicSession::MaybeProcessPendingStream(PendingStream* pending) {
+  QUICHE_DCHECK(pending != nullptr);
   QuicStream* stream = ProcessPendingStream(pending);
+  QuicStreamId stream_id = pending->id();
   if (stream != nullptr) {
     // The pending stream should now be in the scope of normal streams.
     QUICHE_DCHECK(IsClosedStream(stream_id) || IsOpenStream(stream_id))
@@ -205,12 +212,11 @@
     return;
   }
 
-  if (UsesPendingStreams() &&
-      QuicUtils::GetStreamType(stream_id, perspective(),
-                               IsIncomingStream(stream_id),
-                               version()) == READ_UNIDIRECTIONAL &&
-      stream_map_.find(stream_id) == stream_map_.end()) {
-    PendingStreamOnStreamFrame(frame);
+  if (ShouldProcessFrameByPendingStream(STREAM_FRAME, stream_id)) {
+    PendingStream* pending = PendingStreamOnStreamFrame(frame);
+    if (pending != nullptr && ShouldProcessPendingStreamImmediately()) {
+      MaybeProcessPendingStream(pending);
+    }
     return;
   }
 
@@ -347,11 +353,7 @@
     visitor_->OnRstStreamReceived(frame);
   }
 
-  if (UsesPendingStreams() &&
-      QuicUtils::GetStreamType(stream_id, perspective(),
-                               IsIncomingStream(stream_id),
-                               version()) == READ_UNIDIRECTIONAL &&
-      stream_map_.find(stream_id) == stream_map_.end()) {
+  if (ShouldProcessFrameByPendingStream(RST_STREAM_FRAME, stream_id)) {
     PendingStreamOnRstStream(frame);
     return;
   }
@@ -1032,6 +1034,12 @@
   }
 }
 
+bool QuicSession::ShouldProcessFrameByPendingStream(QuicFrameType type,
+                                                    QuicStreamId id) const {
+  return UsesPendingStreamForFrame(type, id) &&
+         stream_map_.find(id) == stream_map_.end();
+}
+
 void QuicSession::OnFinalByteOffsetReceived(
     QuicStreamId stream_id, QuicStreamOffset final_byte_offset) {
   auto it = locally_closed_streams_highest_offset_.find(stream_id);
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index d9de347..9e3762a 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -672,11 +672,17 @@
   virtual void OnFinalByteOffsetReceived(QuicStreamId id,
                                          QuicStreamOffset final_byte_offset);
 
-  // Returns true if incoming unidirectional streams should be buffered until
-  // the first byte of the stream arrives.
-  // If a subclass returns true here, it should make sure to implement
-  // ProcessPendingStream().
-  virtual bool UsesPendingStreams() const { return false; }
+  // Returns true if a frame with the given type and id can be prcoessed by a
+  // PendingStream. However, the frame will always be processed by a QuicStream
+  // if one exists with the given stream_id.
+  virtual bool UsesPendingStreamForFrame(QuicFrameType /*type*/,
+                                         QuicStreamId /*stream_id*/) const {
+    return false;
+  }
+
+  // Returns true if a pending stream should be converted to a real stream after
+  // a corresponding STREAM_FRAME is received.
+  virtual bool ShouldProcessPendingStreamImmediately() const { return true; }
 
   spdy::SpdyPriority GetSpdyPriorityofStream(QuicStreamId stream_id) const {
     return write_blocked_streams_.GetSpdyPriorityofStream(stream_id);
@@ -834,9 +840,17 @@
   // Closes the pending stream |stream_id| before it has been created.
   void ClosePendingStream(QuicStreamId stream_id);
 
-  // Creates or gets pending stream, feeds it with |frame|, and processes the
-  // pending stream.
-  void PendingStreamOnStreamFrame(const QuicStreamFrame& frame);
+  // Whether the frame with given type and id should be feed to a pending
+  // stream.
+  bool ShouldProcessFrameByPendingStream(QuicFrameType type,
+                                         QuicStreamId id) const;
+
+  // Process the pending stream if possible.
+  void MaybeProcessPendingStream(PendingStream* pending);
+
+  // Creates or gets pending stream, feeds it with |frame|, and returns the
+  // pending stream. Can return NULL, e.g., if the stream ID is invalid.
+  PendingStream* PendingStreamOnStreamFrame(const QuicStreamFrame& frame);
 
   // Creates or gets pending strea, feed it with |frame|, and closes the pending
   // stream.
diff --git a/quic/core/quic_session_test.cc b/quic/core/quic_session_test.cc
index 11b3e99..19df044 100644
--- a/quic/core/quic_session_test.cc
+++ b/quic/core/quic_session_test.cc
@@ -23,6 +23,7 @@
 #include "quic/core/quic_data_writer.h"
 #include "quic/core/quic_packets.h"
 #include "quic/core/quic_stream.h"
+#include "quic/core/quic_types.h"
 #include "quic/core/quic_utils.h"
 #include "quic/core/quic_versions.h"
 #include "quic/platform/api/quic_expect_bug.h"
@@ -363,7 +364,22 @@
                       GetEncryptionLevelToSendApplicationData());
   }
 
-  bool UsesPendingStreams() const override { return uses_pending_streams_; }
+  bool UsesPendingStreamForFrame(QuicFrameType type,
+                                 QuicStreamId stream_id) const override {
+    if (!uses_pending_streams_) {
+      return false;
+    }
+    StreamType stream_type = QuicUtils::GetStreamType(
+        stream_id, perspective(), IsIncomingStream(stream_id), version());
+    switch (type) {
+      case STREAM_FRAME:
+        ABSL_FALLTHROUGH_INTENDED;
+      case RST_STREAM_FRAME:
+        return stream_type == READ_UNIDIRECTIONAL;
+      default:
+        return false;
+    }
+  }
 
   void set_uses_pending_streams(bool uses_pending_streams) {
     uses_pending_streams_ = uses_pending_streams;