Ignore read data on PendingStream after invalid type byte is received.

This bug has been discovered by Chromium's ClusterFuzz at
https://crbug.com/969391.  I locally verified that this CL fixes the crash
with that particular fuzzer input.

https://quicwg.org/base-drafts/draft-ietf-quic-transport.html#frame-stop-sending
notes that a STOP_SENDING frame communicate that "incoming data is being
discarded on receipt".  This CL adds a single line to make PendingStream
actually discard incoming data.  Before this change, PendingStream could be
tricked into reading further data bytes by retransmitting the original stream
frame, which could have resulted in creating and activating a unidirectional
stream.

gfe-relnote: n/a, change in QUIC v99 only.
PiperOrigin-RevId: 262674708
Change-Id: Icaf0e700711b7220b36e595deb95620cac1d89af
diff --git a/quic/core/http/quic_spdy_session.cc b/quic/core/http/quic_spdy_session.cc
index baa9c85..90560ad 100644
--- a/quic/core/http/quic_spdy_session.cc
+++ b/quic/core/http/quic_spdy_session.cc
@@ -50,7 +50,7 @@
 
 namespace {
 
-// TODO(renjietang): remove this once HTTP/3 error codes are adopted.
+// TODO(b/124216424): remove this once HTTP/3 error codes are adopted.
 const uint16_t kHttpUnknownStreamType = 0x0D;
 
 class HeaderTableDebugVisitor : public HpackHeaderTable::DebugVisitorInterface {
@@ -860,6 +860,12 @@
   uint8_t stream_type_length = reader.PeekVarInt62Length();
   uint64_t stream_type = 0;
   if (!reader.ReadVarInt62(&stream_type)) {
+    if (pending->sequencer()->NumBytesBuffered() ==
+        pending->sequencer()->close_offset()) {
+      // Stream received FIN but there are not enough bytes for stream type.
+      // Mark all bytes consumed in order to close stream.
+      pending->MarkConsumed(pending->sequencer()->close_offset());
+    }
     return false;
   }
   pending->MarkConsumed(stream_type_length);
@@ -886,6 +892,7 @@
       break;
     default:
       SendStopSending(kHttpUnknownStreamType, pending->id());
+      pending->StopReading();
   }
   return false;
 }
diff --git a/quic/core/http/quic_spdy_session_test.cc b/quic/core/http/quic_spdy_session_test.cc
index 6a0b96e..a9bcfcc 100644
--- a/quic/core/http/quic_spdy_session_test.cc
+++ b/quic/core/http/quic_spdy_session_test.cc
@@ -17,6 +17,7 @@
 #include "net/third_party/quiche/src/quic/core/quic_packets.h"
 #include "net/third_party/quiche/src/quic/core/quic_stream.h"
 #include "net/third_party/quiche/src/quic/core/quic_utils.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_arraysize.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_expect_bug.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_map_util.h"
