Add bidirectional support to PendingStream and enable QuicSession to buffer both unidirectional-read/bidirectional streams.

Compared to the unidirectional counterpart:
(1) STREAM frame is processed by the PendingStream.
(2) WINDOW_UPDATE/STOP_SENDING/RST_STREAM are recorded by PendingStream but only processed by QuicStream. E.g., RST_STREAM will not close a bidirectional PendingStream.
(3) BLOCKED_FRAME is informationally and thus is ignored.

PiperOrigin-RevId: 399735001
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index 309f92f..e267737 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -12,6 +12,7 @@
 #include "absl/strings/str_cat.h"
 #include "absl/strings/string_view.h"
 #include "quic/core/frames/quic_ack_frequency_frame.h"
+#include "quic/core/frames/quic_window_update_frame.h"
 #include "quic/core/quic_connection.h"
 #include "quic/core/quic_connection_context.h"
 #include "quic/core/quic_error_codes.h"
@@ -188,21 +189,51 @@
 
 void QuicSession::MaybeProcessPendingStream(PendingStream* pending) {
   QUICHE_DCHECK(pending != nullptr);
-  QuicStream* stream = ProcessPendingStream(pending);
   QuicStreamId stream_id = pending->id();
+  absl::optional<QuicResetStreamError> stop_sending_error_code =
+      pending->GetStopSendingErrorCode();
+  QuicStream* stream = ProcessPendingStream(pending);
   if (stream != nullptr) {
     // The pending stream should now be in the scope of normal streams.
     QUICHE_DCHECK(IsClosedStream(stream_id) || IsOpenStream(stream_id))
         << "Stream " << stream_id << " not created";
     pending_stream_map_.erase(stream_id);
+    if (stop_sending_error_code) {
+      stream->OnStopSending(*stop_sending_error_code);
+      if (!connection()->connected()) {
+        return;
+      }
+    }
     stream->OnStreamCreatedFromPendingStream();
     return;
   }
+  // At this point, none of the bytes has been successfully consumed by the
+  // application layer. We should close the pending stream even if it is
+  // bidirectionl as no application will be able to write in a bidirectional
+  // stream with zero byte as input.
   if (pending->sequencer()->IsClosed()) {
     ClosePendingStream(stream_id);
   }
 }
 
+void QuicSession::PendingStreamOnWindowUpdateFrame(
+    const QuicWindowUpdateFrame& frame) {
+  QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
+  PendingStream* pending = GetOrCreatePendingStream(frame.stream_id);
+  if (pending) {
+    pending->OnWindowUpdateFrame(frame);
+  }
+}
+
+void QuicSession::PendingStreamOnStopSendingFrame(
+    const QuicStopSendingFrame& frame) {
+  QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
+  PendingStream* pending = GetOrCreatePendingStream(frame.stream_id);
+  if (pending) {
+    pending->OnStopSending(frame.error());
+  }
+}
+
 void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
   QuicStreamId stream_id = frame.stream_id;
   if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
@@ -275,6 +306,10 @@
   if (visitor_) {
     visitor_->OnStopSendingReceived(frame);
   }
+  if (ShouldProcessFrameByPendingStream(STOP_SENDING_FRAME, stream_id)) {
+    PendingStreamOnStopSendingFrame(frame);
+    return;
+  }
 
   QuicStream* stream = GetOrCreateStream(stream_id);
   if (!stream) {
@@ -322,11 +357,10 @@
   }
 
   pending->OnRstStreamFrame(frame);
-  // Pending stream is currently read only. We can safely close the stream.
-  QUICHE_DCHECK_EQ(
-      READ_UNIDIRECTIONAL,
-      QuicUtils::GetStreamType(pending->id(), perspective(),
-                               /*peer_initiated = */ true, version()));
+  // At this point, none of the bytes has been consumed by the application
+  // layer. It is safe to close the pending stream even if it is bidirectionl as
+  // no application will be able to write in a bidirectional stream with zero
+  // byte as input.
   ClosePendingStream(stream_id);
 }
 
@@ -504,6 +538,11 @@
     return;
   }
 
