gfe-relnote: In QUIC, break QuicStream::CloseRWSide -> QuicSession::CloseStream -> QuicStream::OnClose -> CloseRWSide loop. Protected by gfe2_reloadable_flag_quic_break_session_stream_close_loop.

With this change, a stream can be closed by 4 methods:
1) QuicSession::CloseStream (will soon be removed)
2) QuicSession::ResetStream
3) QuicStream::Reset
4) QuicStream::OnStreamReset
This change also makes the close path consistent: all four methods go through QuicStream::CloseReadSide/CloseWriteSide -> QuicSession::OnStreamClosed.

PiperOrigin-RevId: 307032801
Change-Id: I1a2f49ddb94cc9642ce8c8fcb514f5adb928d045
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index 4124575..3c6d323 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -124,7 +124,9 @@
       write_with_transmission_(
           GetQuicReloadableFlag(quic_write_with_transmission)),
       deprecate_draining_streams_(
-          GetQuicReloadableFlag(quic_deprecate_draining_streams)) {
+          GetQuicReloadableFlag(quic_deprecate_draining_streams)),
+      break_close_loop_(
+          GetQuicReloadableFlag(quic_break_session_stream_close_loop)) {
   closed_streams_clean_up_alarm_ =
       QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
           new ClosedStreamsCleanUpDelegate(this)));
@@ -132,6 +134,9 @@
       connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
     config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
   }
+  if (break_close_loop_) {
+    QUIC_RELOADABLE_FLAG_COUNT(quic_break_session_stream_close_loop);
+  }
 }
 
 void QuicSession::Initialize() {
@@ -779,6 +784,10 @@
     connection_->OnStreamReset(id, error);
   }
 
+  if (break_close_loop_) {
+    return;
+  }
+
   if (error != QUIC_STREAM_NO_ERROR && QuicContainsKey(zombie_streams_, id)) {
     OnStreamDoneWaitingForAcks(id);
     return;
@@ -786,6 +795,26 @@
   CloseStreamInner(id, true);
 }
 
+void QuicSession::ResetStream(QuicStreamId id,
+                              QuicRstStreamErrorCode error,
+                              QuicStreamOffset bytes_written) {
+  DCHECK(break_close_loop_);
+  QuicStream* stream = GetStream(id);
+  if (stream != nullptr && stream->is_static()) {
+    connection()->CloseConnection(
+        QUIC_INVALID_STREAM_ID, "Try to reset a static stream",
+        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+    return;
+  }
+
+  if (stream != nullptr) {
+    stream->Reset(error);
+    return;
+  }
+
+  SendRstStream(id, error, bytes_written);
+}
+
 void QuicSession::MaybeSendRstStreamFrame(QuicStreamId id,
                                           QuicRstStreamErrorCode error,
                                           QuicStreamOffset bytes_written) {
@@ -878,6 +907,11 @@
         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
     return;
   }
+  if (break_close_loop_) {
+    stream->CloseReadSide();
+    stream->CloseWriteSide();
+    return;
+  }
   StreamType type = stream->type();
 
   // Tell the stream that a RST has been sent.
@@ -949,6 +983,79 @@
   }
 }
 
+void QuicSession::OnStreamClosed(QuicStreamId stream_id) {
+  QUIC_DVLOG(1) << ENDPOINT << "Closing stream: " << stream_id;
+  DCHECK(break_close_loop_);
+  StreamMap::iterator it = stream_map_.find(stream_id);
+  if (it == stream_map_.end()) {
+    QUIC_DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
+    return;
+  }
+  QuicStream* stream = it->second.get();
+  StreamType type = stream->type();
+
+  if (stream->IsWaitingForAcks()) {
+    zombie_streams_[stream->id()] = std::move(it->second);
+  } else {
+    // Clean up the stream since it is no longer waiting for acks.
+    streams_waiting_for_acks_.erase(stream->id());
+    closed_streams_.push_back(std::move(it->second));
+    // Do not retransmit data of a closed stream.
+    streams_with_pending_retransmission_.erase(stream_id);
+    if (!closed_streams_clean_up_alarm_->IsSet()) {
+      closed_streams_clean_up_alarm_->Set(
+          connection_->clock()->ApproximateNow());
+    }
+  }
+
+  // If we haven't received a FIN or RST for this stream, we need to keep track
+  // of how many bytes the stream's flow controller believes it has
+  // received, for accurate connection level flow control accounting.
+  const bool had_fin_or_rst = stream->HasReceivedFinalOffset();
+  if (!had_fin_or_rst) {
+    InsertLocallyClosedStreamsHighestOffset(
+        stream_id, stream->flow_controller()->highest_received_byte_offset());
+  }
+  bool stream_was_draining = false;
+  if (deprecate_draining_streams_) {
+    stream_was_draining = stream->was_draining();
+    QUIC_DVLOG_IF(1, stream_was_draining)
+        << ENDPOINT << "Stream " << stream_id << " was draining";
+  }
+  stream_map_.erase(it);
+  if (IsIncomingStream(stream_id)) {
+    --num_dynamic_incoming_streams_;
+  }
+  if (!deprecate_draining_streams_) {
+    stream_was_draining =
+        draining_streams_.find(stream_id) != draining_streams_.end();
+  }
+  if (stream_was_draining) {
+    if (IsIncomingStream(stream_id)) {
+      QUIC_BUG_IF(num_draining_incoming_streams_ == 0);
+      --num_draining_incoming_streams_;
+    } else if (deprecate_draining_streams_) {
+      QUIC_BUG_IF(num_draining_outgoing_streams_ == 0);
+      --num_draining_outgoing_streams_;
+    }
+    draining_streams_.erase(stream_id);
+  } else if (VersionHasIetfQuicFrames(transport_version())) {
+    // Stream was not draining; if we also had a fin or rst, we can now free
+    // the stream ID (only for versions that use IETF QUIC frames).
+    if (had_fin_or_rst && connection_->connected()) {
+      // Do not bother informing stream ID manager if connection is closed.
+      v99_streamid_manager_.OnStreamClosed(stream_id);
+    }
+  }
+
+  if (!stream_was_draining && !IsIncomingStream(stream_id) && had_fin_or_rst &&
+      !VersionHasIetfQuicFrames(transport_version())) {
+    // Streams that first became draining already called OnCanCreate...
+    // This covers the case where the stream went directly to being closed.
+    OnCanCreateNewOutgoingStream(type != BIDIRECTIONAL);
+  }
+}
+
 void QuicSession::ClosePendingStream(QuicStreamId stream_id) {
   QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
 
@@ -1599,7 +1706,11 @@
     if (!stream_id_manager_.CanOpenIncomingStream(
             GetNumOpenIncomingStreams())) {
       // Refuse to open the stream.
-      SendRstStream(stream_id, QUIC_REFUSED_STREAM, 0);
+      if (break_close_loop_) {
+        ResetStream(stream_id, QUIC_REFUSED_STREAM, 0);
+      } else {
+        SendRstStream(stream_id, QUIC_REFUSED_STREAM, 0);
+      }
       return nullptr;
     }
   }