gfe-relnote: In QUIC, break QuicStream::CloseRWSide -> QuicSession::CloseStream -> QuicStream::OnClose -> CloseRWSide loop. Protected by gfe2_reloadable_flag_quic_break_session_stream_close_loop.

With this change, a stream can be closed by 4 methods:
1) QuicSession::CloseStream (will soon be removed)
2) QuicSession::ResetStream
3) QuicStream::Reset
4) QuicStream::OnStreamReset
And also make the code path consistent. All -> QuicStream::CloseReadSide/CloseWriteSide -> QuicSession::OnStreamClosed.

PiperOrigin-RevId: 307032801
Change-Id: I1a2f49ddb94cc9642ce8c8fcb514f5adb928d045
diff --git a/quic/core/http/end_to_end_test.cc b/quic/core/http/end_to_end_test.cc
index a4d6cd7..4d02c80 100644
--- a/quic/core/http/end_to_end_test.cc
+++ b/quic/core/http/end_to_end_test.cc
@@ -2027,7 +2027,11 @@
   // Transmit the cancel, and ensure the connection is torn down properly.
   SetPacketLossPercentage(0);
   QuicStreamId stream_id = GetNthClientInitiatedBidirectionalId(0);
-  session->SendRstStream(stream_id, QUIC_STREAM_CANCELLED, 0);
+  if (session->break_close_loop()) {
+    session->ResetStream(stream_id, QUIC_STREAM_CANCELLED, 0);
+  } else {
+    session->SendRstStream(stream_id, QUIC_STREAM_CANCELLED, 0);
+  }
 
   // WaitForEvents waits 50ms and returns true if there are outstanding
   // requests.
diff --git a/quic/core/http/quic_client_promised_info_test.cc b/quic/core/http/quic_client_promised_info_test.cc
index 27d9fe8..381160b 100644
--- a/quic/core/http/quic_client_promised_info_test.cc
+++ b/quic/core/http/quic_client_promised_info_test.cc
@@ -227,7 +227,9 @@
   EXPECT_CALL(*connection_, SendControlFrame(_));
   EXPECT_CALL(*connection_,
               OnStreamReset(promise_id_, QUIC_PROMISE_VARY_MISMATCH));
-  EXPECT_CALL(session_, CloseStream(promise_id_));
+  if (!session_.break_close_loop()) {
+    EXPECT_CALL(session_, CloseStream(promise_id_));
+  }
 
   promised->HandleClientRequest(client_request_, &delegate);
 }
@@ -303,7 +305,9 @@
   session_.GetOrCreateStream(promise_id_);
 
   // Cancel the promised stream.
-  EXPECT_CALL(session_, CloseStream(promise_id_));
+  if (!session_.break_close_loop()) {
+    EXPECT_CALL(session_, CloseStream(promise_id_));
+  }
   EXPECT_CALL(*connection_, SendControlFrame(_));
   EXPECT_CALL(*connection_, OnStreamReset(promise_id_, QUIC_STREAM_CANCELLED));
   promised->Cancel();
@@ -327,11 +331,17 @@
   promise_stream->OnStreamHeaderList(false, headers.uncompressed_header_bytes(),
                                      headers);
 
-  EXPECT_CALL(session_, CloseStream(promise_id_));
+  if (!session_.break_close_loop()) {
+    EXPECT_CALL(session_, CloseStream(promise_id_));
+  }
   EXPECT_CALL(*connection_, SendControlFrame(_));
   EXPECT_CALL(*connection_,
               OnStreamReset(promise_id_, QUIC_STREAM_PEER_GOING_AWAY));
-  session_.SendRstStream(promise_id_, QUIC_STREAM_PEER_GOING_AWAY, 0);
+  if (session_.break_close_loop()) {
+    session_.ResetStream(promise_id_, QUIC_STREAM_PEER_GOING_AWAY, 0);
+  } else {
+    session_.SendRstStream(promise_id_, QUIC_STREAM_PEER_GOING_AWAY, 0);
+  }
 
   // Now initiate rendezvous.
   TestPushPromiseDelegate delegate(/*match=*/true);
