gfe-relnote: In QUIC, break QuicStream::CloseRWSide -> QuicSession::CloseStream -> QuicStream::OnClose -> CloseRWSide loop. Protected by gfe2_reloadable_flag_quic_break_session_stream_close_loop.
With this change, a stream can be closed by 4 methods:
1) QuicSession::CloseStream (will soon be removed)
2) QuicSession::ResetStream
3) QuicStream::Reset
4) QuicStream::OnStreamReset
And also make the code path consistent. All -> QuicStream::CloseReadSide/CloseWriteSide -> QuicSession::OnStreamClosed.
PiperOrigin-RevId: 307032801
Change-Id: I1a2f49ddb94cc9642ce8c8fcb514f5adb928d045
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
index 4124575..3c6d323 100644
--- a/quic/core/quic_session.cc
+++ b/quic/core/quic_session.cc
@@ -124,7 +124,9 @@
write_with_transmission_(
GetQuicReloadableFlag(quic_write_with_transmission)),
deprecate_draining_streams_(
- GetQuicReloadableFlag(quic_deprecate_draining_streams)) {
+ GetQuicReloadableFlag(quic_deprecate_draining_streams)),
+ break_close_loop_(
+ GetQuicReloadableFlag(quic_break_session_stream_close_loop)) {
closed_streams_clean_up_alarm_ =
QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
new ClosedStreamsCleanUpDelegate(this)));
@@ -132,6 +134,9 @@
connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
}
+ if (break_close_loop_) {
+ QUIC_RELOADABLE_FLAG_COUNT(quic_break_session_stream_close_loop);
+ }
}
void QuicSession::Initialize() {
@@ -779,6 +784,10 @@
connection_->OnStreamReset(id, error);
}
+ if (break_close_loop_) {
+ return;
+ }
+
if (error != QUIC_STREAM_NO_ERROR && QuicContainsKey(zombie_streams_, id)) {
OnStreamDoneWaitingForAcks(id);
return;
@@ -786,6 +795,26 @@
CloseStreamInner(id, true);
}
+void QuicSession::ResetStream(QuicStreamId id,
+ QuicRstStreamErrorCode error,
+ QuicStreamOffset bytes_written) {
+ DCHECK(break_close_loop_);
+ QuicStream* stream = GetStream(id);
+ if (stream != nullptr && stream->is_static()) {
+ connection()->CloseConnection(
+ QUIC_INVALID_STREAM_ID, "Try to reset a static stream",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return;
+ }
+
+ if (stream != nullptr) {
+ stream->Reset(error);
+ return;
+ }
+
+ SendRstStream(id, error, bytes_written);
+}
+
void QuicSession::MaybeSendRstStreamFrame(QuicStreamId id,
QuicRstStreamErrorCode error,
QuicStreamOffset bytes_written) {
@@ -878,6 +907,11 @@
ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
return;
}
+ if (break_close_loop_) {
+ stream->CloseReadSide();
+ stream->CloseWriteSide();
+ return;
+ }
StreamType type = stream->type();
// Tell the stream that a RST has been sent.
@@ -949,6 +983,79 @@
}
}
+void QuicSession::OnStreamClosed(QuicStreamId stream_id) {
+ QUIC_DVLOG(1) << ENDPOINT << "Closing stream: " << stream_id;
+ DCHECK(break_close_loop_);
+ StreamMap::iterator it = stream_map_.find(stream_id);
+ if (it == stream_map_.end()) {
+ QUIC_DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
+ return;
+ }
+ QuicStream* stream = it->second.get();
+ StreamType type = stream->type();
+
+ if (stream->IsWaitingForAcks()) {
+ zombie_streams_[stream->id()] = std::move(it->second);
+ } else {
+ // Clean up the stream since it is no longer waiting for acks.
+ streams_waiting_for_acks_.erase(stream->id());
+ closed_streams_.push_back(std::move(it->second));
+ // Do not retransmit data of a closed stream.
+ streams_with_pending_retransmission_.erase(stream_id);
+ if (!closed_streams_clean_up_alarm_->IsSet()) {
+ closed_streams_clean_up_alarm_->Set(
+ connection_->clock()->ApproximateNow());
+ }
+ }
+
+ // If we haven't received a FIN or RST for this stream, we need to keep track
+ // of the how many bytes the stream's flow controller believes it has
+ // received, for accurate connection level flow control accounting.
+ const bool had_fin_or_rst = stream->HasReceivedFinalOffset();
+ if (!had_fin_or_rst) {
+ InsertLocallyClosedStreamsHighestOffset(
+ stream_id, stream->flow_controller()->highest_received_byte_offset());
+ }
+ bool stream_was_draining = false;
+ if (deprecate_draining_streams_) {
+ stream_was_draining = stream->was_draining();
+ QUIC_DVLOG_IF(1, stream_was_draining)
+ << ENDPOINT << "Stream " << stream_id << " was draining";
+ }
+ stream_map_.erase(it);
+ if (IsIncomingStream(stream_id)) {
+ --num_dynamic_incoming_streams_;
+ }
+ if (!deprecate_draining_streams_) {
+ stream_was_draining =
+ draining_streams_.find(stream_id) != draining_streams_.end();
+ }
+ if (stream_was_draining) {
+ if (IsIncomingStream(stream_id)) {
+ QUIC_BUG_IF(num_draining_incoming_streams_ == 0);
+ --num_draining_incoming_streams_;
+ } else if (deprecate_draining_streams_) {
+ QUIC_BUG_IF(num_draining_outgoing_streams_ == 0);
+ --num_draining_outgoing_streams_;
+ }
+ draining_streams_.erase(stream_id);
+ } else if (VersionHasIetfQuicFrames(transport_version())) {
+ // Stream was not draining, but we did have a fin or rst, so we can now
+ // free the stream ID if version 99.
+ if (had_fin_or_rst && connection_->connected()) {
+ // Do not bother informing stream ID manager if connection is closed.
+ v99_streamid_manager_.OnStreamClosed(stream_id);
+ }
+ }
+
+ if (!stream_was_draining && !IsIncomingStream(stream_id) && had_fin_or_rst &&
+ !VersionHasIetfQuicFrames(transport_version())) {
+ // Streams that first became draining already called OnCanCreate...
+ // This covers the case where the stream went directly to being closed.
+ OnCanCreateNewOutgoingStream(type != BIDIRECTIONAL);
+ }
+}
+
void QuicSession::ClosePendingStream(QuicStreamId stream_id) {
QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
@@ -1599,7 +1706,11 @@
if (!stream_id_manager_.CanOpenIncomingStream(
GetNumOpenIncomingStreams())) {
// Refuse to open the stream.
- SendRstStream(stream_id, QUIC_REFUSED_STREAM, 0);
+ if (break_close_loop_) {
+ ResetStream(stream_id, QUIC_REFUSED_STREAM, 0);
+ } else {
+ SendRstStream(stream_id, QUIC_REFUSED_STREAM, 0);
+ }
return nullptr;
}
}