+  if (ShouldProcessFrameByPendingStream(WINDOW_UPDATE_FRAME, stream_id)) {
+    PendingStreamOnWindowUpdateFrame(frame);
+    return;
+  }
+
   QuicStream* stream = GetOrCreateStream(stream_id);
   if (stream != nullptr) {
     stream->OnWindowUpdateFrame(frame);
@@ -2565,6 +2604,21 @@
   return connection_->framer().GetEncryptionLevelToSendApplicationData();
 }
 
+void QuicSession::ProcessAllPendingStreams() {
+  std::vector<PendingStream*> pending_streams;
+  pending_streams.reserve(pending_stream_map_.size());
+  for (auto it = pending_stream_map_.cbegin(); it != pending_stream_map_.cend();
+       ++it) {
+    pending_streams.push_back(it->second.get());
+  }
+  for (auto* pending_stream : pending_streams) {
+    MaybeProcessPendingStream(pending_stream);
+    if (!connection()->connected()) {
+      return;
+    }
+  }
+}
+
 void QuicSession::ValidatePath(
     std::unique_ptr<QuicPathValidationContext> context,
     std::unique_ptr<QuicPathValidator::ResultDelegate> result_delegate) {
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index 9e3762a..8789526 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -20,6 +20,8 @@
 #include "absl/types/span.h"
 #include "quic/core/crypto/tls_connection.h"
 #include "quic/core/frames/quic_ack_frequency_frame.h"
+#include "quic/core/frames/quic_stop_sending_frame.h"
+#include "quic/core/frames/quic_window_update_frame.h"
 #include "quic/core/handshaker_delegate_interface.h"
 #include "quic/core/legacy_quic_stream_id_manager.h"
 #include "quic/core/quic_connection.h"
@@ -620,6 +622,9 @@
     return quic_tls_disable_resumption_refactor_;
   }
 
+  // Try converting all pending streams to normal streams.
+  void ProcessAllPendingStreams();
+
  protected:
   using StreamMap =
       absl::flat_hash_map<QuicStreamId, std::unique_ptr<QuicStream>>;
@@ -828,6 +833,7 @@
   // closed.
   QuicStream* GetStream(QuicStreamId id) const;
 
+  // Can return NULL, e.g., if the stream has been closed before.
   PendingStream* GetOrCreatePendingStream(QuicStreamId stream_id);
 
   // Let streams and control frame managers retransmit lost data, returns true
@@ -856,6 +862,14 @@
   // stream.
   void PendingStreamOnRstStream(const QuicRstStreamFrame& frame);
 
+  // Creates or gets pending stream, feeds it with |frame|, and records the
+  // max_data in the pending stream.
+  void PendingStreamOnWindowUpdateFrame(const QuicWindowUpdateFrame& frame);
+
+  // Creates or gets pending stream, feeds it with |frame|, and records the
+  // ietf_error_code in the pending stream.
+  void PendingStreamOnStopSendingFrame(const QuicStopSendingFrame& frame);
+
   // Keep track of highest received byte offset of locally closed streams, while
   // waiting for a definitive final highest offset from the peer.
   absl::flat_hash_map<QuicStreamId, QuicStreamOffset>
diff --git a/quic/core/quic_session_test.cc b/quic/core/quic_session_test.cc
index 19df044..7980256 100644
--- a/quic/core/quic_session_test.cc
+++ b/quic/core/quic_session_test.cc
@@ -289,6 +289,9 @@
   // test that the session handles pending streams correctly in terms of
   // receiving stream frames.
   QuicStream* ProcessPendingStream(PendingStream* pending) override {
+    if (pending->is_bidirectional()) {
+      return CreateIncomingStream(pending);
+    }
     struct iovec iov;
     if (pending->sequencer()->GetReadableRegion(&iov)) {
       // Create TestStream once the first byte is received.
@@ -369,22 +372,40 @@
     if (!uses_pending_streams_) {
       return false;
     }
+    // Uses pending stream for STREAM/RST_STREAM frames with unidirectional read
+    // stream and uses pending stream for
+    // STREAM/RST_STREAM/STOP_SENDING/WINDOW_UPDATE frames with bidirectional
+    // stream.
+    bool is_incoming_stream = IsIncomingStream(stream_id);
     StreamType stream_type = QuicUtils::GetStreamType(
-        stream_id, perspective(), IsIncomingStream(stream_id), version());
+        stream_id, perspective(), is_incoming_stream, version());
     switch (type) {
       case STREAM_FRAME:
         ABSL_FALLTHROUGH_INTENDED;
       case RST_STREAM_FRAME:
-        return stream_type == READ_UNIDIRECTIONAL;
+        return is_incoming_stream;
+      case STOP_SENDING_FRAME:
+        ABSL_FALLTHROUGH_INTENDED;
+      case WINDOW_UPDATE_FRAME:
+        return stream_type == BIDIRECTIONAL;
       default:
         return false;
     }
   }
 
