gfe-relnote: In QUIC, break QuicStream::CloseRWSide -> QuicSession::CloseStream -> QuicStream::OnClose -> CloseRWSide loop. Protected by gfe2_reloadable_flag_quic_break_session_stream_close_loop. With this change, a stream can be closed by 4 methods: 1) QuicSession::CloseStream (will soon be removed) 2) QuicSession::ResetStream 3) QuicStream::Reset 4) QuicStream::OnStreamReset And also make the code path consistent. All -> QuicStream::CloseReadSide/CloseWriteSide -> QuicSession::OnStreamClosed. PiperOrigin-RevId: 307032801 Change-Id: I1a2f49ddb94cc9642ce8c8fcb514f5adb928d045
diff --git a/quic/core/http/end_to_end_test.cc b/quic/core/http/end_to_end_test.cc index a4d6cd7..4d02c80 100644 --- a/quic/core/http/end_to_end_test.cc +++ b/quic/core/http/end_to_end_test.cc
@@ -2027,7 +2027,11 @@ // Transmit the cancel, and ensure the connection is torn down properly. SetPacketLossPercentage(0); QuicStreamId stream_id = GetNthClientInitiatedBidirectionalId(0); - session->SendRstStream(stream_id, QUIC_STREAM_CANCELLED, 0); + if (session->break_close_loop()) { + session->ResetStream(stream_id, QUIC_STREAM_CANCELLED, 0); + } else { + session->SendRstStream(stream_id, QUIC_STREAM_CANCELLED, 0); + } // WaitForEvents waits 50ms and returns true if there are outstanding // requests.
diff --git a/quic/core/http/quic_client_promised_info_test.cc b/quic/core/http/quic_client_promised_info_test.cc index 27d9fe8..381160b 100644 --- a/quic/core/http/quic_client_promised_info_test.cc +++ b/quic/core/http/quic_client_promised_info_test.cc
@@ -227,7 +227,9 @@ EXPECT_CALL(*connection_, SendControlFrame(_)); EXPECT_CALL(*connection_, OnStreamReset(promise_id_, QUIC_PROMISE_VARY_MISMATCH)); - EXPECT_CALL(session_, CloseStream(promise_id_)); + if (!session_.break_close_loop()) { + EXPECT_CALL(session_, CloseStream(promise_id_)); + } promised->HandleClientRequest(client_request_, &delegate); } @@ -303,7 +305,9 @@ session_.GetOrCreateStream(promise_id_); // Cancel the promised stream. - EXPECT_CALL(session_, CloseStream(promise_id_)); + if (!session_.break_close_loop()) { + EXPECT_CALL(session_, CloseStream(promise_id_)); + } EXPECT_CALL(*connection_, SendControlFrame(_)); EXPECT_CALL(*connection_, OnStreamReset(promise_id_, QUIC_STREAM_CANCELLED)); promised->Cancel(); @@ -327,11 +331,17 @@ promise_stream->OnStreamHeaderList(false, headers.uncompressed_header_bytes(), headers); - EXPECT_CALL(session_, CloseStream(promise_id_)); + if (!session_.break_close_loop()) { + EXPECT_CALL(session_, CloseStream(promise_id_)); + } EXPECT_CALL(*connection_, SendControlFrame(_)); EXPECT_CALL(*connection_, OnStreamReset(promise_id_, QUIC_STREAM_PEER_GOING_AWAY)); - session_.SendRstStream(promise_id_, QUIC_STREAM_PEER_GOING_AWAY, 0); + if (session_.break_close_loop()) { + session_.ResetStream(promise_id_, QUIC_STREAM_PEER_GOING_AWAY, 0); + } else { + session_.SendRstStream(promise_id_, QUIC_STREAM_PEER_GOING_AWAY, 0); + } // Now initiate rendezvous. TestPushPromiseDelegate delegate(/*match=*/true);
diff --git a/quic/core/http/quic_spdy_client_session_base.cc b/quic/core/http/quic_spdy_client_session_base.cc index 2655259..63de57c 100644 --- a/quic/core/http/quic_spdy_client_session_base.cc +++ b/quic/core/http/quic_spdy_client_session_base.cc
@@ -204,7 +204,11 @@ QuicStreamId id, QuicRstStreamErrorCode error_code) { DCHECK(QuicUtils::IsServerInitiatedStreamId(transport_version(), id)); - SendRstStream(id, error_code, 0); + if (break_close_loop()) { + ResetStream(id, error_code, 0); + } else { + SendRstStream(id, error_code, 0); + } if (!IsOpenStream(id) && !IsClosedStream(id)) { MaybeIncreaseLargestPeerStreamId(id); } @@ -218,6 +222,14 @@ } } +void QuicSpdyClientSessionBase::OnStreamClosed(QuicStreamId stream_id) { + DCHECK(break_close_loop()); + QuicSpdySession::OnStreamClosed(stream_id); + if (!VersionUsesHttp3(transport_version())) { + headers_stream()->MaybeReleaseSequencerBuffer(); + } +} + bool QuicSpdyClientSessionBase::ShouldReleaseHeadersStreamSequencerBuffer() { return !HasActiveRequestStreams() && promised_by_id_.empty(); }
diff --git a/quic/core/http/quic_spdy_client_session_base.h b/quic/core/http/quic_spdy_client_session_base.h index 9d702e3..282a8cf 100644 --- a/quic/core/http/quic_spdy_client_session_base.h +++ b/quic/core/http/quic_spdy_client_session_base.h
@@ -105,6 +105,9 @@ // Release headers stream's sequencer buffer if it's empty. void CloseStreamInner(QuicStreamId stream_id, bool rst_sent) override; + // Release headers stream's sequencer buffer if it's empty. + void OnStreamClosed(QuicStreamId stream_id) override; + // Returns true if there are no active requests and no promised streams. bool ShouldReleaseHeadersStreamSequencerBuffer() override;
diff --git a/quic/core/http/quic_spdy_client_session_test.cc b/quic/core/http/quic_spdy_client_session_test.cc index 65dd43a..dc72a73 100644 --- a/quic/core/http/quic_spdy_client_session_test.cc +++ b/quic/core/http/quic_spdy_client_session_test.cc
@@ -354,7 +354,11 @@ .Times(AtLeast(1)) .WillRepeatedly(Invoke(&ClearControlFrame)); EXPECT_CALL(*connection_, OnStreamReset(_, _)).Times(1); - session_->SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0); + if (session_->break_close_loop()) { + session_->ResetStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0); + } else { + session_->SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0); + } // A new stream cannot be created as the reset stream still counts as an open // outgoing stream until closed by the server. @@ -406,7 +410,11 @@ .Times(AtLeast(1)) .WillRepeatedly(Invoke(&ClearControlFrame)); EXPECT_CALL(*connection_, OnStreamReset(_, _)).Times(1); - session_->SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0); + if (session_->break_close_loop()) { + session_->ResetStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0); + } else { + session_->SendRstStream(stream_id, QUIC_STREAM_PEER_GOING_AWAY, 0); + } // The stream receives trailers with final byte offset, but the header value // is non-numeric and should be treated as malformed. @@ -861,7 +869,12 @@ EXPECT_CALL(*connection_, SendControlFrame(_)); EXPECT_CALL(*connection_, OnStreamReset(promised_stream_id_, QUIC_STREAM_PEER_GOING_AWAY)); - session_->SendRstStream(promised_stream_id_, QUIC_STREAM_PEER_GOING_AWAY, 0); + if (session_->break_close_loop()) { + session_->ResetStream(promised_stream_id_, QUIC_STREAM_PEER_GOING_AWAY, 0); + } else { + session_->SendRstStream(promised_stream_id_, QUIC_STREAM_PEER_GOING_AWAY, + 0); + } QuicClientPromisedInfo* promised = session_->GetPromisedById(promised_stream_id_); EXPECT_NE(promised, nullptr);
diff --git a/quic/core/http/quic_spdy_stream_test.cc b/quic/core/http/quic_spdy_stream_test.cc index f46d914..9db9f22 100644 --- a/quic/core/http/quic_spdy_stream_test.cc +++ b/quic/core/http/quic_spdy_stream_test.cc
@@ -317,6 +317,7 @@ &helper_, &alarm_factory_, perspective, SupportedVersions(GetParam())); session_ = std::make_unique<StrictMock<TestSession>>(connection_); session_->Initialize(); + connection_->AdvanceTime(QuicTime::Delta::FromSeconds(1)); ON_CALL(*session_, WritevData(_, _, _, _, _, _)) .WillByDefault( Invoke(session_.get(), &MockQuicSpdySession::ConsumeData));
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc index 4124575..3c6d323 100644 --- a/quic/core/quic_session.cc +++ b/quic/core/quic_session.cc
@@ -124,7 +124,9 @@ write_with_transmission_( GetQuicReloadableFlag(quic_write_with_transmission)), deprecate_draining_streams_( - GetQuicReloadableFlag(quic_deprecate_draining_streams)) { + GetQuicReloadableFlag(quic_deprecate_draining_streams)), + break_close_loop_( + GetQuicReloadableFlag(quic_break_session_stream_close_loop)) { closed_streams_clean_up_alarm_ = QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm( new ClosedStreamsCleanUpDelegate(this))); @@ -132,6 +134,9 @@ connection_->version().handshake_protocol == PROTOCOL_TLS1_3) { config_.SetStatelessResetTokenToSend(GetStatelessResetToken()); } + if (break_close_loop_) { + QUIC_RELOADABLE_FLAG_COUNT(quic_break_session_stream_close_loop); + } } void QuicSession::Initialize() { @@ -779,6 +784,10 @@ connection_->OnStreamReset(id, error); } + if (break_close_loop_) { + return; + } + if (error != QUIC_STREAM_NO_ERROR && QuicContainsKey(zombie_streams_, id)) { OnStreamDoneWaitingForAcks(id); return; @@ -786,6 +795,26 @@ CloseStreamInner(id, true); } +void QuicSession::ResetStream(QuicStreamId id, + QuicRstStreamErrorCode error, + QuicStreamOffset bytes_written) { + DCHECK(break_close_loop_); + QuicStream* stream = GetStream(id); + if (stream != nullptr && stream->is_static()) { + connection()->CloseConnection( + QUIC_INVALID_STREAM_ID, "Try to reset a static stream", + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); + return; + } + + if (stream != nullptr) { + stream->Reset(error); + return; + } + + SendRstStream(id, error, bytes_written); +} + void QuicSession::MaybeSendRstStreamFrame(QuicStreamId id, QuicRstStreamErrorCode error, QuicStreamOffset bytes_written) { @@ -878,6 +907,11 @@ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); return; } + if (break_close_loop_) { + stream->CloseReadSide(); + stream->CloseWriteSide(); + return; + } StreamType type = stream->type(); // Tell the stream that a RST has been sent. @@ -949,6 +983,79 @@ } } +void QuicSession::OnStreamClosed(QuicStreamId stream_id) { + QUIC_DVLOG(1) << ENDPOINT << "Closing stream: " << stream_id; + DCHECK(break_close_loop_); + StreamMap::iterator it = stream_map_.find(stream_id); + if (it == stream_map_.end()) { + QUIC_DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id; + return; + } + QuicStream* stream = it->second.get(); + StreamType type = stream->type(); + + if (stream->IsWaitingForAcks()) { + zombie_streams_[stream->id()] = std::move(it->second); + } else { + // Clean up the stream since it is no longer waiting for acks. + streams_waiting_for_acks_.erase(stream->id()); + closed_streams_.push_back(std::move(it->second)); + // Do not retransmit data of a closed stream. + streams_with_pending_retransmission_.erase(stream_id); + if (!closed_streams_clean_up_alarm_->IsSet()) { + closed_streams_clean_up_alarm_->Set( + connection_->clock()->ApproximateNow()); + } + } + + // If we haven't received a FIN or RST for this stream, we need to keep track + // of the how many bytes the stream's flow controller believes it has + // received, for accurate connection level flow control accounting. + const bool had_fin_or_rst = stream->HasReceivedFinalOffset(); + if (!had_fin_or_rst) { + InsertLocallyClosedStreamsHighestOffset( + stream_id, stream->flow_controller()->highest_received_byte_offset()); + } + bool stream_was_draining = false; + if (deprecate_draining_streams_) { + stream_was_draining = stream->was_draining(); + QUIC_DVLOG_IF(1, stream_was_draining) + << ENDPOINT << "Stream " << stream_id << " was draining"; + } + stream_map_.erase(it); + if (IsIncomingStream(stream_id)) { + --num_dynamic_incoming_streams_; + } + if (!deprecate_draining_streams_) { + stream_was_draining = + draining_streams_.find(stream_id) != draining_streams_.end(); + } + if (stream_was_draining) { + if (IsIncomingStream(stream_id)) { + QUIC_BUG_IF(num_draining_incoming_streams_ == 0); + --num_draining_incoming_streams_; + } else if (deprecate_draining_streams_) { + QUIC_BUG_IF(num_draining_outgoing_streams_ == 0); + --num_draining_outgoing_streams_; + } + draining_streams_.erase(stream_id); + } else if (VersionHasIetfQuicFrames(transport_version())) { + // Stream was not draining, but we did have a fin or rst, so we can now + // free the stream ID if version 99. + if (had_fin_or_rst && connection_->connected()) { + // Do not bother informing stream ID manager if connection is closed. + v99_streamid_manager_.OnStreamClosed(stream_id); + } + } + + if (!stream_was_draining && !IsIncomingStream(stream_id) && had_fin_or_rst && + !VersionHasIetfQuicFrames(transport_version())) { + // Streams that first became draining already called OnCanCreate... + // This covers the case where the stream went directly to being closed. + OnCanCreateNewOutgoingStream(type != BIDIRECTIONAL); + } +} + void QuicSession::ClosePendingStream(QuicStreamId stream_id) { QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id; @@ -1599,7 +1706,11 @@ if (!stream_id_manager_.CanOpenIncomingStream( GetNumOpenIncomingStreams())) { // Refuse to open the stream. - SendRstStream(stream_id, QUIC_REFUSED_STREAM, 0); + if (break_close_loop_) { + ResetStream(stream_id, QUIC_REFUSED_STREAM, 0); + } else { + SendRstStream(stream_id, QUIC_REFUSED_STREAM, 0); + } return nullptr; } }
diff --git a/quic/core/quic_session.h b/quic/core/quic_session.h index d1720a0..f67c097 100644 --- a/quic/core/quic_session.h +++ b/quic/core/quic_session.h
@@ -198,13 +198,17 @@ // will be sent in specified transmission |type|. bool WriteControlFrame(const QuicFrame& frame, TransmissionType type); - // Close the stream in both directions. - // TODO(renjietang): rename this method as it sends both RST_STREAM and - // STOP_SENDING in IETF QUIC. + // Called by stream to send RST_STREAM (and STOP_SENDING). virtual void SendRstStream(QuicStreamId id, QuicRstStreamErrorCode error, QuicStreamOffset bytes_written); + // Called to send RST_STREAM (and STOP_SENDING) and close stream. If stream + // |id| does not exist, just send RST_STREAM (and STOP_SENDING). + virtual void ResetStream(QuicStreamId id, + QuicRstStreamErrorCode error, + QuicStreamOffset bytes_written); + // Called when the session wants to go away and not accept any new streams. virtual void SendGoAway(QuicErrorCode error_code, const std::string& reason); @@ -217,9 +221,15 @@ // Create and transmit a STOP_SENDING frame virtual void SendStopSending(uint16_t code, QuicStreamId stream_id); - // Removes the stream associated with 'stream_id' from the active stream map. + // Close stream |stream_id|. Whether sending RST_STREAM (and STOP_SENDING) + // depends on the sending and receiving states. + // TODO(fayang): Deprecate CloseStream, instead always use ResetStream to + // close a stream from session. virtual void CloseStream(QuicStreamId stream_id); + // Called by stream |stream_id| when it gets closed. + virtual void OnStreamClosed(QuicStreamId stream_id); + // Returns true if outgoing packets will be encrypted, even if the server // hasn't confirmed the handshake yet. virtual bool IsEncryptionEstablished() const; @@ -487,6 +497,8 @@ return deprecate_draining_streams_; } + bool break_close_loop() const { return break_close_loop_; } + protected: using StreamMap = QuicSmallMap<QuicStreamId, std::unique_ptr<QuicStream>, 10>; @@ -535,6 +547,7 @@ // Performs the work required to close |stream_id|. If |rst_sent| then a // Reset Stream frame has already been sent for this stream. + // TODO(fayang): Remove CloseStreamInner. virtual void CloseStreamInner(QuicStreamId stream_id, bool rst_sent); // When a stream is closed locally, it may not yet know how many bytes the @@ -833,6 +846,9 @@ // Latched value of quic_deprecate_draining_streams. const bool deprecate_draining_streams_; + + // Latched value of quic_break_session_stream_close_loop. + const bool break_close_loop_; }; } // namespace quic
diff --git a/quic/core/quic_session_test.cc b/quic/core/quic_session_test.cc index bf4bffc..80ef0d5 100644 --- a/quic/core/quic_session_test.cc +++ b/quic/core/quic_session_test.cc
@@ -2392,8 +2392,14 @@ session_.OnFrameLost(QuicFrame(frame)); // Retransmit stream data causes connection close. Stream has not sent fin // yet, so an RST is sent. - EXPECT_CALL(*stream, OnCanWrite()) - .WillOnce(Invoke(stream, &QuicStream::OnClose)); + if (session_.break_close_loop()) { + EXPECT_CALL(*stream, OnCanWrite()).WillOnce(Invoke([this, stream]() { + session_.CloseStream(stream->id()); + })); + } else { + EXPECT_CALL(*stream, OnCanWrite()) + .WillOnce(Invoke(stream, &QuicStream::OnClose)); + } if (VersionHasIetfQuicFrames(transport_version())) { // Once for the RST_STREAM, once for the STOP_SENDING EXPECT_CALL(*connection_, SendControlFrame(_)) @@ -2751,6 +2757,7 @@ TestStream* stream = session_.CreateOutgoingBidirectionalStream(); QuicStreamId stream_id = stream->id(); + QuicStreamPeer::SetFinSent(stream); stream->CloseWriteSide(); EXPECT_TRUE(stream->write_side_closed()); QuicStopSendingFrame frame(1, stream_id, 123);
diff --git a/quic/core/quic_stream.cc b/quic/core/quic_stream.cc index 813ea2d..75cb9f9 100644 --- a/quic/core/quic_stream.cc +++ b/quic/core/quic_stream.cc
@@ -573,9 +573,16 @@ void QuicStream::Reset(QuicRstStreamErrorCode error) { stream_error_ = error; - // Sending a RstStream results in calling CloseStream. session()->SendRstStream(id(), error, stream_bytes_written()); rst_sent_ = true; + if (session_->break_close_loop()) { + if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) { + session()->OnStreamDoneWaitingForAcks(id_); + return; + } + CloseReadSide(); + CloseWriteSide(); + } } void QuicStream::OnUnrecoverableError(QuicErrorCode error, @@ -762,7 +769,12 @@ if (write_side_closed_) { QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id(); - session_->CloseStream(id()); + if (session_->break_close_loop()) { + session_->OnStreamClosed(id()); + OnClose(); + } else { + session_->CloseStream(id()); + } } } @@ -775,7 +787,12 @@ write_side_closed_ = true; if (read_side_closed_) { QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id(); - session_->CloseStream(id()); + if (session_->break_close_loop()) { + session_->OnStreamClosed(id()); + OnClose(); + } else { + session_->CloseStream(id()); + } } } @@ -798,8 +815,12 @@ } void QuicStream::OnClose() { - CloseReadSide(); - CloseWriteSide(); + if (session()->break_close_loop()) { + DCHECK(read_side_closed_ && write_side_closed_); + } else { + CloseReadSide(); + CloseWriteSide(); + } if (!fin_sent_ && !rst_sent_) { // For flow control accounting, tell the peer how many bytes have been
diff --git a/quic/core/quic_stream.h b/quic/core/quic_stream.h index b8e940f..90dac69 100644 --- a/quic/core/quic_stream.h +++ b/quic/core/quic_stream.h
@@ -165,10 +165,12 @@ // stream to write any pending data. virtual void OnCanWrite(); - // Called by the session just before the object is destroyed. + // Called just before the object is destroyed. // The object should not be accessed after OnClose is called. // Sends a RST_STREAM with code QUIC_RST_ACKNOWLEDGEMENT if neither a FIN nor // a RST_STREAM has been sent. + // TODO(fayang): move this to protected when deprecating + // quic_break_session_stream_close_loop. virtual void OnClose(); // Called by the session when the endpoint receives a RST_STREAM from the @@ -355,6 +357,14 @@ // Does not send a FIN. May cause the stream to be closed. virtual void CloseWriteSide(); + // Close the read side of the stream. May cause the stream to be closed. + // Subclasses and consumers should use StopReading to terminate reading early + // if expecting a FIN. Can be used directly by subclasses if not expecting a + // FIN. + // TODO(fayang): move this to protected when removing + // QuicSession::CloseStream. + void CloseReadSide(); + // Returns true if the stream is static. bool is_static() const { return is_static_; } @@ -364,12 +374,6 @@ const QuicSession* session); protected: - // Close the read side of the socket. May cause the stream to be closed. - // Subclasses and consumers should use StopReading to terminate reading early - // if expecting a FIN. Can be used directly by subclasses if not expecting a - // FIN. - void CloseReadSide(); - // Called when data of [offset, offset + data_length] is buffered in send // buffer. virtual void OnDataBuffered(
diff --git a/quic/core/quic_stream_test.cc b/quic/core/quic_stream_test.cc index 100c05e..ed8e0e4 100644 --- a/quic/core/quic_stream_test.cc +++ b/quic/core/quic_stream_test.cc
@@ -455,7 +455,12 @@ // Now close the stream, and expect that we send a RST. EXPECT_CALL(*session_, SendRstStream(_, _, _)); - stream_->OnClose(); + if (session_->break_close_loop()) { + stream_->CloseReadSide(); + stream_->CloseWriteSide(); + } else { + stream_->OnClose(); + } EXPECT_FALSE(session_->HasUnackedStreamData()); EXPECT_FALSE(fin_sent()); EXPECT_TRUE(rst_sent()); @@ -482,7 +487,12 @@ EXPECT_FALSE(rst_sent()); // Now close the stream, and expect that we do not send a RST. - stream_->OnClose(); + if (session_->break_close_loop()) { + stream_->CloseReadSide(); + stream_->CloseWriteSide(); + } else { + stream_->OnClose(); + } EXPECT_TRUE(fin_sent()); EXPECT_FALSE(rst_sent()); } @@ -506,7 +516,12 @@ // Now close the stream (any further resets being sent would break the // expectation above). - stream_->OnClose(); + if (session_->break_close_loop()) { + stream_->CloseReadSide(); + stream_->CloseWriteSide(); + } else { + stream_->OnClose(); + } EXPECT_FALSE(fin_sent()); EXPECT_TRUE(rst_sent()); } @@ -642,7 +657,12 @@ CloseConnection(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, _, _)); stream_->OnStreamReset(rst_frame); EXPECT_TRUE(stream_->HasReceivedFinalOffset()); - stream_->OnClose(); + if (session_->break_close_loop()) { + stream_->CloseReadSide(); + stream_->CloseWriteSide(); + } else { + stream_->OnClose(); + } } TEST_P(QuicStreamTest, FinalByteOffsetFromZeroLengthStreamFrame) {
diff --git a/quic/quartc/quartc_session.cc b/quic/quartc/quartc_session.cc index a93f410..eab7512 100644 --- a/quic/quartc/quartc_session.cc +++ b/quic/quartc/quartc_session.cc
@@ -198,11 +198,11 @@ } void QuartcSession::CancelStream(QuicStreamId stream_id) { - ResetStream(stream_id, QuicRstStreamErrorCode::QUIC_STREAM_CANCELLED); + ResetQuartcStream(stream_id, QuicRstStreamErrorCode::QUIC_STREAM_CANCELLED); } -void QuartcSession::ResetStream(QuicStreamId stream_id, - QuicRstStreamErrorCode error) { +void QuartcSession::ResetQuartcStream(QuicStreamId stream_id, + QuicRstStreamErrorCode error) { if (!IsOpenStream(stream_id)) { return; }
diff --git a/quic/quartc/quartc_session.h b/quic/quartc/quartc_session.h index db988a0..e7dd853 100644 --- a/quic/quartc/quartc_session.h +++ b/quic/quartc/quartc_session.h
@@ -204,7 +204,7 @@ // returns an unowned pointer to the stream for convenience. QuartcStream* ActivateDataStream(std::unique_ptr<QuartcStream> stream); - void ResetStream(QuicStreamId stream_id, QuicRstStreamErrorCode error); + void ResetQuartcStream(QuicStreamId stream_id, QuicRstStreamErrorCode error); const QuicClock* clock() { return clock_; }
diff --git a/quic/quartc/quartc_stream_test.cc b/quic/quartc/quartc_stream_test.cc index 7a8b22d..b387e7f 100644 --- a/quic/quartc/quartc_stream_test.cc +++ b/quic/quartc/quartc_stream_test.cc
@@ -398,7 +398,12 @@ TEST_P(QuartcStreamTest, CloseStream) { CreateReliableQuicStream(); EXPECT_FALSE(mock_stream_delegate_->closed()); - stream_->OnClose(); + if (GetQuicReloadableFlag(quic_break_session_stream_close_loop)) { + stream_->CloseWriteSide(); + stream_->CloseReadSide(); + } else { + stream_->OnClose(); + } EXPECT_TRUE(mock_stream_delegate_->closed()); }
diff --git a/quic/test_tools/quic_test_client.cc b/quic/test_tools/quic_test_client.cc index 5ddc879..c274470 100644 --- a/quic/test_tools/quic_test_client.cc +++ b/quic/test_tools/quic_test_client.cc
@@ -393,8 +393,13 @@ QuicStreamId stream_id = GetNthClientInitiatedBidirectionalStreamId( session->transport_version(), 0); QuicStream* stream = session->GetOrCreateStream(stream_id); - session->SendRstStream(stream_id, QUIC_STREAM_CANCELLED, + if (session->break_close_loop()) { + session->ResetStream(stream_id, QUIC_STREAM_CANCELLED, stream->stream_bytes_written()); + } else { + session->SendRstStream(stream_id, QUIC_STREAM_CANCELLED, + stream->stream_bytes_written()); + } return ret; }