@@ -55,6 +56,11 @@
 namespace test {
 namespace {
 
+bool VerifyAndClearStopSendingFrame(const QuicFrame& frame) {
+  EXPECT_EQ(STOP_SENDING_FRAME, frame.type);
+  return ClearControlFrame(frame);
+}
+
 class TestCryptoStream : public QuicCryptoStream, public QuicCryptoHandshaker {
  public:
   explicit TestCryptoStream(QuicSession* session)
@@ -2052,41 +2058,76 @@
   if (!VersionHasStreamType(transport_version())) {
     return;
   }
-  PendingStream pending(QuicUtils::GetFirstUnidirectionalStreamId(
-                            transport_version(), Perspective::IS_CLIENT),
-                        &session_);
-  char input[] = {// type
-                  0x04,
-                  // data
-                  'a', 'b', 'c'};
-  QuicStreamFrame data(pending.id(), true, 0, QuicStringPiece(input, 4));
-  pending.OnStreamFrame(data);
 
-  // A stop sending frame will be sent to indicate unknown type.
-  EXPECT_CALL(*connection_, SendControlFrame(_));
-  session_.ProcessPendingStream(&pending);
+  char input[] = {0x04,            // type
+                  'a', 'b', 'c'};  // data
+  QuicStringPiece payload(input, QUIC_ARRAYSIZE(input));
+
+  // This is a server test with a client-initiated unidirectional stream.
+  QuicStreamId stream_id = QuicUtils::GetFirstUnidirectionalStreamId(
+      transport_version(), Perspective::IS_CLIENT);
+
+  for (bool fin : {true, false}) {
+    QuicStreamFrame frame(stream_id, fin, /* offset = */ 0, payload);
+
+    // A STOP_SENDING frame is sent in response to the unknown stream type.
+    EXPECT_CALL(*connection_, SendControlFrame(_))
+        .WillOnce(Invoke(&VerifyAndClearStopSendingFrame));
+    session_.OnStreamFrame(frame);
+
+    PendingStream* pending =
+        QuicSessionPeer::GetPendingStream(&session_, stream_id);
+    if (fin) {
+      // Stream is closed if FIN is received.
+      EXPECT_FALSE(pending);
+    } else {
+      ASSERT_TRUE(pending);
+      // The pending stream must ignore read data.
+      EXPECT_TRUE(pending->sequencer()->ignore_read_data());
+    }
+
+    stream_id += QuicUtils::StreamIdDelta(transport_version());
+  }
 }
 
 TEST_P(QuicSpdySessionTestServer, SimplePendingStreamTypeOutOfOrderDelivery) {
   if (!VersionHasStreamType(transport_version())) {
     return;
   }
-  PendingStream pending(QuicUtils::GetFirstUnidirectionalStreamId(
-                            transport_version(), Perspective::IS_CLIENT),
-                        &session_);
-  char input[] = {// type
-                  0x04,
-                  // data
-                  'a', 'b', 'c'};
-  QuicStreamFrame data1(pending.id(), true, 1, QuicStringPiece(&input[1], 3));
-  pending.OnStreamFrame(data1);
-  session_.ProcessPendingStream(&pending);
 
-  QuicStreamFrame data2(pending.id(), false, 0, QuicStringPiece(input, 1));
-  pending.OnStreamFrame(data2);
+  char input[] = {0x04,            // type
+                  'a', 'b', 'c'};  // data
+  QuicStringPiece payload(input, QUIC_ARRAYSIZE(input));
 
-  EXPECT_CALL(*connection_, SendControlFrame(_));
-  session_.ProcessPendingStream(&pending);
+  // This is a server test with a client-initiated unidirectional stream.
+  QuicStreamId stream_id = QuicUtils::GetFirstUnidirectionalStreamId(
+      transport_version(), Perspective::IS_CLIENT);
+
+  for (bool fin : {true, false}) {
+    QuicStreamFrame frame1(stream_id, /* fin = */ false, /* offset = */ 0,
+                           payload.substr(0, 1));
+    QuicStreamFrame frame2(stream_id, fin, /* offset = */ 1, payload.substr(1));
+
+    // Deliver frames out of order.
+    session_.OnStreamFrame(frame2);
+    // A STOP_SENDING frame is sent in response to the unknown stream type.
+    EXPECT_CALL(*connection_, SendControlFrame(_))
+        .WillOnce(Invoke(&VerifyAndClearStopSendingFrame));
+    session_.OnStreamFrame(frame1);
+
+    PendingStream* pending =
+        QuicSessionPeer::GetPendingStream(&session_, stream_id);
+    if (fin) {
+      // Stream is closed if FIN is received.
+      EXPECT_FALSE(pending);
+    } else {
+      ASSERT_TRUE(pending);
+      // The pending stream must ignore read data.
+      EXPECT_TRUE(pending->sequencer()->ignore_read_data());
+    }
+
+    stream_id += QuicUtils::StreamIdDelta(transport_version());
+  }
 }
 
 TEST_P(QuicSpdySessionTestServer,
@@ -2094,27 +2135,44 @@
   if (!VersionHasStreamType(transport_version())) {
     return;
   }
-  PendingStream pending(QuicUtils::GetFirstUnidirectionalStreamId(
-                            transport_version(), Perspective::IS_CLIENT),
-                        &session_);
-  char input[] = {// type (256)
-                  0x40 + 0x01, 0x00,
-                  // data
-                  'a', 'b', 'c'};
 
-  QuicStreamFrame data1(pending.id(), true, 2, QuicStringPiece(&input[2], 3));
-  pending.OnStreamFrame(data1);
-  session_.ProcessPendingStream(&pending);
+  char input[] = {0x41, 0x00,      // type (256)
+                  'a', 'b', 'c'};  // data
+  QuicStringPiece payload(input, QUIC_ARRAYSIZE(input));
 
-  QuicStreamFrame data2(pending.id(), false, 0, QuicStringPiece(input, 1));
-  pending.OnStreamFrame(data2);
-  session_.ProcessPendingStream(&pending);
+  // This is a server test with a client-initiated unidirectional stream.
+  QuicStreamId stream_id = QuicUtils::GetFirstUnidirectionalStreamId(
+      transport_version(), Perspective::IS_CLIENT);
 
-  QuicStreamFrame data3(pending.id(), false, 1, QuicStringPiece(&input[1], 1));
-  pending.OnStreamFrame(data3);
+  for (bool fin : {true, false}) {
+    QuicStreamFrame frame1(stream_id, /* fin = */ false, /* offset = */ 0,
+                           payload.substr(0, 1));
+    QuicStreamFrame frame2(stream_id, /* fin = */ false, /* offset = */ 1,
+                           payload.substr(1, 1));
+    QuicStreamFrame frame3(stream_id, fin, /* offset = */ 2, payload.substr(2));
 
-  EXPECT_CALL(*connection_, SendControlFrame(_));
-  session_.ProcessPendingStream(&pending);
+    // Deliver frames out of order.
+    session_.OnStreamFrame(frame3);
+    // The first byte does not contain the entire type varint.
+    session_.OnStreamFrame(frame1);
+    // A STOP_SENDING frame is sent in response to the unknown stream type.
+    EXPECT_CALL(*connection_, SendControlFrame(_))
+        .WillOnce(Invoke(&VerifyAndClearStopSendingFrame));
+    session_.OnStreamFrame(frame2);
+
+    PendingStream* pending =
+        QuicSessionPeer::GetPendingStream(&session_, stream_id);
+    if (fin) {
+      // Stream is closed if FIN is received.
+      EXPECT_FALSE(pending);
+    } else {
+      ASSERT_TRUE(pending);
+      // The pending stream must ignore read data.
+      EXPECT_TRUE(pending->sequencer()->ignore_read_data());
+    }
+
+    stream_id += QuicUtils::StreamIdDelta(transport_version());
+  }
 }
 
 TEST_P(QuicSpdySessionTestServer, ReceiveControlStream) {
@@ -2177,6 +2235,148 @@
   EXPECT_EQ(5u, session_.max_outbound_header_list_size());
 }
 
+TEST_P(QuicSpdySessionTestClient, ResetAfterInvalidIncomingStreamType) {
+  if (!VersionHasStreamType(transport_version())) {
+    return;
+  }
+  ASSERT_TRUE(session_.UsesPendingStreams());
+
+  const QuicStreamId stream_id =
+      GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0);
+
+  // Payload consists of two bytes.  The first byte is an unknown unidirectional
+  // stream type.  The second one would be the type of a push stream, but it
+  // must not be interpreted as stream type.
+  std::string payload = QuicTextUtils::HexDecode("3f01");
+  QuicStreamFrame frame(stream_id, /* fin = */ false, /* offset = */ 0,
+                        payload);
+
+  // A STOP_SENDING frame is sent in response to the unknown stream type.
+  EXPECT_CALL(*connection_, SendControlFrame(_))
+      .WillOnce(Invoke(&VerifyAndClearStopSendingFrame));
+  session_.OnStreamFrame(frame);
+
+  // There are no active streams.
+  EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
+
+  // The pending stream is still around, because it did not receive a FIN.
+  PendingStream* pending =
+      QuicSessionPeer::GetPendingStream(&session_, stream_id);
+  ASSERT_TRUE(pending);
+
+  // The pending stream must ignore read data.
+  EXPECT_TRUE(pending->sequencer()->ignore_read_data());
+
+  // If the stream frame is received again, it should be ignored.
+  session_.OnStreamFrame(frame);
+
+  // Receive RESET_STREAM.
+  QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_id,
+                               QUIC_STREAM_CANCELLED,
+                               /* bytes_written = */ payload.size());
+
+  // This will trigger the sending of two control frames: one RESET_STREAM with
+  // QUIC_RST_ACKNOWLEDGEMENT, and one STOP_SENDING.
+  EXPECT_CALL(*connection_, SendControlFrame(_))
+      .Times(2)
+      .WillRepeatedly(Invoke(&ClearControlFrame));
+  EXPECT_CALL(*connection_, OnStreamReset(stream_id, QUIC_RST_ACKNOWLEDGEMENT));
+  session_.OnRstStream(rst_frame);
+
+  // The stream is closed.
+  EXPECT_FALSE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
+}
+
+TEST_P(QuicSpdySessionTestClient, FinAfterInvalidIncomingStreamType) {
+  if (!VersionHasStreamType(transport_version())) {
+    return;
+  }
+  ASSERT_TRUE(session_.UsesPendingStreams());
+
+  const QuicStreamId stream_id =
+      GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0);
+
+  // Payload consists of two bytes.  The first byte is an unknown unidirectional
+  // stream type.  The second one would be the type of a push stream, but it
+  // must not be interpreted as stream type.
+  std::string payload = QuicTextUtils::HexDecode("3f01");
+  QuicStreamFrame frame(stream_id, /* fin = */ false, /* offset = */ 0,
+                        payload);
+
+  // A STOP_SENDING frame is sent in response to the unknown stream type.
+  EXPECT_CALL(*connection_, SendControlFrame(_))
+      .WillOnce(Invoke(&VerifyAndClearStopSendingFrame));
+  session_.OnStreamFrame(frame);
+
+  // The pending stream is still around, because it did not receive a FIN.
+  PendingStream* pending =
+      QuicSessionPeer::GetPendingStream(&session_, stream_id);
+  EXPECT_TRUE(pending);
+
+  // The pending stream must ignore read data.
+  EXPECT_TRUE(pending->sequencer()->ignore_read_data());
+
+  // If the stream frame is received again, it should be ignored.
+  session_.OnStreamFrame(frame);
+
+  // Receive FIN.
+  session_.OnStreamFrame(QuicStreamFrame(stream_id, /* fin = */ true,
+                                         /* offset = */ payload.size(), ""));
+
+  EXPECT_FALSE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
+}
+
+TEST_P(QuicSpdySessionTestClient, ResetInMiddleOfStreamType) {
+  if (!VersionHasStreamType(transport_version())) {
+    return;
+  }
+  ASSERT_TRUE(session_.UsesPendingStreams());
+
+  const QuicStreamId stream_id =
+      GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0);
+
+  // Payload is the first byte of a two byte varint encoding.
+  std::string payload = QuicTextUtils::HexDecode("40");
+  QuicStreamFrame frame(stream_id, /* fin = */ false, /* offset = */ 0,
+                        payload);
+
+  session_.OnStreamFrame(frame);
+  EXPECT_TRUE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
+
+  // Receive RESET_STREAM.
+  QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_id,
+                               QUIC_STREAM_CANCELLED,
+                               /* bytes_written = */ payload.size());
+
+  // This will trigger the sending of two control frames: one RESET_STREAM with
+  // QUIC_RST_ACKNOWLEDGEMENT, and one STOP_SENDING.
+  EXPECT_CALL(*connection_, SendControlFrame(_))
+      .Times(2)
+      .WillRepeatedly(Invoke(&ClearControlFrame));
+  EXPECT_CALL(*connection_, OnStreamReset(stream_id, QUIC_RST_ACKNOWLEDGEMENT));
+  session_.OnRstStream(rst_frame);
+
+  // The stream is closed.
+  EXPECT_FALSE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
+}
+
+TEST_P(QuicSpdySessionTestClient, FinInMiddleOfStreamType) {
+  if (!VersionHasStreamType(transport_version())) {
+    return;
+  }
+  ASSERT_TRUE(session_.UsesPendingStreams());
+
+  const QuicStreamId stream_id =
+      GetNthServerInitiatedUnidirectionalStreamId(transport_version(), 0);
+
+  // Payload is the first byte of a two byte varint encoding with a FIN.
+  std::string payload = QuicTextUtils::HexDecode("40");
+  QuicStreamFrame frame(stream_id, /* fin = */ true, /* offset = */ 0, payload);
+
+  session_.OnStreamFrame(frame);
+  EXPECT_FALSE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index 875ec22..ac4ac9b 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -157,15 +157,15 @@
   if (!connection()->connected()) {
     return;
   }
-  if (pending->sequencer()->IsClosed()) {
-    ClosePendingStream(stream_id);
-    return;
-  }
   if (ProcessPendingStream(pending)) {
     // The pending stream should now be in the scope of normal streams.
     DCHECK(IsClosedStream(stream_id) || IsOpenStream(stream_id))
         << "Stream " << stream_id << " not created";
     pending_stream_map_.erase(stream_id);
+    return;
+  }
+  if (pending->sequencer()->IsClosed()) {
+    ClosePendingStream(stream_id);
   }
 }
 
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc
index 4d460d3..eb8b96b 100644
--- a/quic/core/quic_stream.cc
+++ b/quic/core/quic_stream.cc
@@ -173,6 +173,11 @@
   sequencer_.MarkConsumed(num_bytes);
 }
 
+void PendingStream::StopReading() {
+  QUIC_DVLOG(1) << "Stop reading from pending stream " << id();
+  sequencer_.StopReading();
+}
+
 QuicStream::QuicStream(PendingStream* pending, StreamType type, bool is_static)
     : QuicStream(pending->id_,
                  pending->session_,
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h
index b747451..c8e5012 100644
--- a/quic/core/quic_stream.h
+++ b/quic/core/quic_stream.h
@@ -78,6 +78,10 @@
 
   void MarkConsumed(size_t num_bytes);
 
+  // Tells the sequencer to ignore all incoming data itself and not call
+  // OnDataAvailable().
+  void StopReading();
+
  private:
   friend class QuicStream;