+  bool ShouldProcessPendingStreamImmediately() const override {
+    return process_pending_stream_immediately_;
+  }
+
   void set_uses_pending_streams(bool uses_pending_streams) {
     uses_pending_streams_ = uses_pending_streams;
   }
 
+  void set_process_pending_stream_immediately(
+      bool process_pending_stream_immediately) {
+    process_pending_stream_immediately_ = process_pending_stream_immediately;
+  }
+
   int num_incoming_streams_created() const {
     return num_incoming_streams_created_;
   }
@@ -401,6 +422,7 @@
 
   bool writev_consumes_all_data_;
   bool uses_pending_streams_;
+  bool process_pending_stream_immediately_ = true;
   QuicFrame save_frame_;
   int num_incoming_streams_created_;
 };
@@ -1795,6 +1817,7 @@
     return;
   }
   session_.set_uses_pending_streams(true);
+  session_.set_process_pending_stream_immediately(true);
 
   QuicStreamId stream_id = QuicUtils::GetFirstUnidirectionalStreamId(
       transport_version(), Perspective::IS_CLIENT);
@@ -1809,11 +1832,50 @@
   EXPECT_EQ(1, session_.num_incoming_streams_created());
 }
 
+TEST_P(QuicSessionTestServer, BufferAllIncomingStreams) {
+  if (!VersionUsesHttp3(transport_version())) {
+    return;
+  }
+  session_.set_uses_pending_streams(true);
+  session_.set_process_pending_stream_immediately(false);
+
+  QuicStreamId stream_id = QuicUtils::GetFirstUnidirectionalStreamId(
+      transport_version(), Perspective::IS_CLIENT);
+  QuicStreamFrame data1(stream_id, true, 10, absl::string_view("HT"));
+  session_.OnStreamFrame(data1);
+  EXPECT_TRUE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
+  EXPECT_EQ(0, session_.num_incoming_streams_created());
+  // Read unidirectional stream is still buffered when the first byte arrives.
+  QuicStreamFrame data2(stream_id, false, 0, absl::string_view("HT"));
+  session_.OnStreamFrame(data2);
+  EXPECT_TRUE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
+  EXPECT_EQ(0, session_.num_incoming_streams_created());
+
+  // Bidirectional stream is buffered.
+  QuicStreamId bidirectional_stream_id =
+      QuicUtils::GetFirstBidirectionalStreamId(transport_version(),
+                                               Perspective::IS_CLIENT);
+  QuicStreamFrame data3(bidirectional_stream_id, false, 0,
+                        absl::string_view("HT"));
+  session_.OnStreamFrame(data3);
+  EXPECT_TRUE(
+      QuicSessionPeer::GetPendingStream(&session_, bidirectional_stream_id));
+  EXPECT_EQ(0, session_.num_incoming_streams_created());
+
+  session_.ProcessAllPendingStreams();
+  // Both bidirectional and read-unidirectional streams are unbuffered.
+  EXPECT_FALSE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
+  EXPECT_FALSE(
+      QuicSessionPeer::GetPendingStream(&session_, bidirectional_stream_id));
+  EXPECT_EQ(2, session_.num_incoming_streams_created());
+}
+
 TEST_P(QuicSessionTestServer, RstPendingStreams) {
   if (!VersionUsesHttp3(transport_version())) {
     return;
   }
   session_.set_uses_pending_streams(true);
+  session_.set_process_pending_stream_immediately(false);
 
   QuicStreamId stream_id = QuicUtils::GetFirstUnidirectionalStreamId(
       transport_version(), Perspective::IS_CLIENT);
@@ -1835,6 +1897,27 @@
   EXPECT_FALSE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
   EXPECT_EQ(0, session_.num_incoming_streams_created());
   EXPECT_EQ(0u, QuicSessionPeer::GetNumOpenDynamicStreams(&session_));
+
+  session_.ProcessAllPendingStreams();
+  // Bidirectional stream is buffered.
+  QuicStreamId bidirectional_stream_id =
+      QuicUtils::GetFirstBidirectionalStreamId(transport_version(),
+                                               Perspective::IS_CLIENT);
+  QuicStreamFrame data3(bidirectional_stream_id, false, 0,
+                        absl::string_view("HT"));
+  session_.OnStreamFrame(data3);
+  EXPECT_TRUE(
+      QuicSessionPeer::GetPendingStream(&session_, bidirectional_stream_id));
+  EXPECT_EQ(0, session_.num_incoming_streams_created());
+
+  // Bidirectional pending stream is removed after RST_STREAM is received.
+  QuicRstStreamFrame rst2(kInvalidControlFrameId, bidirectional_stream_id,
+                          QUIC_ERROR_PROCESSING_STREAM, 12);
+  session_.OnRstStream(rst2);
+  EXPECT_FALSE(
+      QuicSessionPeer::GetPendingStream(&session_, bidirectional_stream_id));
+  EXPECT_EQ(0, session_.num_incoming_streams_created());
+  EXPECT_EQ(0u, QuicSessionPeer::GetNumOpenDynamicStreams(&session_));
 }
 
 TEST_P(QuicSessionTestServer, OnFinPendingStreams) {
@@ -1842,6 +1925,7 @@
     return;
   }
   session_.set_uses_pending_streams(true);
