Handle closing pending streams.

Make QuicSession handle if a pending stream is closed before it gets converted
into the appropriate incoming stream subclass.

gfe-relnote: n/a, no functional change outside QUIC v99-only code.  Protected by existing disabled gfe2_reloadable_flag_quic_enable_version_99.
PiperOrigin-RevId: 262183753
Change-Id: I4147320860b867df0d80ba8fd55106bf779b15b3
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index a5a35f7..875ec22 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -157,6 +157,10 @@
   if (!connection()->connected()) {
     return;
   }
+  if (pending->sequencer()->IsClosed()) {
+    ClosePendingStream(stream_id);
+    return;
+  }
   if (ProcessPendingStream(pending)) {
     // The pending stream should now be in the scope of normal streams.
     DCHECK(IsClosedStream(stream_id) || IsOpenStream(stream_id))
@@ -318,6 +322,7 @@
   }
 
   pending->OnRstStreamFrame(frame);
+  SendRstStream(stream_id, QUIC_RST_ACKNOWLEDGEMENT, 0);
   ClosePendingStream(stream_id);
 }
 
@@ -884,19 +889,7 @@
 void QuicSession::ClosePendingStream(QuicStreamId stream_id) {
   QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
 
-  if (pending_stream_map_.find(stream_id) == pending_stream_map_.end()) {
-    QUIC_BUG << ENDPOINT << "Stream is already closed: " << stream_id;
-    return;
-  }
-
-  SendRstStream(stream_id, QUIC_RST_ACKNOWLEDGEMENT, 0);
-
-  // The pending stream may have been deleted and removed during SendRstStream.
-  // Remove the stream from pending stream map iff it is still in the map.
-  if (pending_stream_map_.find(stream_id) != pending_stream_map_.end()) {
-    pending_stream_map_.erase(stream_id);
-  }
-
+  pending_stream_map_.erase(stream_id);
   if (VersionHasIetfQuicFrames(connection_->transport_version())) {
     v99_streamid_manager_.OnStreamClosed(stream_id);
   }
diff --git a/quic/core/quic_session_test.cc b/quic/core/quic_session_test.cc
index 267208f..93596f2 100644
--- a/quic/core/quic_session_test.cc
+++ b/quic/core/quic_session_test.cc
@@ -1803,10 +1803,12 @@
       transport_version(), Perspective::IS_CLIENT);
   QuicStreamFrame data1(stream_id, true, 10, QuicStringPiece("HT"));
   session_.OnStreamFrame(data1);
+  EXPECT_TRUE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
   EXPECT_EQ(0, session_.num_incoming_streams_created());
 
   QuicStreamFrame data2(stream_id, false, 0, QuicStringPiece("HT"));
   session_.OnStreamFrame(data2);
+  EXPECT_FALSE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
   EXPECT_EQ(1, session_.num_incoming_streams_created());
 }
 
@@ -1820,6 +1822,7 @@
       transport_version(), Perspective::IS_CLIENT);
   QuicStreamFrame data1(stream_id, true, 10, QuicStringPiece("HT"));
   session_.OnStreamFrame(data1);
+  EXPECT_TRUE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
   EXPECT_EQ(0, session_.num_incoming_streams_created());
   EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
 
@@ -1829,11 +1832,29 @@
   QuicRstStreamFrame rst1(kInvalidControlFrameId, stream_id,
                           QUIC_ERROR_PROCESSING_STREAM, 12);
   session_.OnRstStream(rst1);
+  EXPECT_FALSE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
   EXPECT_EQ(0, session_.num_incoming_streams_created());
   EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
 
   QuicStreamFrame data2(stream_id, false, 0, QuicStringPiece("HT"));
   session_.OnStreamFrame(data2);
+  EXPECT_FALSE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
+  EXPECT_EQ(0, session_.num_incoming_streams_created());
+  EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
+}
+
+TEST_P(QuicSessionTestServer, OnFinPendingStreams) {
+  if (!VersionHasIetfQuicFrames(transport_version())) {
+    return;
+  }
+  session_.set_uses_pending_streams(true);
+
+  QuicStreamId stream_id = QuicUtils::GetFirstUnidirectionalStreamId(
+      transport_version(), Perspective::IS_CLIENT);
+  QuicStreamFrame data(stream_id, true, 0, "");
+  session_.OnStreamFrame(data);
+
+  EXPECT_FALSE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
   EXPECT_EQ(0, session_.num_incoming_streams_created());
   EXPECT_EQ(0u, session_.GetNumOpenIncomingStreams());
 }
@@ -1848,6 +1869,7 @@
       transport_version(), Perspective::IS_CLIENT);
   QuicStreamFrame data1(stream_id, true, 10, QuicStringPiece("HT"));
   session_.OnStreamFrame(data1);
+  EXPECT_TRUE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
   EXPECT_EQ(0, session_.num_incoming_streams_created());
   QuicWindowUpdateFrame window_update_frame(kInvalidControlFrameId, stream_id,
                                             0);
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc
index e8d6a01..4d460d3 100644
--- a/quic/core/quic_stream.cc
+++ b/quic/core/quic_stream.cc
@@ -58,13 +58,12 @@
       sequencer_(this) {}
 
 void PendingStream::OnDataAvailable() {
-  // It will be called when pending stream receives its first byte. But this
-  // call should simply be ignored so that data remains in sequencer.
+  // Data should be kept in the sequencer so that
+  // QuicSession::ProcessPendingStream() can read it.
 }
 
 void PendingStream::OnFinRead() {
-  QUIC_BUG << "OnFinRead should not be called.";
-  CloseConnectionWithDetails(QUIC_INTERNAL_ERROR, "Unexpected fin read");
+  DCHECK(sequencer_.IsClosed());
 }
 
 void PendingStream::AddBytesConsumed(QuicByteCount bytes) {
@@ -74,6 +73,7 @@
 }
 
 void PendingStream::Reset(QuicRstStreamErrorCode error) {
+  // TODO: RESET_STREAM must not be sent for READ_UNIDIRECTIONAL stream.
   session_->SendRstStream(id_, error, 0);
 }
 
diff --git a/quic/test_tools/quic_session_peer.cc b/quic/test_tools/quic_session_peer.cc
index e78b059..1f83909 100644
--- a/quic/test_tools/quic_session_peer.cc
+++ b/quic/test_tools/quic_session_peer.cc
@@ -243,5 +243,12 @@
   session->SendRstStreamInner(id, error, bytes_written, close_write_side_only);
 }
 
+// static
+PendingStream* QuicSessionPeer::GetPendingStream(QuicSession* session,
+                                                 QuicStreamId stream_id) {
+  auto it = session->pending_stream_map_.find(stream_id);
+  return it == session->pending_stream_map_.end() ? nullptr : it->second.get();
+}
+
 }  // namespace test
 }  // namespace quic
diff --git a/quic/test_tools/quic_session_peer.h b/quic/test_tools/quic_session_peer.h
index f027eb5..decc4b9 100644
--- a/quic/test_tools/quic_session_peer.h
+++ b/quic/test_tools/quic_session_peer.h
@@ -86,6 +86,8 @@
                                  QuicRstStreamErrorCode error,
                                  QuicStreamOffset bytes_written,
                                  bool close_write_side_only);
+  static PendingStream* GetPendingStream(QuicSession* session,
+                                         QuicStreamId stream_id);
 };
 
 }  // namespace test