diff --git a/quic/core/http/quic_spdy_client_session_base.cc b/quic/core/http/quic_spdy_client_session_base.cc
index 2655259..63de57c 100644
--- a/quic/core/http/quic_spdy_client_session_base.cc
+++ b/quic/core/http/quic_spdy_client_session_base.cc
@@ -204,7 +204,11 @@
     QuicStreamId id,
     QuicRstStreamErrorCode error_code) {
   DCHECK(QuicUtils::IsServerInitiatedStreamId(transport_version(), id));
-  SendRstStream(id, error_code, 0);
+  if (break_close_loop()) {
+    ResetStream(id, error_code, 0);
+  } else {
+    SendRstStream(id, error_code, 0);
+  }
   if (!IsOpenStream(id) && !IsClosedStream(id)) {
     MaybeIncreaseLargestPeerStreamId(id);
   }
@@ -218,6 +222,14 @@
   }
 }
 
+void QuicSpdyClientSessionBase::OnStreamClosed(QuicStreamId stream_id) {
+  DCHECK(break_close_loop());
+  QuicSpdySession::OnStreamClosed(stream_id);
+  if (!VersionUsesHttp3(transport_version())) {
+    headers_stream()->MaybeReleaseSequencerBuffer();
+  }
+}
+
 bool QuicSpdyClientSessionBase::ShouldReleaseHeadersStreamSequencerBuffer() {
   return !HasActiveRequestStreams() && promised_by_id_.empty();
 }
diff --git a/quic/core/http/quic_spdy_client_session_base.h b/quic/core/http/quic_spdy_client_session_base.h
index 9d702e3..282a8cf 100644
--- a/quic/core/http/quic_spdy_client_session_base.h
+++ b/quic/core/http/quic_spdy_client_session_base.h
@@ -105,6 +105,9 @@
   // Release headers stream's sequencer buffer if it's empty.
   void CloseStreamInner(QuicStreamId stream_id, bool rst_sent) override;
 
+  // Release headers stream's sequencer buffer if it's empty.
+  void OnStreamClosed(QuicStreamId stream_id) override;
+
   // Returns true if there are no active requests and no promised streams.
   bool ShouldReleaseHeadersStreamSequencerBuffer() override;
 
diff --git a/quic/core/http/quic_spdy_client_session_test.cc b/quic/core/http/quic_spdy_client_session_test.cc
index 65dd43a..dc72a73 100644
--- a/quic/core/http/quic_spdy_client_session_test.cc
+++ b/quic/core/http/quic_spdy_client_session_test.cc
@@ -354,7 +354,11 @@
       .Times(AtLeast(1))
       .WillRepeatedly(Invoke(&ClearControlFrame));
   EXPECT_CALL(*connection_, OnStreamReset(_, _)).Times(1);
-  session_->SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0);
+  if (session_->break_close_loop()) {
+    session_->ResetStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0);
+  } else {
+    session_->SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0);
+  }
 
   // A new stream cannot be created as the reset stream still counts as an open
   // outgoing stream until closed by the server.
@@ -406,7 +410,11 @@
       .Times(AtLeast(1))
       .WillRepeatedly(Invoke(&ClearControlFrame));
   EXPECT_CALL(*connection_, OnStreamReset(_, _)).Times(1);
-  session_->SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0);
+  if (session_->break_close_loop()) {
+    session_->ResetStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0);
+  } else {
+    session_->SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0);
+  }
 
   // The stream receives trailers with final byte offset, but the header value
   // is non-numeric and should be treated as malformed.
@@ -861,7 +869,12 @@
   EXPECT_CALL(*connection_, SendControlFrame(_));
   EXPECT_CALL(*connection_,
               OnStreamReset(promised_stream_id_, QUIC_STREAM_PEER_GOING_AWAY));
-  session_->SendRstStream(promised_stream_id_, QUIC_STREAM_PEER_GOING_AWAY, 0);
+  if (session_->break_close_loop()) {
+    session_->ResetStream(promised_stream_id_, QUIC_STREAM_PEER_GOING_AWAY, 0);
+  } else {
+    session_->SendRstStream(promised_stream_id_, QUIC_STREAM_PEER_GOING_AWAY,
+                            0);
+  }
   QuicClientPromisedInfo* promised =
       session_->GetPromisedById(promised_stream_id_);
   EXPECT_NE(promised, nullptr);