+  session_.set_process_pending_stream_immediately(true);
 
   QuicStreamId stream_id = QuicUtils::GetFirstUnidirectionalStreamId(
       transport_version(), Perspective::IS_CLIENT);
@@ -1851,9 +1935,30 @@
   EXPECT_FALSE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
   EXPECT_EQ(0, session_.num_incoming_streams_created());
   EXPECT_EQ(0u, QuicSessionPeer::GetNumOpenDynamicStreams(&session_));
+
+  session_.set_process_pending_stream_immediately(false);
+  // Bidirectional pending stream remains after Fin is received.
+  // Bidirectional stream is buffered.
+  QuicStreamId bidirectional_stream_id =
+      QuicUtils::GetFirstBidirectionalStreamId(transport_version(),
+                                               Perspective::IS_CLIENT);
+  QuicStreamFrame data2(bidirectional_stream_id, true, 0,
+                        absl::string_view("HT"));
+  session_.OnStreamFrame(data2);
+  EXPECT_TRUE(
+      QuicSessionPeer::GetPendingStream(&session_, bidirectional_stream_id));
+  EXPECT_EQ(0, session_.num_incoming_streams_created());
+
+  session_.ProcessAllPendingStreams();
+  EXPECT_FALSE(
+      QuicSessionPeer::GetPendingStream(&session_, bidirectional_stream_id));
+  EXPECT_EQ(1, session_.num_incoming_streams_created());
+  QuicStream* bidirectional_stream =
+      QuicSessionPeer::GetStream(&session_, bidirectional_stream_id);
+  EXPECT_TRUE(bidirectional_stream->fin_received());
 }
 
