Send-side RESET_STREAM_AT frame. Nothing utilizes the API yet, so not protected. This API is documented at go/reset-stream-at. The application calls SetReliableSize() at will, which means that all data in the send buffer at that point will be delivered reliably, even if it is later reset. If reliable_size is set, then an incoming STOP_SENDING will result in RESET_STREAM_AT. When the application calls PartialResetWriteSide(), send a RESET_STREAM_AT with the indicated reliable_size. Accept no more data from the application, and notionally acknowledge any sent data beyond reliable_size. (a) if reliable_size has been sent, the write side is closed, and if the read side is closed, the stream will when reliable_size is acked. (b) if reliable_size has been buffered, the write side will not close until the buffered data up to reliable_size has been sent. PiperOrigin-RevId: 704756508
diff --git a/quiche/quic/core/quic_session.cc b/quiche/quic/core/quic_session.cc index b91dbbd..a804c47 100644 --- a/quiche/quic/core/quic_session.cc +++ b/quiche/quic/core/quic_session.cc
@@ -1089,6 +1089,18 @@ connection_->OnStreamReset(id, error.internal_code()); } +void QuicSession::MaybeSendResetStreamAtFrame(QuicStreamId id, + QuicResetStreamError error, + QuicStreamOffset bytes_written, + QuicStreamOffset reliable_size) { + QUICHE_DCHECK(connection()->reliable_stream_reset_enabled()); + if (!connection()->connected()) { + return; + } + control_frame_manager_.WriteOrBufferResetStreamAt(id, error, bytes_written, + reliable_size); +} + void QuicSession::MaybeSendStopSendingFrame(QuicStreamId id, QuicResetStreamError error) { if (!connection()->connected()) {
diff --git a/quiche/quic/core/quic_session.h b/quiche/quic/core/quic_session.h index b15dbac..23537f0 100644 --- a/quiche/quic/core/quic_session.h +++ b/quiche/quic/core/quic_session.h
@@ -618,6 +618,12 @@ virtual void MaybeSendRstStreamFrame(QuicStreamId id, QuicResetStreamError error, QuicStreamOffset bytes_written); + // Does actual work of sending RESET_STREAM_AT, if the stream type allows. + // Also informs the connection so that pending stream frames can be flushed. + virtual void MaybeSendResetStreamAtFrame(QuicStreamId id, + QuicResetStreamError error, + QuicStreamOffset bytes_written, + QuicStreamOffset reliable_size); // Sends a STOP_SENDING frame if the stream type allows. virtual void MaybeSendStopSendingFrame(QuicStreamId id,
diff --git a/quiche/quic/core/quic_session_test.cc b/quiche/quic/core/quic_session_test.cc index 25008ae..452d962 100644 --- a/quiche/quic/core/quic_session_test.cc +++ b/quiche/quic/core/quic_session_test.cc
@@ -434,6 +434,13 @@ return num_incoming_streams_created_; } + void EnableReliableStreamReset() { + QuicConfig* quic_config = config(); + ASSERT_TRUE(quic_config != nullptr); + quic_config->SetReliableStreamReset(true); + connection()->SetFromConfig(*quic_config); + } + using QuicSession::ActivateStream; using QuicSession::CanOpenNextOutgoingBidirectionalStream; using QuicSession::CanOpenNextOutgoingUnidirectionalStream; @@ -3283,6 +3290,39 @@ session_.ResetStream(bidirectional, QUIC_STREAM_CANCELLED); } +TEST_P(QuicSessionTestServer, AcceptReliableSizeIfNegotiated) { + CompleteHandshake(); + if (!VersionHasIetfQuicFrames(transport_version())) { + return; + } + session_.EnableReliableStreamReset(); + MockPacketWriter* writer = static_cast<MockPacketWriter*>( + QuicConnectionPeer::GetWriter(session_.connection())); + TestStream* write_only = session_.CreateOutgoingUnidirectionalStream(); + EXPECT_CALL(*writer, WritePacket(_, _, _, _, _, _)) + .WillOnce(Return(WriteResult(WRITE_STATUS_OK, 0))); + session_.SendStreamData(write_only); + EXPECT_FALSE(write_only->fin_sent()); + EXPECT_TRUE(write_only->SetReliableSize()); +} + +TEST_P(QuicSessionTestServer, RejectReliableSizeNotNegotiated) { + if (!VersionHasIetfQuicFrames(transport_version())) { + return; + } + CompleteHandshake(); + ASSERT_FALSE(session_.connection()->reliable_stream_reset_enabled()); + TestStream* bidirectional = + session_.CreateIncomingStream(GetNthClientInitiatedBidirectionalId(0)); + MockPacketWriter* writer = static_cast<MockPacketWriter*>( + QuicConnectionPeer::GetWriter(session_.connection())); + EXPECT_CALL(*writer, WritePacket(_, _, _, _, _, _)) + .WillOnce(Return(WriteResult(WRITE_STATUS_OK, 0))); + session_.SendStreamData(bidirectional); + EXPECT_FALSE(bidirectional->fin_sent()); + EXPECT_FALSE(bidirectional->SetReliableSize()); +} + TEST_P(QuicSessionTestServer, DecryptionKeyAvailableBeforeEncryptionKey) { if (connection_->version().handshake_protocol != PROTOCOL_TLS1_3) { return;
diff --git a/quiche/quic/core/quic_stream.cc b/quiche/quic/core/quic_stream.cc index 7f10791..e65e0d7 100644 --- a/quiche/quic/core/quic_stream.cc +++ b/quiche/quic/core/quic_stream.cc
@@ -379,6 +379,7 @@ fin_lost_(false), fin_received_(fin_received), rst_sent_(false), + rst_stream_at_sent_(false), rst_received_(false), stop_sending_sent_(false), flow_controller_(std::move(flow_controller)), @@ -400,7 +401,8 @@ : type), creation_time_(session->connection()->clock()->ApproximateNow()), pending_duration_(pending_duration), - perspective_(session->perspective()) { + perspective_(session->perspective()), + reliable_size_(0) { if (type_ == WRITE_UNIDIRECTIONAL) { fin_received_ = true; CloseReadSide(); @@ -531,7 +533,17 @@ } stream_error_ = error; - MaybeSendRstStream(error); + if (reliable_size_ == 0) { + MaybeSendRstStream(error); + } else { + // The spec is ambiguous as to whether a RESET_STREAM or RESET_STREAM_AT + // should be sent in response to a STOP_SENDING frame if the write side has + // specified a reliable size. Because STOP_SENDING and RESET_STREAM_AT could + // cross in flight, send RESET_STREAM_AT if reliable_size is set, so that + // the result of setting reliable_size is consistent. ResetWriteSide() will + // check reliable_size and do the right thing. + PartialResetWriteSide(error); + } if (session()->enable_stop_sending_for_zombie_streams() && read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) { QUIC_RELOADABLE_FLAG_COUNT_N(quic_deliver_stop_sending_to_zombie_streams, 3, @@ -660,8 +672,24 @@ ResetWithError(QuicResetStreamError::FromInternal(error)); } +bool QuicStream::SetReliableSize() { + if (rst_sent_ || rst_stream_at_sent_) { + return false; + } + if (!session_->connection()->reliable_stream_reset_enabled() || + !VersionHasIetfQuicFrames(transport_version()) || + type_ == READ_UNIDIRECTIONAL) { + return false; + } + reliable_size_ = send_buffer_.stream_offset(); + return true; +} + void QuicStream::ResetWithError(QuicResetStreamError error) { stream_error_ = error; + // The caller has explicitly abandoned reliable delivery of anything in the + // stream, so adjust stream state accordingly. + reliable_size_ = 0; QuicConnection::ScopedPacketFlusher flusher(session()->connection()); MaybeSendStopSending(error); MaybeSendRstStream(error); @@ -680,6 +708,42 @@ } } +void QuicStream::PartialResetWriteSide(QuicResetStreamError error) { + if (reliable_size_ == 0) { + QUIC_BUG(quic_bug_reliable_size_not_set) + << "QuicStream::PartialResetWriteSide called when reliable_size_ is 0"; + return; + } + if (rst_sent_) { + QUIC_BUG(quic_bug_reset_stream_at_after_rst_sent) + << "QuicStream::PartialResetWriteSide on reset stream"; + return; + } + stream_error_ = error; + MaybeSendResetStreamAt(error); + if (reliable_size_ <= stream_bytes_written()) { + // Notionally ack unreliable, previously consumed data so that it's not + // retransmitted, and the buffer can free the memory. + QuicByteCount newly_acked; + send_buffer_.OnStreamDataAcked( + reliable_size_, stream_bytes_written() - reliable_size_, &newly_acked); + fin_outstanding_ = false; // Do not wait to close until FIN is acked. + fin_lost_ = false; + if (!IsWaitingForAcks()) { + session_->connection()->OnStreamReset(id_, stream_error_.internal_code()); + } + CloseWriteSide(); + } else { + // If stream_bytes_written() < reliable_size_, then the write side can't + // close until buffered data is sent. + QUIC_BUG_IF(quic_bug_unexpected_write_side_closed, write_side_closed_) + << "Write side closed with unsent reliable data"; + } + if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) { + session()->MaybeCloseZombieStream(id_); + } +} + void QuicStream::SendStopSending(QuicResetStreamError error) { stream_error_ = error; MaybeSendStopSending(error); @@ -733,8 +797,9 @@ return; } - if (fin_buffered_) { - QUIC_BUG(quic_bug_10586_3) << "Fin already buffered"; + if (fin_buffered_ || rst_stream_at_sent_) { + QUIC_BUG(quic_bug_10586_3) + << "Fin already buffered, or RESET_STREAM_AT sent"; return; } if (write_side_closed_) { @@ -791,7 +856,8 @@ if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) { WriteBufferedData(session()->GetEncryptionLevelToSendApplicationData()); } - if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) { + if (!fin_buffered_ && !fin_sent_ && !rst_stream_at_sent_ && + CanWriteNewData()) { // Notify upper layer to write new data when buffered data size is below // low water mark. OnCanWriteNewData(); @@ -834,8 +900,9 @@ return consumed_data; } - if (fin_buffered_) { - QUIC_BUG(quic_bug_10586_7) << "Fin already buffered"; + if (fin_buffered_ || rst_stream_at_sent_) { + QUIC_BUG(quic_bug_10586_7) + << "Fin already buffered or RESET_STREAM_AT sent"; return consumed_data; } @@ -940,6 +1007,8 @@ } void QuicStream::MaybeSendRstStream(QuicResetStreamError error) { + // It is OK to send RESET_STREAM after RESET_STREAM_AT. reliable_size can + // always decrease in the spec, so it doesn't check rst_stream_at_sent_. if (rst_sent_) { return; } @@ -954,9 +1023,31 @@ CloseWriteSide(); } +void QuicStream::MaybeSendResetStreamAt(QuicResetStreamError error) { + if (!session_->connection()->reliable_stream_reset_enabled() || + !VersionHasIetfQuicFrames(transport_version())) { + QUIC_BUG_IF(quic_bug_gquic_calling_reset_stream_at, + !VersionHasIetfQuicFrames(transport_version())) + << "gQUIC is calling MaybeSendResetStreamAt"; + MaybeSendRstStream(error); + return; + } + if (rst_sent_ || rst_stream_at_sent_) { + return; + } + // If data has been buffered but not sent, it doesn't normally count towards + // final_size. However, if that buffered data is within reliable_size_, it + // will have to be sent, and therefore needs to be included in final_size. + QuicByteCount final_size = std::max(stream_bytes_written(), reliable_size_); + session()->MaybeSendResetStreamAtFrame(id(), error, final_size, + reliable_size_); + rst_stream_at_sent_ = true; +} + bool QuicStream::HasBufferedData() const { QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written()); - return send_buffer_.stream_offset() > stream_bytes_written(); + return (send_buffer_.stream_offset() > stream_bytes_written() && + (!rst_stream_at_sent_ || reliable_size_ > stream_bytes_written())); } ParsedQuicVersion QuicStream::version() const { return session_->version(); } @@ -977,11 +1068,11 @@ void QuicStream::OnClose() { QUICHE_DCHECK(read_side_closed_ && write_side_closed_); - if (!fin_sent_ && !rst_sent_) { + if (!fin_sent_ && !rst_sent_ && !rst_stream_at_sent_) { QUIC_BUG_IF(quic_bug_12570_6, session()->connection()->connected() && session()->version().UsesHttp3()) - << "The stream should've already sent RST in response to " - "STOP_SENDING"; + << "The stream should've already sent RESET_STREAM or RESET_STREAM_AT " + "in response to STOP_SENDING"; // For flow control accounting, tell the peer how many bytes have been // written on this stream before termination. Done here if needed, using a // RST_STREAM frame. @@ -1169,6 +1260,9 @@ !write_side_data_recvd_state_notified_) { OnWriteSideInDataRecvdState(); write_side_data_recvd_state_notified_ = true; + if (rst_stream_at_sent_) { + session_->connection()->OnStreamReset(id_, stream_error_.internal_code()); + } } if (notify_ack_listener_earlier_ && new_data_acked) { QUIC_RELOADABLE_FLAG_COUNT_N(quic_notify_ack_listener_earlier, 1, 3); @@ -1291,6 +1385,20 @@ // Size of buffered data. QuicByteCount write_length = BufferedDataBytes(); + // Do not send data beyond reliable_size_. + // TODO(martinduke): This code could be simpler if partial reset directly + // deleted data from the buffer, instead of notionally acking it. Since unsent + // data can't be acked, it's still in the buffer and has to be explicitly not + // sent. + if (rst_stream_at_sent_ && + stream_bytes_written() + write_length > reliable_size_) { + if (reliable_size_ <= stream_bytes_written()) { + QUIC_BUG(quic_bug_reliable_size_already_sent) + << "Call to WriteBufferedData after reliable_size_ has been sent."; + return; + } + write_length = reliable_size_ - stream_bytes_written(); + } // A FIN with zero data payload should not be flow control blocked. bool fin_with_zero_data = (fin_buffered_ && write_length == 0); @@ -1366,6 +1474,11 @@ if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) { busy_counter_ = 0; } + if (rst_stream_at_sent_ && stream_bytes_written() >= reliable_size_) { + // If data up to reliable_size_ has been sent, the write side can finally + // close. + CloseWriteSide(); + } } uint64_t QuicStream::BufferedDataBytes() const {
diff --git a/quiche/quic/core/quic_stream.h b/quiche/quic/core/quic_stream.h index f4a9f58..5604e66 100644 --- a/quiche/quic/core/quic_stream.h +++ b/quiche/quic/core/quic_stream.h
@@ -181,9 +181,21 @@ // interface. void Reset(QuicRstStreamErrorCode error); - // Reset() sends both RESET_STREAM and STOP_SENDING; the two methods below - // allow to send only one of those. + // Record the current offset as the reliable size to be delivered if a partial + // reset is called. Returns false if a RST_STREAM or RESET_STREAM_AT has + // already been sent, the stream is receive-only, or the connection does not + // support RESET_STREAM_AT. + bool SetReliableSize(); + + // Send a RESET_STREAM_AT with a reliable size that had earlier been set by + // SetReliableSize(). Does not send STOP_SENDING and does not close the read + // side. Will trigger QUIC_BUG if reliable_size_ is zero. + void PartialResetWriteSide(QuicResetStreamError error); + // TODO(rch): Delete this function once Envoy has migrated to + // PartialResetWriteSide. void ResetWriteSide(QuicResetStreamError error); + // Reset() sends both RESET_STREAM and STOP_SENDING; this allows the caller to + // send only STOP_SENDING. void SendStopSending(QuicResetStreamError error); // Called by the subclass or the sequencer to close the entire connection from @@ -470,6 +482,8 @@ // Send RESET_STREAM if it hasn't been sent yet. void MaybeSendRstStream(QuicResetStreamError error); + // Send RESET_STREAM_AT if neither it nor RESET_STREAM has been sent yet. + void MaybeSendResetStreamAt(QuicResetStreamError error); // Convenience wrappers for two methods above. void MaybeSendRstStream(QuicRstStreamErrorCode error) { @@ -596,10 +610,11 @@ // StreamFrame with the FIN set. bool fin_received_; - // True if an RST_STREAM has been sent to the session. - // In combination with fin_sent_, used to ensure that a FIN and/or a - // RST_STREAM is always sent to terminate the stream. + // True if an RST_STREAM or RESET_STREAM_AT has been sent to the session. + // In combination with fin_sent_, used to ensure that a FIN, RST_STREAM, or + // RESET_STREAM_AT is always sent to terminate the stream. bool rst_sent_; + bool rst_stream_at_sent_; // True if this stream has received a RST_STREAM frame. bool rst_received_; @@ -660,6 +675,10 @@ const bool notify_ack_listener_earlier_ = GetQuicReloadableFlag(quic_notify_ack_listener_earlier); + + // If the stream is reset, outgoing data up to reliable_size_will be + // delivered (and acknowledged) before the write side of the stream is closed. + QuicStreamOffset reliable_size_; }; } // namespace quic
diff --git a/quiche/quic/core/quic_stream_test.cc b/quiche/quic/core/quic_stream_test.cc index d738fd6..c7bd13d 100644 --- a/quiche/quic/core/quic_stream_test.cc +++ b/quiche/quic/core/quic_stream_test.cc
@@ -4,6 +4,7 @@ #include "quiche/quic/core/quic_stream.h" +#include <cmath> #include <cstddef> #include <memory> #include <optional> @@ -21,6 +22,7 @@ #include "quiche/quic/core/quic_connection.h" #include "quiche/quic/core/quic_constants.h" #include "quiche/quic/core/quic_error_codes.h" +#include "quiche/quic/core/quic_stream_sequencer.h" #include "quiche/quic/core/quic_types.h" #include "quiche/quic/core/quic_utils.h" #include "quiche/quic/core/quic_versions.h" @@ -91,6 +93,8 @@ ASSERT_EQ(num_bytes, QuicStreamPeer::sequencer(this)->Readv(&iov, 1)); } + QuicStreamSequencer* sequencer() { return QuicStream::sequencer(); } + private: std::string data_; }; @@ -121,6 +125,7 @@ QuicConfigPeer::SetReceivedInitialMaxStreamDataBytesOutgoingBidirectional( session_->config(), kMinimumFlowControlSendWindow); QuicConfigPeer::SetReceivedMaxUnidirectionalStreams(session_->config(), 10); + session_->config()->SetReliableStreamReset(true); session_->OnConfigNegotiated(); stream_ = new StrictMock<TestStream>(kTestStreamId, session_.get(), @@ -167,6 +172,23 @@ return true; } + // Use application stream interface for sending data. This will trigger a call + // to mock_stream->Writev(_, _) that will have to return QuicConsumedData. + QuicConsumedData SendApplicationData(TestStream* stream, + absl::string_view data, size_t iov_len, + bool fin) { + struct iovec iov = {const_cast<char*>(data.data()), iov_len}; + quiche::QuicheMemSliceStorage storage( + &iov, 1, + session_->connection()->helper()->GetStreamSendBufferAllocator(), 1024); + return stream->WriteMemSlices(storage.ToSpan(), fin); + } + + QuicConsumedData SendApplicationData(absl::string_view data, size_t iov_len, + bool fin) { + return SendApplicationData(stream_, data, iov_len, fin); + } + protected: MockQuicConnectionHelper helper_; MockAlarmFactory alarm_factory_; @@ -1222,11 +1244,7 @@ // Testing Writev. EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)) .WillOnce(Return(QuicConsumedData(0, false))); - struct iovec iov = {const_cast<char*>(data.data()), data.length()}; - quiche::QuicheMemSliceStorage storage( - &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(), - 1024); - QuicConsumedData consumed = stream_->WriteMemSlices(storage.ToSpan(), false); + QuicConsumedData consumed = SendApplicationData(data, data.length(), false); // There is no buffered data before, all data should be consumed without // respecting buffered data upper limit. @@ -1236,10 +1254,8 @@ EXPECT_FALSE(stream_->CanWriteNewData()); EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0); - quiche::QuicheMemSliceStorage storage2( - &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(), - 1024); - consumed = stream_->WriteMemSlices(storage2.ToSpan(), false); + consumed = SendApplicationData(data, data.length(), false); + // No Data can be consumed as buffered data is beyond upper limit. EXPECT_EQ(0u, consumed.bytes_consumed); EXPECT_FALSE(consumed.fin_consumed); @@ -1261,10 +1277,7 @@ EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0); // All data can be consumed as buffered data is below upper limit. - quiche::QuicheMemSliceStorage storage3( - &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(), - 1024); - consumed = stream_->WriteMemSlices(storage3.ToSpan(), false); + consumed = SendApplicationData(data, data.length(), false); EXPECT_EQ(data.length(), consumed.bytes_consumed); EXPECT_FALSE(consumed.fin_consumed); EXPECT_EQ(data.length() + GetQuicFlag(quic_buffered_data_threshold) - 1, @@ -1279,21 +1292,13 @@ stream_); EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)) .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData)); - struct iovec iov = {const_cast<char*>(data.data()), 5u}; - quiche::QuicheMemSliceStorage storage( - &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(), - 1024); - QuicConsumedData consumed = stream_->WriteMemSlices(storage.ToSpan(), false); + QuicConsumedData consumed = SendApplicationData(data, 5, false); EXPECT_EQ(data.length(), consumed.bytes_consumed); - struct iovec iov2 = {const_cast<char*>(data.data()), 1u}; - quiche::QuicheMemSliceStorage storage2( - &iov2, 1, - session_->connection()->helper()->GetStreamSendBufferAllocator(), 1024); EXPECT_QUIC_BUG( { EXPECT_CALL(*connection_, CloseConnection(QUIC_STREAM_LENGTH_OVERFLOW, _, _)); - stream_->WriteMemSlices(storage2.ToSpan(), false); + SendApplicationData(data, 1, false); }, "Write too many data via stream"); } @@ -1942,6 +1947,353 @@ QuicStreamFrame(stream_->id(), true, 0, absl::string_view(data, 100))); } +TEST_P(QuicStreamTest, ReliableSizeNotAckedAtTimeOfReset) { + Initialize(); + if (!VersionHasIetfQuicFrames(session_->transport_version())) { + return; + } + char data[100]; + EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)) + .WillOnce(Return(QuicConsumedData(100, false))); + SendApplicationData(data, 100, false); + EXPECT_TRUE(stream_->SetReliableSize()); + EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1); + stream_->PartialResetWriteSide( + QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED)); + QuicByteCount newly_acked_length = 0; + EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1); + EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1); + stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(), + QuicTime::Zero(), &newly_acked_length); + std::vector<std::unique_ptr<QuicStream>>* closed_streams = + session_->ClosedStreams(); + EXPECT_TRUE(closed_streams->empty()); + // Peer sends RST_STREAM in response. + QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(), + QUIC_STREAM_CANCELLED, 1234); + stream_->OnStreamReset(rst_frame); + EXPECT_EQ((*(closed_streams->begin()))->id(), stream_->id()); + ASSERT_EQ(closed_streams->size(), 1); +} + +TEST_P(QuicStreamTest, ReliableSizeNotAckedAtTimeOfResetAndRetransmitted) { + Initialize(); + if (!VersionHasIetfQuicFrames(session_->transport_version())) { + return; + } + char data[100]; + EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)) + .WillOnce(Return(QuicConsumedData(100, false))); + SendApplicationData(data, 100, false); + EXPECT_TRUE(stream_->SetReliableSize()); + // Send 50 more bytes that aren't reliable. + EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)) + .WillOnce(Return(QuicConsumedData(50, false))); + SendApplicationData(data, 50, false); + EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1); + stream_->PartialResetWriteSide( + QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED)); + + // Lose all the bytes. + stream_->OnStreamFrameLost(0, 150, false); + // Cause retransmission of the reliable bytes. + EXPECT_CALL(*session_, WritevData(stream_->id(), 100, 0, _, _, _)) + .WillOnce(Return(QuicConsumedData(100, false))); + stream_->OnCanWrite(); + + // Ack the reliable bytes, and close. + QuicByteCount newly_acked_length = 0; + EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1); + EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1); + stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(), + QuicTime::Zero(), &newly_acked_length); + std::vector<std::unique_ptr<QuicStream>>* closed_streams = + session_->ClosedStreams(); + EXPECT_TRUE(closed_streams->empty()); + // Peer sends RST_STREAM in response. + QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(), + QUIC_STREAM_CANCELLED, 1234); + stream_->OnStreamReset(rst_frame); + EXPECT_EQ((*(closed_streams->begin()))->id(), stream_->id()); + ASSERT_EQ(closed_streams->size(), 1); +} + +TEST_P(QuicStreamTest, ReliableSizeNotAckedAtTimeOfResetThenReadSideReset) { + Initialize(); + if (!VersionHasIetfQuicFrames(session_->transport_version())) { + return; + } + char data[100]; + EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)) + .WillOnce(Return(QuicConsumedData(100, false))); + SendApplicationData(data, 100, false); + EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1); + EXPECT_TRUE(stream_->SetReliableSize()); + stream_->PartialResetWriteSide( + QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED)); + + // Peer sends RST_STREAM in response. + QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(), + QUIC_STREAM_CANCELLED, 1234); + stream_->OnStreamReset(rst_frame); + std::vector<std::unique_ptr<QuicStream>>* closed_streams = + session_->ClosedStreams(); + ASSERT_TRUE(closed_streams->empty()); + QuicByteCount newly_acked_length = 0; + EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1); + EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1); + stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(), + QuicTime::Zero(), &newly_acked_length); + ASSERT_EQ(closed_streams->size(), 1); + EXPECT_EQ((*(closed_streams->begin()))->id(), stream_->id()); +} + +TEST_P(QuicStreamTest, ReliableSizeNotAckedAtTimeOfResetThenReadSideFin) { + Initialize(); + if (!VersionHasIetfQuicFrames(session_->transport_version())) { + return; + } + char data[100]; + EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)) + .WillOnce(Return(QuicConsumedData(100, false))); + SendApplicationData(data, 100, false); + EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1); + EXPECT_TRUE(stream_->SetReliableSize()); + stream_->PartialResetWriteSide( + QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED)); + EXPECT_TRUE(stream_->write_side_closed()); + + // Peer sends OOO FIN. + stream_->OnStreamFrame( + QuicStreamFrame(stream_->id(), true, sizeof(data), "")); + std::vector<std::unique_ptr<QuicStream>>* closed_streams = + session_->ClosedStreams(); + ASSERT_TRUE(closed_streams->empty()); + EXPECT_FALSE(stream_->read_side_closed()); // Missing the data before 100. + + QuicByteCount newly_acked_length = 0; + EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1); + EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1); + stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(), + QuicTime::Zero(), &newly_acked_length); + ASSERT_TRUE(closed_streams->empty()); + // The rest of the stream arrives. + EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([&]() { + // Most classes derived from QuicStream do something like this in + // OnDataAvailable. This is how FIN-related state is updated. + std::string buffer; + stream_->sequencer()->Read(&buffer); + if (stream_->sequencer()->IsClosed()) { + stream_->OnFinRead(); + } + }); + stream_->OnStreamFrame(QuicStreamFrame( + stream_->id(), false, 0, absl::string_view(data, sizeof(data)))); + EXPECT_TRUE(stream_->read_side_closed()); + ASSERT_EQ(closed_streams->size(), 1); + EXPECT_EQ((*(closed_streams->begin()))->id(), stream_->id()); +} + +TEST_P(QuicStreamTest, ReliableSizeAckedAtTimeOfReset) { + Initialize(); + if (!VersionHasIetfQuicFrames(session_->transport_version())) { + return; + } + EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)) + .WillOnce(Return(QuicConsumedData(100, false))); + char data[100]; + SendApplicationData(data, 100, false); + QuicByteCount newly_acked_length = 0; + stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(), + QuicTime::Zero(), &newly_acked_length); + EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1); + EXPECT_TRUE(stream_->SetReliableSize()); + EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1); + stream_->PartialResetWriteSide( + QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED)); +} + +TEST_P(QuicStreamTest, BufferedDataInReliableSize) { + Initialize(); + if (!VersionHasIetfQuicFrames(session_->transport_version())) { + return; + } + EXPECT_CALL(*session_, WritevData(stream_->id(), 100, 0, _, _, _)) + .WillOnce(Return(QuicConsumedData(50, false))); + char data[100]; + // 50 bytes of this will be buffered. + SendApplicationData(data, 100, false); + EXPECT_EQ(stream_->BufferedDataBytes(), 50); + EXPECT_TRUE(stream_->SetReliableSize()); + EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1); + stream_->PartialResetWriteSide( + QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED)); + EXPECT_FALSE(stream_->write_side_closed()); + EXPECT_CALL(*session_, WritevData(stream_->id(), 50, 50, _, _, _)) + .WillOnce(Return(QuicConsumedData(50, false))); + stream_->OnCanWrite(); + // Now that the stream has sent 100 bytes, the write side can be closed. + EXPECT_TRUE(stream_->write_side_closed()); + EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1); + EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1); + QuicByteCount newly_acked_length = 0; + stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(), + QuicTime::Zero(), &newly_acked_length); +} + +TEST_P(QuicStreamTest, ReliableSizeIsFinOffset) { + Initialize(); + if (!VersionHasIetfQuicFrames(session_->transport_version())) { + return; + } + EXPECT_CALL(*session_, WritevData(_, 100, 0, FIN, _, _)) + .WillOnce(Return(QuicConsumedData(100, true))); + char data[100]; + SendApplicationData(data, 100, true); + // Send STOP_SENDING, but nothing else. + EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1); + EXPECT_CALL(*session_, MaybeSendRstStreamFrame(_, _, _)).Times(0); + EXPECT_TRUE(stream_->SetReliableSize()); + stream_->PartialResetWriteSide( + QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED)); + // Lose the packet; the stream will not be FINed again. + stream_->OnStreamFrameLost(0, 100, true); + EXPECT_CALL(*session_, + WritevData(stream_->id(), 100, 0, NO_FIN, LOSS_RETRANSMISSION, _)) + .WillOnce(Return(QuicConsumedData(100, true))); + stream_->OnCanWrite(); +} + +TEST_P(QuicStreamTest, DataAfterResetStreamAt) { + Initialize(); + if (!VersionHasIetfQuicFrames(session_->transport_version())) { + return; + } + char data[100]; + EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)) + .WillOnce(Return(QuicConsumedData(100, false))); + SendApplicationData(data, 100, false); + EXPECT_TRUE(stream_->SetReliableSize()); + EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1); + stream_->PartialResetWriteSide( + QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED)); + EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0); + EXPECT_QUIC_BUG(SendApplicationData(data, 100, false), + "Fin already buffered or RESET_STREAM_AT sent"); + EXPECT_EQ(stream_->stream_bytes_written(), 100); +} + +TEST_P(QuicStreamTest, SetReliableSizeOnUnidirectionalRead) { + Initialize(); + if (!VersionHasIetfQuicFrames(session_->transport_version())) { + return; + } + QuicStreamId stream_id = QuicUtils::GetFirstUnidirectionalStreamId( + connection_->transport_version(), Perspective::IS_CLIENT); + TestStream stream(stream_id, session_.get(), READ_UNIDIRECTIONAL); + EXPECT_FALSE(stream.SetReliableSize()); +} + +// This covers the case where the read side is already closed, that the zombie +// stream is cleaned up. +TEST_P(QuicStreamTest, ResetStreamAtUnidirectionalWrite) { + Initialize(); + if (!VersionHasIetfQuicFrames(session_->transport_version())) { + return; + } + const QuicStreamId kId = 3; + std::unique_ptr<TestStream> stream = + std::make_unique<TestStream>(kId, session_.get(), WRITE_UNIDIRECTIONAL); + TestStream* stream_ptr = stream.get(); + session_->ActivateStream(std::move(stream)); + char data[100]; + EXPECT_CALL(*session_, WritevData(kId, _, _, _, _, _)) + .WillOnce(Return(QuicConsumedData(100, false))); + SendApplicationData(stream_ptr, data, 100, false); + EXPECT_TRUE(stream_ptr->SetReliableSize()); + EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1); + stream_ptr->PartialResetWriteSide( + QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED)); + EXPECT_CALL(*stream_ptr, OnWriteSideInDataRecvdState()); + EXPECT_CALL(*connection_, OnStreamReset(kId, _)).Times(1); + ; + QuicByteCount newly_acked_length = 0; + stream_ptr->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(), + QuicTime::Zero(), &newly_acked_length); + std::vector<std::unique_ptr<QuicStream>>* closed_streams = + session_->ClosedStreams(); + ASSERT_EQ(closed_streams->size(), 1); + EXPECT_EQ((*(closed_streams->begin()))->id(), kId); +} + +// This covers the case where the read side is already closed with FIN, that the +// zombie stream is cleaned up. +TEST_P(QuicStreamTest, ResetStreamAtReadSideFin) { + Initialize(); + if (!VersionHasIetfQuicFrames(session_->transport_version())) { + return; + } + // Fin the read side. + QuicStreamId stream_id = stream_->id(); + EXPECT_CALL(*stream_, OnDataAvailable()).Times(1); + stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, 0, "")); + stream_->OnFinRead(); + char data[100]; + EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)) + .WillOnce(Return(QuicConsumedData(100, false))); + SendApplicationData(data, 100, false); + EXPECT_TRUE(stream_->SetReliableSize()); + EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1); + stream_->PartialResetWriteSide( + QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED)); + EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()); + EXPECT_CALL(*connection_, OnStreamReset(stream_id, _)).Times(1); + QuicByteCount newly_acked_length = 0; + stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(), + QuicTime::Zero(), &newly_acked_length); + std::vector<std::unique_ptr<QuicStream>>* closed_streams = + session_->ClosedStreams(); + ASSERT_EQ(closed_streams->size(), 1); + EXPECT_EQ((*(closed_streams->begin()))->id(), stream_id); +} + +TEST_P(QuicStreamTest, ResetStreamAtAfterStopSending) { + Initialize(); + if (!VersionHasIetfQuicFrames(session_->transport_version())) { + return; + } + char data[100]; + EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)) + .WillOnce(Return(QuicConsumedData(100, false))); + stream_->WriteOrBufferData(absl::string_view(data, 100), false, nullptr); + EXPECT_TRUE(stream_->SetReliableSize()); + EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1); + stream_->OnStopSending( + QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED)); +} + +TEST_P(QuicStreamTest, RejectReliableSizeOldVersion) { + Initialize(); + if (VersionHasIetfQuicFrames(session_->transport_version())) { + return; + } + char data[100]; + EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)) + .WillOnce(Return(QuicConsumedData(100, false))); + stream_->WriteOrBufferData(absl::string_view(data, 100), false, nullptr); + EXPECT_FALSE(stream_->SetReliableSize()); +} + +TEST_P(QuicStreamTest, RejectReliableSizeReadOnlyStream) { + Initialize(); + if (!VersionHasIetfQuicFrames(session_->transport_version())) { + return; + } + auto uni = new StrictMock<TestStream>(6, session_.get(), READ_UNIDIRECTIONAL); + session_->ActivateStream(absl::WrapUnique(uni)); + EXPECT_FALSE(uni->SetReliableSize()); +} + } // namespace } // namespace test } // namespace quic
diff --git a/quiche/quic/test_tools/quic_test_utils.h b/quiche/quic/test_tools/quic_test_utils.h index 90d9be4..10f9fb1 100644 --- a/quiche/quic/test_tools/quic_test_utils.h +++ b/quiche/quic/test_tools/quic_test_utils.h
@@ -36,6 +36,7 @@ #include "quiche/quic/core/quic_path_validator.h" #include "quiche/quic/core/quic_sent_packet_manager.h" #include "quiche/quic/core/quic_server_id.h" +#include "quiche/quic/core/quic_session.h" #include "quiche/quic/core/quic_time.h" #include "quiche/quic/core/quic_types.h" #include "quiche/quic/core/quic_utils.h" @@ -826,6 +827,10 @@ (QuicStreamId stream_id, QuicResetStreamError error, QuicStreamOffset bytes_written), (override)); + MOCK_METHOD(void, MaybeSendResetStreamAtFrame, + (QuicStreamId stream_id, QuicResetStreamError error, + QuicStreamOffset bytes_written, QuicStreamOffset reliable_size), + (override)); MOCK_METHOD(void, MaybeSendStopSendingFrame, (QuicStreamId stream_id, QuicResetStreamError error), (override)); MOCK_METHOD(void, SendBlocked, @@ -853,6 +858,8 @@ id, QuicResetStreamError::FromInternal(error), bytes_written); } + ClosedStreams* ClosedStreams() { return QuicSession::closed_streams(); } + private: std::unique_ptr<QuicCryptoStream> crypto_stream_; };
diff --git a/quiche/quic/test_tools/simple_quic_framer.cc b/quiche/quic/test_tools/simple_quic_framer.cc index 13b21f6..8ad6656 100644 --- a/quiche/quic/test_tools/simple_quic_framer.cc +++ b/quiche/quic/test_tools/simple_quic_framer.cc
@@ -358,17 +358,23 @@ SimpleQuicFramer::SimpleQuicFramer() : framer_(AllSupportedVersions(), QuicTime::Zero(), Perspective::IS_SERVER, - kQuicDefaultConnectionIdLength) {} + kQuicDefaultConnectionIdLength) { + framer_.set_process_reset_stream_at(true); +} SimpleQuicFramer::SimpleQuicFramer( const ParsedQuicVersionVector& supported_versions) : framer_(supported_versions, QuicTime::Zero(), Perspective::IS_SERVER, - kQuicDefaultConnectionIdLength) {} + kQuicDefaultConnectionIdLength) { + framer_.set_process_reset_stream_at(true); +} SimpleQuicFramer::SimpleQuicFramer( const ParsedQuicVersionVector& supported_versions, Perspective perspective) : framer_(supported_versions, QuicTime::Zero(), perspective, - kQuicDefaultConnectionIdLength) {} + kQuicDefaultConnectionIdLength) { + framer_.set_process_reset_stream_at(true); +} SimpleQuicFramer::~SimpleQuicFramer() {}