diff --git a/quic/core/http/quic_spdy_stream_test.cc b/quic/core/http/quic_spdy_stream_test.cc
index f46d914..9db9f22 100644
--- a/quic/core/http/quic_spdy_stream_test.cc
+++ b/quic/core/http/quic_spdy_stream_test.cc
@@ -317,6 +317,7 @@
         &helper_, &alarm_factory_, perspective, SupportedVersions(GetParam()));
     session_ = std::make_unique<StrictMock<TestSession>>(connection_);
     session_->Initialize();
+    connection_->AdvanceTime(QuicTime::Delta::FromSeconds(1));
     ON_CALL(*session_, WritevData(_, _, _, _, _, _))
         .WillByDefault(
             Invoke(session_.get(), &MockQuicSpdySession::ConsumeData));
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index 4124575..3c6d323 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -124,7 +124,9 @@
       write_with_transmission_(
           GetQuicReloadableFlag(quic_write_with_transmission)),
       deprecate_draining_streams_(
-          GetQuicReloadableFlag(quic_deprecate_draining_streams)) {
+          GetQuicReloadableFlag(quic_deprecate_draining_streams)),
+      break_close_loop_(
+          GetQuicReloadableFlag(quic_break_session_stream_close_loop)) {
   closed_streams_clean_up_alarm_ =
       QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
           new ClosedStreamsCleanUpDelegate(this)));
@@ -132,6 +134,9 @@
       connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
     config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
   }
+  if (break_close_loop_) {
+    QUIC_RELOADABLE_FLAG_COUNT(quic_break_session_stream_close_loop);
+  }
 }
 
 void QuicSession::Initialize() {
@@ -779,6 +784,10 @@
     connection_->OnStreamReset(id, error);
   }
 
+  if (break_close_loop_) {
+    return;
+  }
+
   if (error != QUIC_STREAM_NO_ERROR && QuicContainsKey(zombie_streams_, id)) {
     OnStreamDoneWaitingForAcks(id);
     return;
@@ -786,6 +795,26 @@
   CloseStreamInner(id, true);
 }
 
+void QuicSession::ResetStream(QuicStreamId id,
+                              QuicRstStreamErrorCode error,
+                              QuicStreamOffset bytes_written) {
+  DCHECK(break_close_loop_);
+  QuicStream* stream = GetStream(id);
+  if (stream != nullptr && stream->is_static()) {
+    connection()->CloseConnection(
+        QUIC_INVALID_STREAM_ID, "Try to reset a static stream",
+        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+    return;
+  }
+
+  if (stream != nullptr) {
+    stream->Reset(error);
+    return;
+  }
+
+  SendRstStream(id, error, bytes_written);
+}
+
 void QuicSession::MaybeSendRstStreamFrame(QuicStreamId id,
                                           QuicRstStreamErrorCode error,
                                           QuicStreamOffset bytes_written) {
@@ -878,6 +907,11 @@
         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
     return;
   }
+  if (break_close_loop_) {
+    stream->CloseReadSide();
+    stream->CloseWriteSide();
+    return;
+  }
   StreamType type = stream->type();
 
   // Tell the stream that a RST has been sent.
@@ -949,6 +983,79 @@
   }
 }
 