-TEST_P(QuicSessionTestServer, PendingStreamOnWindowUpdate) {
+TEST_P(QuicSessionTestServer, UnidirectionalPendingStreamOnWindowUpdate) {
   if (!VersionUsesHttp3(transport_version())) {
     return;
   }
@@ -1875,6 +1980,80 @@
   session_.OnWindowUpdateFrame(window_update_frame);
 }
 
+TEST_P(QuicSessionTestServer, BidirectionalPendingStreamOnWindowUpdate) {
+  if (!VersionUsesHttp3(transport_version())) {
+    return;
+  }
+
+  session_.set_uses_pending_streams(true);
+  session_.set_process_pending_stream_immediately(false);
+  QuicStreamId stream_id = QuicUtils::GetFirstBidirectionalStreamId(
+      transport_version(), Perspective::IS_CLIENT);
+  QuicStreamFrame data(stream_id, true, 10, absl::string_view("HT"));
+  session_.OnStreamFrame(data);
+  QuicWindowUpdateFrame window_update_frame(kInvalidControlFrameId, stream_id,
+                                            kDefaultFlowControlSendWindow * 2);
+  session_.OnWindowUpdateFrame(window_update_frame);
+  EXPECT_TRUE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
+  EXPECT_EQ(0, session_.num_incoming_streams_created());
+
+  session_.ProcessAllPendingStreams();
+  EXPECT_FALSE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
+  EXPECT_EQ(1, session_.num_incoming_streams_created());
+  QuicStream* bidirectional_stream =
+      QuicSessionPeer::GetStream(&session_, stream_id);
+  QuicByteCount send_window =
+      QuicStreamPeer::SendWindowSize(bidirectional_stream);
+  EXPECT_EQ(send_window, kDefaultFlowControlSendWindow * 2);
+}
+
+TEST_P(QuicSessionTestServer, UnidirectionalPendingStreamOnStopSending) {
+  if (!VersionUsesHttp3(transport_version())) {
+    return;
+  }
+
+  session_.set_uses_pending_streams(true);
+  QuicStreamId stream_id = QuicUtils::GetFirstUnidirectionalStreamId(
+      transport_version(), Perspective::IS_CLIENT);
+  QuicStreamFrame data1(stream_id, true, 10, absl::string_view("HT"));
+  session_.OnStreamFrame(data1);
+  EXPECT_TRUE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
+  EXPECT_EQ(0, session_.num_incoming_streams_created());
+  QuicStopSendingFrame stop_sending_frame(kInvalidControlFrameId, stream_id,
+                                          QUIC_STREAM_CANCELLED);
+  EXPECT_CALL(
+      *connection_,
+      CloseConnection(QUIC_INVALID_STREAM_ID,
+                      "Received STOP_SENDING for a read-only stream", _));
+  session_.OnStopSendingFrame(stop_sending_frame);
+}
+
+TEST_P(QuicSessionTestServer, BidirectionalPendingStreamOnStopSending) {
+  if (!VersionUsesHttp3(transport_version())) {
+    return;
+  }
+
+  session_.set_uses_pending_streams(true);
+  session_.set_process_pending_stream_immediately(false);
+  QuicStreamId stream_id = QuicUtils::GetFirstBidirectionalStreamId(
+      transport_version(), Perspective::IS_CLIENT);
+  QuicStreamFrame data(stream_id, true, 0, absl::string_view("HT"));
+  session_.OnStreamFrame(data);
+  QuicStopSendingFrame stop_sending_frame(kInvalidControlFrameId, stream_id,
+                                          QUIC_STREAM_CANCELLED);
+  session_.OnStopSendingFrame(stop_sending_frame);
+  EXPECT_TRUE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
+  EXPECT_EQ(0, session_.num_incoming_streams_created());
+
+  EXPECT_CALL(*connection_, OnStreamReset(stream_id, _));
+  session_.ProcessAllPendingStreams();
+  EXPECT_FALSE(QuicSessionPeer::GetPendingStream(&session_, stream_id));
+  EXPECT_EQ(1, session_.num_incoming_streams_created());
+  QuicStream* bidirectional_stream =
+      QuicSessionPeer::GetStream(&session_, stream_id);
+  EXPECT_TRUE(bidirectional_stream->write_side_closed());
+}
+
 TEST_P(QuicSessionTestServer, DrainingStreamsDoNotCountAsOpened) {
   // Verify that a draining stream (which has received a FIN but not consumed
   // it) does not count against the open quota (because it is closed from the
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc
index 08c0358..74a5d45 100644
--- a/quic/core/quic_stream.cc
+++ b/quic/core/quic_stream.cc
@@ -15,6 +15,7 @@
 #include "quic/core/quic_session.h"
 #include "quic/core/quic_types.h"
 #include "quic/core/quic_utils.h"
+#include "quic/core/quic_versions.h"
 #include "quic/platform/api/quic_bug_tracker.h"
 #include "quic/platform/api/quic_flag_utils.h"
 #include "quic/platform/api/quic_flags.h"
@@ -119,6 +120,10 @@
       stream_delegate_(session),
       stream_bytes_read_(0),
       fin_received_(false),
+      is_bidirectional_(QuicUtils::GetStreamType(id, session->perspective(),
+                                                 /*peer_initiated = */ true,
+                                                 session->version()) ==
+                        BIDIRECTIONAL),
       connection_flow_controller_(session->flow_controller()),
       flow_controller_(session, id,
                        /*is_connection_flow_controller*/ false,
@@ -246,6 +251,11 @@
   }
 }
 
+void PendingStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
+  QUICHE_DCHECK(is_bidirectional_);
+  flow_controller_.UpdateSendWindowOffset(frame.max_data);
+}
+
 bool PendingStream::MaybeIncreaseHighestReceivedOffset(
     QuicStreamOffset new_offset) {
   uint64_t increment =
@@ -262,6 +272,13 @@
   return true;
 }
 
+void PendingStream::OnStopSending(
+    QuicResetStreamError stop_sending_error_code) {
+  if (!stop_sending_error_code_) {
+    stop_sending_error_code_ = stop_sending_error_code;
+  }
+}
+
 void PendingStream::MarkConsumed(QuicByteCount num_bytes) {
   sequencer_.MarkConsumed(num_bytes);
 }
@@ -282,9 +299,6 @@
                  std::move(pending->flow_controller_),
                  pending->connection_flow_controller_) {
   QUICHE_DCHECK(session->version().HasIetfQuicFrames());
-  // TODO(haoyuewang) Remove this check once bidirectional pending stream is
-  // supported.
-  QUICHE_DCHECK(type_ == READ_UNIDIRECTIONAL);
   sequencer_.set_stream(this);
 }
 
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h
index fcc7fe5..f2485b9 100644
--- a/quic/core/quic_stream.h
+++ b/quic/core/quic_stream.h
@@ -25,6 +25,7 @@
 #include "absl/strings/string_view.h"
 #include "absl/types/optional.h"
 #include "absl/types/span.h"
+#include "quic/core/frames/quic_rst_stream_frame.h"
 #include "quic/core/quic_error_codes.h"
 #include "quic/core/quic_flow_controller.h"
 #include "quic/core/quic_packets.h"
@@ -73,10 +74,21 @@
   // If the data violates flow control, the connection will be closed.
   void OnStreamFrame(const QuicStreamFrame& frame);
 
+  bool is_bidirectional() const { return is_bidirectional_; }
+
   // Stores the final byte offset from |frame|.
   // If the final offset violates flow control, the connection will be closed.
   void OnRstStreamFrame(const QuicRstStreamFrame& frame);
 
+  void OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame);
+
+  void OnStopSending(QuicResetStreamError stop_sending_error_code);
+
+  // The error code received from QuicStopSendingFrame (if any).
+  const absl::optional<QuicResetStreamError>& GetStopSendingErrorCode() const {
+    return stop_sending_error_code_;
+  }
+
   // Returns the number of bytes read on this stream.
   uint64_t stream_bytes_read() { return stream_bytes_read_; }
 
@@ -109,12 +121,17 @@
   // True if a frame containing a fin has been received.
   bool fin_received_;
 
+  // True if this pending stream is backing a bidirectional stream.
+  bool is_bidirectional_;
+
   // Connection-level flow controller. Owned by the session.
   QuicFlowController* connection_flow_controller_;
   // Stream-level flow controller.
   QuicFlowController flow_controller_;
   // Stores the buffered frames.
   QuicStreamSequencer sequencer_;