+void QuicSession::OnStreamClosed(QuicStreamId stream_id) {
+  QUIC_DVLOG(1) << ENDPOINT << "Closing stream: " << stream_id;
+  DCHECK(break_close_loop_);
+  StreamMap::iterator it = stream_map_.find(stream_id);
+  if (it == stream_map_.end()) {
+    QUIC_DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
+    return;
+  }
+  QuicStream* stream = it->second.get();
+  StreamType type = stream->type();
+
+  if (stream->IsWaitingForAcks()) {
+    zombie_streams_[stream->id()] = std::move(it->second);
+  } else {
+    // Clean up the stream since it is no longer waiting for acks.
+    streams_waiting_for_acks_.erase(stream->id());
+    closed_streams_.push_back(std::move(it->second));
+    // Do not retransmit data of a closed stream.
+    streams_with_pending_retransmission_.erase(stream_id);
+    if (!closed_streams_clean_up_alarm_->IsSet()) {
+      closed_streams_clean_up_alarm_->Set(
+          connection_->clock()->ApproximateNow());
+    }
+  }
+
+  // If we haven't received a FIN or RST for this stream, we need to keep track
+  // of the how many bytes the stream's flow controller believes it has
+  // received, for accurate connection level flow control accounting.
+  const bool had_fin_or_rst = stream->HasReceivedFinalOffset();
+  if (!had_fin_or_rst) {
+    InsertLocallyClosedStreamsHighestOffset(
+        stream_id, stream->flow_controller()->highest_received_byte_offset());
+  }
+  bool stream_was_draining = false;
+  if (deprecate_draining_streams_) {
+    stream_was_draining = stream->was_draining();
+    QUIC_DVLOG_IF(1, stream_was_draining)
+        << ENDPOINT << "Stream " << stream_id << " was draining";
+  }
+  stream_map_.erase(it);
+  if (IsIncomingStream(stream_id)) {
+    --num_dynamic_incoming_streams_;
+  }
+  if (!deprecate_draining_streams_) {
+    stream_was_draining =
+        draining_streams_.find(stream_id) != draining_streams_.end();
+  }
+  if (stream_was_draining) {
+    if (IsIncomingStream(stream_id)) {
+      QUIC_BUG_IF(num_draining_incoming_streams_ == 0);
+      --num_draining_incoming_streams_;
+    } else if (deprecate_draining_streams_) {
+      QUIC_BUG_IF(num_draining_outgoing_streams_ == 0);
+      --num_draining_outgoing_streams_;
+    }
+    draining_streams_.erase(stream_id);
+  } else if (VersionHasIetfQuicFrames(transport_version())) {
+    // Stream was not draining, but we did have a fin or rst, so we can now
+    // free the stream ID if version 99.
+    if (had_fin_or_rst && connection_->connected()) {
+      // Do not bother informing stream ID manager if connection is closed.
+      v99_streamid_manager_.OnStreamClosed(stream_id);
+    }
+  }
+
+  if (!stream_was_draining && !IsIncomingStream(stream_id) && had_fin_or_rst &&
+      !VersionHasIetfQuicFrames(transport_version())) {
+    // Streams that first became draining already called OnCanCreate...
+    // This covers the case where the stream went directly to being closed.
+    OnCanCreateNewOutgoingStream(type != BIDIRECTIONAL);
+  }
+}
+
 void QuicSession::ClosePendingStream(QuicStreamId stream_id) {
   QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
 
@@ -1599,7 +1706,11 @@
     if (!stream_id_manager_.CanOpenIncomingStream(
             GetNumOpenIncomingStreams())) {
       // Refuse to open the stream.
-      SendRstStream(stream_id, QUIC_REFUSED_STREAM, 0);
+      if (break_close_loop_) {
+        ResetStream(stream_id, QUIC_REFUSED_STREAM, 0);
+      } else {
+        SendRstStream(stream_id, QUIC_REFUSED_STREAM, 0);
+      }
       return nullptr;
     }
   }
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h
index d1720a0..f67c097 100644
--- a/quic/core/quic_session.h
+++ b/quic/core/quic_session.h
@@ -198,13 +198,17 @@
   // will be sent in specified transmission |type|.
   bool WriteControlFrame(const QuicFrame& frame, TransmissionType type);
 
-  // Close the stream in both directions.
-  // TODO(renjietang): rename this method as it sends both RST_STREAM and
-  // STOP_SENDING in IETF QUIC.
+  // Called by stream to send RST_STREAM (and STOP_SENDING).
   virtual void SendRstStream(QuicStreamId id,
                              QuicRstStreamErrorCode error,
                              QuicStreamOffset bytes_written);
 
+  // Called to send RST_STREAM (and STOP_SENDING) and close stream. If stream
+  // |id| does not exist, just send RST_STREAM (and STOP_SENDING).
+  virtual void ResetStream(QuicStreamId id,
+                           QuicRstStreamErrorCode error,
+                           QuicStreamOffset bytes_written);
+
   // Called when the session wants to go away and not accept any new streams.
   virtual void SendGoAway(QuicErrorCode error_code, const std::string& reason);
 
@@ -217,9 +221,15 @@
   // Create and transmit a STOP_SENDING frame
   virtual void SendStopSending(uint16_t code, QuicStreamId stream_id);
 
-  // Removes the stream associated with 'stream_id' from the active stream map.
+  // Close stream |stream_id|. Whether sending RST_STREAM (and STOP_SENDING)
+  // depends on the sending and receiving states.
+  // TODO(fayang): Deprecate CloseStream, instead always use ResetStream to
+  // close a stream from session.
   virtual void CloseStream(QuicStreamId stream_id);
 
+  // Called by stream |stream_id| when it gets closed.
+  virtual void OnStreamClosed(QuicStreamId stream_id);
+
   // Returns true if outgoing packets will be encrypted, even if the server
   // hasn't confirmed the handshake yet.
   virtual bool IsEncryptionEstablished() const;
@@ -487,6 +497,8 @@
     return deprecate_draining_streams_;
   }
 
+  bool break_close_loop() const { return break_close_loop_; }
+
  protected:
   using StreamMap = QuicSmallMap<QuicStreamId, std::unique_ptr<QuicStream>, 10>;
 
@@ -535,6 +547,7 @@
 
   // Performs the work required to close |stream_id|.  If |rst_sent| then a
   // Reset Stream frame has already been sent for this stream.
+  // TODO(fayang): Remove CloseStreamInner.
   virtual void CloseStreamInner(QuicStreamId stream_id, bool rst_sent);
 
   // When a stream is closed locally, it may not yet know how many bytes the
@@ -833,6 +846,9 @@
 
   // Latched value of quic_deprecate_draining_streams.
   const bool deprecate_draining_streams_;
+
+  // Latched value of quic_break_session_stream_close_loop.
+  const bool break_close_loop_;
 };
 
 }  // namespace quic
diff --git a/quic/core/quic_session_test.cc b/quic/core/quic_session_test.cc
index bf4bffc..80ef0d5 100644
--- a/quic/core/quic_session_test.cc
+++ b/quic/core/quic_session_test.cc
@@ -2392,8 +2392,14 @@
   session_.OnFrameLost(QuicFrame(frame));
   // Retransmit stream data causes connection close. Stream has not sent fin
   // yet, so an RST is sent.
-  EXPECT_CALL(*stream, OnCanWrite())
-      .WillOnce(Invoke(stream, &QuicStream::OnClose));
+  if (session_.break_close_loop()) {
+    EXPECT_CALL(*stream, OnCanWrite()).WillOnce(Invoke([this, stream]() {
+      session_.CloseStream(stream->id());
+    }));
+  } else {
+    EXPECT_CALL(*stream, OnCanWrite())
+        .WillOnce(Invoke(stream, &QuicStream::OnClose));
+  }
   if (VersionHasIetfQuicFrames(transport_version())) {
     // Once for the RST_STREAM, once for the STOP_SENDING
     EXPECT_CALL(*connection_, SendControlFrame(_))
@@ -2751,6 +2757,7 @@
 
   TestStream* stream = session_.CreateOutgoingBidirectionalStream();
   QuicStreamId stream_id = stream->id();
+  QuicStreamPeer::SetFinSent(stream);
   stream->CloseWriteSide();
   EXPECT_TRUE(stream->write_side_closed());
   QuicStopSendingFrame frame(1, stream_id, 123);
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc
index 813ea2d..75cb9f9 100644
--- a/quic/core/quic_stream.cc
+++ b/quic/core/quic_stream.cc
@@ -573,9 +573,16 @@
 
 void QuicStream::Reset(QuicRstStreamErrorCode error) {
   stream_error_ = error;
-  // Sending a RstStream results in calling CloseStream.
   session()->SendRstStream(id(), error, stream_bytes_written());
   rst_sent_ = true;
+  if (session_->break_close_loop()) {
+    if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
+      session()->OnStreamDoneWaitingForAcks(id_);
+      return;
+    }
+    CloseReadSide();
+    CloseWriteSide();
+  }
 }
 
 void QuicStream::OnUnrecoverableError(QuicErrorCode error,
@@ -762,7 +769,12 @@
 
   if (write_side_closed_) {
     QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
-    session_->CloseStream(id());
+    if (session_->break_close_loop()) {
+      session_->OnStreamClosed(id());
+      OnClose();
+    } else {
+      session_->CloseStream(id());
+    }
   }
 }
 