+  // The error code received from QuicStopSendingFrame (if any).
+  absl::optional<QuicResetStreamError> stop_sending_error_code_;
 };
 
 class QUIC_EXPORT_PRIVATE QuicStream
diff --git a/quic/core/quic_stream_test.cc b/quic/core/quic_stream_test.cc
index 5d3b84f..9d48b1f 100644
--- a/quic/core/quic_stream_test.cc
+++ b/quic/core/quic_stream_test.cc
@@ -226,18 +226,32 @@
 TEST_P(PendingStreamTest, PendingStreamTooMuchDataInRstStream) {
   Initialize();
 
-  PendingStream pending(kTestPendingStreamId, session_.get());
+  PendingStream pending1(kTestPendingStreamId, session_.get());
   // Receive a rst stream frame that violates flow control: the byte offset is
   // higher than the receive window offset.
-  QuicRstStreamFrame frame(kInvalidControlFrameId, kTestPendingStreamId,
-                           QUIC_STREAM_CANCELLED,
-                           kInitialSessionFlowControlWindowForTest + 1);
+  QuicRstStreamFrame frame1(kInvalidControlFrameId, kTestPendingStreamId,
+                            QUIC_STREAM_CANCELLED,
+                            kInitialSessionFlowControlWindowForTest + 1);
 
   // Pending stream should not accept the frame, and the connection should be
   // closed.
   EXPECT_CALL(*connection_,
               CloseConnection(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, _, _));
-  pending.OnRstStreamFrame(frame);
+  pending1.OnRstStreamFrame(frame1);
+
+  QuicStreamId bidirection_stream_id = QuicUtils::GetFirstBidirectionalStreamId(
+      session_->transport_version(), Perspective::IS_CLIENT);
+  PendingStream pending2(bidirection_stream_id, session_.get());
+  // Receive a rst stream frame that violates flow control: the byte offset is
+  // higher than the receive window offset.
+  QuicRstStreamFrame frame2(kInvalidControlFrameId, bidirection_stream_id,
+                            QUIC_STREAM_CANCELLED,
+                            kInitialSessionFlowControlWindowForTest + 1);
+  // Bidirectional Pending stream should not accept the frame, and the
+  // connection should be closed.
+  EXPECT_CALL(*connection_,
+              CloseConnection(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, _, _));
+  pending2.OnRstStreamFrame(frame2);
 }
 
 TEST_P(PendingStreamTest, PendingStreamRstStream) {
@@ -253,6 +267,35 @@
   pending.OnRstStreamFrame(frame);
 }
 
+TEST_P(PendingStreamTest, PendingStreamWindowUpdate) {
+  Initialize();
+
+  QuicStreamId bidirection_stream_id = QuicUtils::GetFirstBidirectionalStreamId(
+      session_->transport_version(), Perspective::IS_CLIENT);
+  PendingStream pending(bidirection_stream_id, session_.get());
+  QuicWindowUpdateFrame frame(kInvalidControlFrameId, bidirection_stream_id,
+                              kDefaultFlowControlSendWindow * 2);
+  pending.OnWindowUpdateFrame(frame);
+  TestStream stream(&pending, session_.get(), false);
+
+  EXPECT_EQ(QuicStreamPeer::SendWindowSize(&stream),
+            kDefaultFlowControlSendWindow * 2);
+}
+
+TEST_P(PendingStreamTest, PendingStreamStopSending) {
+  Initialize();
+
+  QuicStreamId bidirection_stream_id = QuicUtils::GetFirstBidirectionalStreamId(
+      session_->transport_version(), Perspective::IS_CLIENT);
+  PendingStream pending(bidirection_stream_id, session_.get());
+  QuicResetStreamError error =
+      QuicResetStreamError::FromInternal(QUIC_STREAM_INTERNAL_ERROR);
+  pending.OnStopSending(error);
+  EXPECT_TRUE(pending.GetStopSendingErrorCode());
+  auto actual_error = *pending.GetStopSendingErrorCode();
+  EXPECT_EQ(actual_error, error);
+}
+
 TEST_P(PendingStreamTest, FromPendingStream) {
   Initialize();