@@ -775,7 +787,12 @@
   write_side_closed_ = true;
   if (read_side_closed_) {
     QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
-    session_->CloseStream(id());
+    if (session_->break_close_loop()) {
+      session_->OnStreamClosed(id());
+      OnClose();
+    } else {
+      session_->CloseStream(id());
+    }
   }
 }
 
@@ -798,8 +815,12 @@
 }
 
 void QuicStream::OnClose() {
-  CloseReadSide();
-  CloseWriteSide();
+  if (session()->break_close_loop()) {
+    DCHECK(read_side_closed_ && write_side_closed_);
+  } else {
+    CloseReadSide();
+    CloseWriteSide();
+  }
 
   if (!fin_sent_ && !rst_sent_) {
     // For flow control accounting, tell the peer how many bytes have been
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h
index b8e940f..90dac69 100644
--- a/quic/core/quic_stream.h
+++ b/quic/core/quic_stream.h
@@ -165,10 +165,12 @@
   // stream to write any pending data.
   virtual void OnCanWrite();
 
-  // Called by the session just before the object is destroyed.
+  // Called just before the object is destroyed.
   // The object should not be accessed after OnClose is called.
   // Sends a RST_STREAM with code QUIC_RST_ACKNOWLEDGEMENT if neither a FIN nor
   // a RST_STREAM has been sent.
+  // TODO(fayang): move this to protected when deprecating
+  // quic_break_session_stream_close_loop.
   virtual void OnClose();
 
   // Called by the session when the endpoint receives a RST_STREAM from the
@@ -355,6 +357,14 @@
   // Does not send a FIN.  May cause the stream to be closed.
   virtual void CloseWriteSide();
 
+  // Close the read side of the stream.  May cause the stream to be closed.
+  // Subclasses and consumers should use StopReading to terminate reading early
+  // if expecting a FIN. Can be used directly by subclasses if not expecting a
+  // FIN.
+  // TODO(fayang): move this to protected when removing
+  // QuicSession::CloseStream.
+  void CloseReadSide();
+
   // Returns true if the stream is static.
   bool is_static() const { return is_static_; }
 
@@ -364,12 +374,6 @@
       const QuicSession* session);
 
  protected:
-  // Close the read side of the socket.  May cause the stream to be closed.
-  // Subclasses and consumers should use StopReading to terminate reading early
-  // if expecting a FIN. Can be used directly by subclasses if not expecting a
-  // FIN.
-  void CloseReadSide();
-
   // Called when data of [offset, offset + data_length] is buffered in send
   // buffer.
   virtual void OnDataBuffered(
diff --git a/quic/core/quic_stream_test.cc b/quic/core/quic_stream_test.cc
index 100c05e..ed8e0e4 100644
--- a/quic/core/quic_stream_test.cc
+++ b/quic/core/quic_stream_test.cc
@@ -455,7 +455,12 @@
 
   // Now close the stream, and expect that we send a RST.
   EXPECT_CALL(*session_, SendRstStream(_, _, _));
-  stream_->OnClose();
+  if (session_->break_close_loop()) {
+    stream_->CloseReadSide();
+    stream_->CloseWriteSide();
+  } else {
+    stream_->OnClose();
+  }
   EXPECT_FALSE(session_->HasUnackedStreamData());
   EXPECT_FALSE(fin_sent());
   EXPECT_TRUE(rst_sent());
@@ -482,7 +487,12 @@
   EXPECT_FALSE(rst_sent());
 
   // Now close the stream, and expect that we do not send a RST.
-  stream_->OnClose();
+  if (session_->break_close_loop()) {
+    stream_->CloseReadSide();
+    stream_->CloseWriteSide();
+  } else {
+    stream_->OnClose();
+  }
   EXPECT_TRUE(fin_sent());
   EXPECT_FALSE(rst_sent());
 }
@@ -506,7 +516,12 @@
 
   // Now close the stream (any further resets being sent would break the
   // expectation above).
-  stream_->OnClose();
+  if (session_->break_close_loop()) {
+    stream_->CloseReadSide();
+    stream_->CloseWriteSide();
+  } else {
+    stream_->OnClose();
+  }
   EXPECT_FALSE(fin_sent());
   EXPECT_TRUE(rst_sent());
 }
@@ -642,7 +657,12 @@
               CloseConnection(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, _, _));
   stream_->OnStreamReset(rst_frame);
   EXPECT_TRUE(stream_->HasReceivedFinalOffset());
-  stream_->OnClose();
+  if (session_->break_close_loop()) {
+    stream_->CloseReadSide();
+    stream_->CloseWriteSide();
+  } else {
+    stream_->OnClose();
+  }
 }
 
 TEST_P(QuicStreamTest, FinalByteOffsetFromZeroLengthStreamFrame) {
diff --git a/quic/quartc/quartc_session.cc b/quic/quartc/quartc_session.cc
index a93f410..eab7512 100644
--- a/quic/quartc/quartc_session.cc
+++ b/quic/quartc/quartc_session.cc
@@ -198,11 +198,11 @@
 }
 
 void QuartcSession::CancelStream(QuicStreamId stream_id) {
-  ResetStream(stream_id, QuicRstStreamErrorCode::QUIC_STREAM_CANCELLED);
+  ResetQuartcStream(stream_id, QuicRstStreamErrorCode::QUIC_STREAM_CANCELLED);
 }
 
-void QuartcSession::ResetStream(QuicStreamId stream_id,
-                                QuicRstStreamErrorCode error) {
+void QuartcSession::ResetQuartcStream(QuicStreamId stream_id,
+                                      QuicRstStreamErrorCode error) {
   if (!IsOpenStream(stream_id)) {
     return;
   }
diff --git a/quic/quartc/quartc_session.h b/quic/quartc/quartc_session.h
index db988a0..e7dd853 100644
--- a/quic/quartc/quartc_session.h
+++ b/quic/quartc/quartc_session.h
@@ -204,7 +204,7 @@
   // returns an unowned pointer to the stream for convenience.
   QuartcStream* ActivateDataStream(std::unique_ptr<QuartcStream> stream);
 
-  void ResetStream(QuicStreamId stream_id, QuicRstStreamErrorCode error);
+  void ResetQuartcStream(QuicStreamId stream_id, QuicRstStreamErrorCode error);
 
   const QuicClock* clock() { return clock_; }
 
diff --git a/quic/quartc/quartc_stream_test.cc b/quic/quartc/quartc_stream_test.cc
index 7a8b22d..b387e7f 100644
--- a/quic/quartc/quartc_stream_test.cc
+++ b/quic/quartc/quartc_stream_test.cc
@@ -398,7 +398,12 @@
 TEST_P(QuartcStreamTest, CloseStream) {
   CreateReliableQuicStream();
   EXPECT_FALSE(mock_stream_delegate_->closed());
-  stream_->OnClose();
+  if (GetQuicReloadableFlag(quic_break_session_stream_close_loop)) {
+    stream_->CloseWriteSide();
+    stream_->CloseReadSide();
+  } else {
+    stream_->OnClose();
+  }
   EXPECT_TRUE(mock_stream_delegate_->closed());
 }
 
diff --git a/quic/test_tools/quic_test_client.cc b/quic/test_tools/quic_test_client.cc
index 5ddc879..c274470 100644
--- a/quic/test_tools/quic_test_client.cc
+++ b/quic/test_tools/quic_test_client.cc
@@ -393,8 +393,13 @@
   QuicStreamId stream_id = GetNthClientInitiatedBidirectionalStreamId(
       session->transport_version(), 0);
   QuicStream* stream = session->GetOrCreateStream(stream_id);
-  session->SendRstStream(stream_id, QUIC_STREAM_CANCELLED,
+  if (session->break_close_loop()) {
+    session->ResetStream(stream_id, QUIC_STREAM_CANCELLED,
                          stream->stream_bytes_written());
+  } else {
+    session->SendRstStream(stream_id, QUIC_STREAM_CANCELLED,
+                           stream->stream_bytes_written());
+  }
   return ret;
 }