Send-side RESET_STREAM_AT frame. Nothing utilizes the API yet, so not protected.
This API is documented at go/reset-stream-at.
The application calls SetReliableSize() at will, which means that all data in the send buffer at that point will be delivered reliably, even if it is later reset.
If reliable_size is set, then an incoming STOP_SENDING will result in RESET_STREAM_AT.
When the application calls PartialResetWriteSide(), send a RESET_STREAM_AT with the indicated reliable_size. Accept no more data from the application, and notionally acknowledge any sent data beyond reliable_size.
(a) if reliable_size has been sent, the write side is closed, and if the read side is closed, the stream will when reliable_size is acked.
(b) if reliable_size has been buffered, the write side will not close until the buffered data up to reliable_size has been sent.
PiperOrigin-RevId: 704756508
diff --git a/quiche/quic/core/quic_session.cc b/quiche/quic/core/quic_session.cc
index b91dbbd..a804c47 100644
--- a/quiche/quic/core/quic_session.cc
+++ b/quiche/quic/core/quic_session.cc
@@ -1089,6 +1089,18 @@
connection_->OnStreamReset(id, error.internal_code());
}
+void QuicSession::MaybeSendResetStreamAtFrame(QuicStreamId id,
+ QuicResetStreamError error,
+ QuicStreamOffset bytes_written,
+ QuicStreamOffset reliable_size) {
+ QUICHE_DCHECK(connection()->reliable_stream_reset_enabled());
+ if (!connection()->connected()) {
+ return;
+ }
+ control_frame_manager_.WriteOrBufferResetStreamAt(id, error, bytes_written,
+ reliable_size);
+}
+
void QuicSession::MaybeSendStopSendingFrame(QuicStreamId id,
QuicResetStreamError error) {
if (!connection()->connected()) {
diff --git a/quiche/quic/core/quic_session.h b/quiche/quic/core/quic_session.h
index b15dbac..23537f0 100644
--- a/quiche/quic/core/quic_session.h
+++ b/quiche/quic/core/quic_session.h
@@ -618,6 +618,12 @@
virtual void MaybeSendRstStreamFrame(QuicStreamId id,
QuicResetStreamError error,
QuicStreamOffset bytes_written);
+ // Does actual work of sending RESET_STREAM_AT, if the stream type allows.
+ // Also informs the connection so that pending stream frames can be flushed.
+ virtual void MaybeSendResetStreamAtFrame(QuicStreamId id,
+ QuicResetStreamError error,
+ QuicStreamOffset bytes_written,
+ QuicStreamOffset reliable_size);
// Sends a STOP_SENDING frame if the stream type allows.
virtual void MaybeSendStopSendingFrame(QuicStreamId id,
diff --git a/quiche/quic/core/quic_session_test.cc b/quiche/quic/core/quic_session_test.cc
index 25008ae..452d962 100644
--- a/quiche/quic/core/quic_session_test.cc
+++ b/quiche/quic/core/quic_session_test.cc
@@ -434,6 +434,13 @@
return num_incoming_streams_created_;
}
+ void EnableReliableStreamReset() {
+ QuicConfig* quic_config = config();
+ ASSERT_TRUE(quic_config != nullptr);
+ quic_config->SetReliableStreamReset(true);
+ connection()->SetFromConfig(*quic_config);
+ }
+
using QuicSession::ActivateStream;
using QuicSession::CanOpenNextOutgoingBidirectionalStream;
using QuicSession::CanOpenNextOutgoingUnidirectionalStream;
@@ -3283,6 +3290,39 @@
session_.ResetStream(bidirectional, QUIC_STREAM_CANCELLED);
}
+TEST_P(QuicSessionTestServer, AcceptReliableSizeIfNegotiated) {
+ CompleteHandshake();
+ if (!VersionHasIetfQuicFrames(transport_version())) {
+ return;
+ }
+ session_.EnableReliableStreamReset();
+ MockPacketWriter* writer = static_cast<MockPacketWriter*>(
+ QuicConnectionPeer::GetWriter(session_.connection()));
+ TestStream* write_only = session_.CreateOutgoingUnidirectionalStream();
+ EXPECT_CALL(*writer, WritePacket(_, _, _, _, _, _))
+ .WillOnce(Return(WriteResult(WRITE_STATUS_OK, 0)));
+ session_.SendStreamData(write_only);
+ EXPECT_FALSE(write_only->fin_sent());
+ EXPECT_TRUE(write_only->SetReliableSize());
+}
+
+TEST_P(QuicSessionTestServer, RejectReliableSizeNotNegotiated) {
+ if (!VersionHasIetfQuicFrames(transport_version())) {
+ return;
+ }
+ CompleteHandshake();
+ ASSERT_FALSE(session_.connection()->reliable_stream_reset_enabled());
+ TestStream* bidirectional =
+ session_.CreateIncomingStream(GetNthClientInitiatedBidirectionalId(0));
+ MockPacketWriter* writer = static_cast<MockPacketWriter*>(
+ QuicConnectionPeer::GetWriter(session_.connection()));
+ EXPECT_CALL(*writer, WritePacket(_, _, _, _, _, _))
+ .WillOnce(Return(WriteResult(WRITE_STATUS_OK, 0)));
+ session_.SendStreamData(bidirectional);
+ EXPECT_FALSE(bidirectional->fin_sent());
+ EXPECT_FALSE(bidirectional->SetReliableSize());
+}
+
TEST_P(QuicSessionTestServer, DecryptionKeyAvailableBeforeEncryptionKey) {
if (connection_->version().handshake_protocol != PROTOCOL_TLS1_3) {
return;
diff --git a/quiche/quic/core/quic_stream.cc b/quiche/quic/core/quic_stream.cc
index 7f10791..e65e0d7 100644
--- a/quiche/quic/core/quic_stream.cc
+++ b/quiche/quic/core/quic_stream.cc
@@ -379,6 +379,7 @@
fin_lost_(false),
fin_received_(fin_received),
rst_sent_(false),
+ rst_stream_at_sent_(false),
rst_received_(false),
stop_sending_sent_(false),
flow_controller_(std::move(flow_controller)),
@@ -400,7 +401,8 @@
: type),
creation_time_(session->connection()->clock()->ApproximateNow()),
pending_duration_(pending_duration),
- perspective_(session->perspective()) {
+ perspective_(session->perspective()),
+ reliable_size_(0) {
if (type_ == WRITE_UNIDIRECTIONAL) {
fin_received_ = true;
CloseReadSide();
@@ -531,7 +533,17 @@
}
stream_error_ = error;
- MaybeSendRstStream(error);
+ if (reliable_size_ == 0) {
+ MaybeSendRstStream(error);
+ } else {
+ // The spec is ambiguous as to whether a RESET_STREAM or RESET_STREAM_AT
+ // should be sent in response to a STOP_SENDING frame if the write side has
+ // specified a reliable size. Because STOP_SENDING and RESET_STREAM_AT could
+ // cross in flight, send RESET_STREAM_AT if reliable_size is set, so that
+ // the result of setting reliable_size is consistent. ResetWriteSide() will
+ // check reliable_size and do the right thing.
+ PartialResetWriteSide(error);
+ }
if (session()->enable_stop_sending_for_zombie_streams() &&
read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
QUIC_RELOADABLE_FLAG_COUNT_N(quic_deliver_stop_sending_to_zombie_streams, 3,
@@ -660,8 +672,24 @@
ResetWithError(QuicResetStreamError::FromInternal(error));
}
+bool QuicStream::SetReliableSize() {
+ if (rst_sent_ || rst_stream_at_sent_) {
+ return false;
+ }
+ if (!session_->connection()->reliable_stream_reset_enabled() ||
+ !VersionHasIetfQuicFrames(transport_version()) ||
+ type_ == READ_UNIDIRECTIONAL) {
+ return false;
+ }
+ reliable_size_ = send_buffer_.stream_offset();
+ return true;
+}
+
void QuicStream::ResetWithError(QuicResetStreamError error) {
stream_error_ = error;
+ // The caller has explicitly abandoned reliable delivery of anything in the
+ // stream, so adjust stream state accordingly.
+ reliable_size_ = 0;
QuicConnection::ScopedPacketFlusher flusher(session()->connection());
MaybeSendStopSending(error);
MaybeSendRstStream(error);
@@ -680,6 +708,42 @@
}
}
+void QuicStream::PartialResetWriteSide(QuicResetStreamError error) {
+ if (reliable_size_ == 0) {
+ QUIC_BUG(quic_bug_reliable_size_not_set)
+ << "QuicStream::PartialResetWriteSide called when reliable_size_ is 0";
+ return;
+ }
+ if (rst_sent_) {
+ QUIC_BUG(quic_bug_reset_stream_at_after_rst_sent)
+ << "QuicStream::PartialResetWriteSide on reset stream";
+ return;
+ }
+ stream_error_ = error;
+ MaybeSendResetStreamAt(error);
+ if (reliable_size_ <= stream_bytes_written()) {
+ // Notionally ack unreliable, previously consumed data so that it's not
+ // retransmitted, and the buffer can free the memory.
+ QuicByteCount newly_acked;
+ send_buffer_.OnStreamDataAcked(
+ reliable_size_, stream_bytes_written() - reliable_size_, &newly_acked);
+ fin_outstanding_ = false; // Do not wait to close until FIN is acked.
+ fin_lost_ = false;
+ if (!IsWaitingForAcks()) {
+ session_->connection()->OnStreamReset(id_, stream_error_.internal_code());
+ }
+ CloseWriteSide();
+ } else {
+ // If stream_bytes_written() < reliable_size_, then the write side can't
+ // close until buffered data is sent.
+ QUIC_BUG_IF(quic_bug_unexpected_write_side_closed, write_side_closed_)
+ << "Write side closed with unsent reliable data";
+ }
+ if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
+ session()->MaybeCloseZombieStream(id_);
+ }
+}
+
void QuicStream::SendStopSending(QuicResetStreamError error) {
stream_error_ = error;
MaybeSendStopSending(error);
@@ -733,8 +797,9 @@
return;
}
- if (fin_buffered_) {
- QUIC_BUG(quic_bug_10586_3) << "Fin already buffered";
+ if (fin_buffered_ || rst_stream_at_sent_) {
+ QUIC_BUG(quic_bug_10586_3)
+ << "Fin already buffered, or RESET_STREAM_AT sent";
return;
}
if (write_side_closed_) {
@@ -791,7 +856,8 @@
if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) {
WriteBufferedData(session()->GetEncryptionLevelToSendApplicationData());
}
- if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) {
+ if (!fin_buffered_ && !fin_sent_ && !rst_stream_at_sent_ &&
+ CanWriteNewData()) {
// Notify upper layer to write new data when buffered data size is below
// low water mark.
OnCanWriteNewData();
@@ -834,8 +900,9 @@
return consumed_data;
}
- if (fin_buffered_) {
- QUIC_BUG(quic_bug_10586_7) << "Fin already buffered";
+ if (fin_buffered_ || rst_stream_at_sent_) {
+ QUIC_BUG(quic_bug_10586_7)
+ << "Fin already buffered or RESET_STREAM_AT sent";
return consumed_data;
}
@@ -940,6 +1007,8 @@
}
void QuicStream::MaybeSendRstStream(QuicResetStreamError error) {
+ // It is OK to send RESET_STREAM after RESET_STREAM_AT. reliable_size can
+ // always decrease in the spec, so it doesn't check rst_stream_at_sent_.
if (rst_sent_) {
return;
}
@@ -954,9 +1023,31 @@
CloseWriteSide();
}
+void QuicStream::MaybeSendResetStreamAt(QuicResetStreamError error) {
+ if (!session_->connection()->reliable_stream_reset_enabled() ||
+ !VersionHasIetfQuicFrames(transport_version())) {
+ QUIC_BUG_IF(quic_bug_gquic_calling_reset_stream_at,
+ !VersionHasIetfQuicFrames(transport_version()))
+ << "gQUIC is calling MaybeSendResetStreamAt";
+ MaybeSendRstStream(error);
+ return;
+ }
+ if (rst_sent_ || rst_stream_at_sent_) {
+ return;
+ }
+ // If data has been buffered but not sent, it doesn't normally count towards
+ // final_size. However, if that buffered data is within reliable_size_, it
+ // will have to be sent, and therefore needs to be included in final_size.
+ QuicByteCount final_size = std::max(stream_bytes_written(), reliable_size_);
+ session()->MaybeSendResetStreamAtFrame(id(), error, final_size,
+ reliable_size_);
+ rst_stream_at_sent_ = true;
+}
+
bool QuicStream::HasBufferedData() const {
QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
- return send_buffer_.stream_offset() > stream_bytes_written();
+ return (send_buffer_.stream_offset() > stream_bytes_written() &&
+ (!rst_stream_at_sent_ || reliable_size_ > stream_bytes_written()));
}
ParsedQuicVersion QuicStream::version() const { return session_->version(); }
@@ -977,11 +1068,11 @@
void QuicStream::OnClose() {
QUICHE_DCHECK(read_side_closed_ && write_side_closed_);
- if (!fin_sent_ && !rst_sent_) {
+ if (!fin_sent_ && !rst_sent_ && !rst_stream_at_sent_) {
QUIC_BUG_IF(quic_bug_12570_6, session()->connection()->connected() &&
session()->version().UsesHttp3())
- << "The stream should've already sent RST in response to "
- "STOP_SENDING";
+ << "The stream should've already sent RESET_STREAM or RESET_STREAM_AT "
+ "in response to STOP_SENDING";
// For flow control accounting, tell the peer how many bytes have been
// written on this stream before termination. Done here if needed, using a
// RST_STREAM frame.
@@ -1169,6 +1260,9 @@
!write_side_data_recvd_state_notified_) {
OnWriteSideInDataRecvdState();
write_side_data_recvd_state_notified_ = true;
+ if (rst_stream_at_sent_) {
+ session_->connection()->OnStreamReset(id_, stream_error_.internal_code());
+ }
}
if (notify_ack_listener_earlier_ && new_data_acked) {
QUIC_RELOADABLE_FLAG_COUNT_N(quic_notify_ack_listener_earlier, 1, 3);
@@ -1291,6 +1385,20 @@
// Size of buffered data.
QuicByteCount write_length = BufferedDataBytes();
+ // Do not send data beyond reliable_size_.
+ // TODO(martinduke): This code could be simpler if partial reset directly
+ // deleted data from the buffer, instead of notionally acking it. Since unsent
+ // data can't be acked, it's still in the buffer and has to be explicitly not
+ // sent.
+ if (rst_stream_at_sent_ &&
+ stream_bytes_written() + write_length > reliable_size_) {
+ if (reliable_size_ <= stream_bytes_written()) {
+ QUIC_BUG(quic_bug_reliable_size_already_sent)
+ << "Call to WriteBufferedData after reliable_size_ has been sent.";
+ return;
+ }
+ write_length = reliable_size_ - stream_bytes_written();
+ }
// A FIN with zero data payload should not be flow control blocked.
bool fin_with_zero_data = (fin_buffered_ && write_length == 0);
@@ -1366,6 +1474,11 @@
if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) {
busy_counter_ = 0;
}
+ if (rst_stream_at_sent_ && stream_bytes_written() >= reliable_size_) {
+ // If data up to reliable_size_ has been sent, the write side can finally
+ // close.
+ CloseWriteSide();
+ }
}
uint64_t QuicStream::BufferedDataBytes() const {
diff --git a/quiche/quic/core/quic_stream.h b/quiche/quic/core/quic_stream.h
index f4a9f58..5604e66 100644
--- a/quiche/quic/core/quic_stream.h
+++ b/quiche/quic/core/quic_stream.h
@@ -181,9 +181,21 @@
// interface.
void Reset(QuicRstStreamErrorCode error);
- // Reset() sends both RESET_STREAM and STOP_SENDING; the two methods below
- // allow to send only one of those.
+ // Record the current offset as the reliable size to be delivered if a partial
+ // reset is called. Returns false if a RST_STREAM or RESET_STREAM_AT has
+ // already been sent, the stream is receive-only, or the connection does not
+ // support RESET_STREAM_AT.
+ bool SetReliableSize();
+
+ // Send a RESET_STREAM_AT with a reliable size that had earlier been set by
+ // SetReliableSize(). Does not send STOP_SENDING and does not close the read
+ // side. Will trigger QUIC_BUG if reliable_size_ is zero.
+ void PartialResetWriteSide(QuicResetStreamError error);
+ // TODO(rch): Delete this function once Envoy has migrated to
+ // PartialResetWriteSide.
void ResetWriteSide(QuicResetStreamError error);
+ // Reset() sends both RESET_STREAM and STOP_SENDING; this allows the caller to
+ // send only STOP_SENDING.
void SendStopSending(QuicResetStreamError error);
// Called by the subclass or the sequencer to close the entire connection from
@@ -470,6 +482,8 @@
// Send RESET_STREAM if it hasn't been sent yet.
void MaybeSendRstStream(QuicResetStreamError error);
+ // Send RESET_STREAM_AT if neither it nor RESET_STREAM has been sent yet.
+ void MaybeSendResetStreamAt(QuicResetStreamError error);
// Convenience wrappers for two methods above.
void MaybeSendRstStream(QuicRstStreamErrorCode error) {
@@ -596,10 +610,11 @@
// StreamFrame with the FIN set.
bool fin_received_;
- // True if an RST_STREAM has been sent to the session.
- // In combination with fin_sent_, used to ensure that a FIN and/or a
- // RST_STREAM is always sent to terminate the stream.
+ // True if an RST_STREAM or RESET_STREAM_AT has been sent to the session.
+ // In combination with fin_sent_, used to ensure that a FIN, RST_STREAM, or
+ // RESET_STREAM_AT is always sent to terminate the stream.
bool rst_sent_;
+ bool rst_stream_at_sent_;
// True if this stream has received a RST_STREAM frame.
bool rst_received_;
@@ -660,6 +675,10 @@
const bool notify_ack_listener_earlier_ =
GetQuicReloadableFlag(quic_notify_ack_listener_earlier);
+
+ // If the stream is reset, outgoing data up to reliable_size_will be
+ // delivered (and acknowledged) before the write side of the stream is closed.
+ QuicStreamOffset reliable_size_;
};
} // namespace quic
diff --git a/quiche/quic/core/quic_stream_test.cc b/quiche/quic/core/quic_stream_test.cc
index d738fd6..c7bd13d 100644
--- a/quiche/quic/core/quic_stream_test.cc
+++ b/quiche/quic/core/quic_stream_test.cc
@@ -4,6 +4,7 @@
#include "quiche/quic/core/quic_stream.h"
+#include <cmath>
#include <cstddef>
#include <memory>
#include <optional>
@@ -21,6 +22,7 @@
#include "quiche/quic/core/quic_connection.h"
#include "quiche/quic/core/quic_constants.h"
#include "quiche/quic/core/quic_error_codes.h"
+#include "quiche/quic/core/quic_stream_sequencer.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/core/quic_utils.h"
#include "quiche/quic/core/quic_versions.h"
@@ -91,6 +93,8 @@
ASSERT_EQ(num_bytes, QuicStreamPeer::sequencer(this)->Readv(&iov, 1));
}
+ QuicStreamSequencer* sequencer() { return QuicStream::sequencer(); }
+
private:
std::string data_;
};
@@ -121,6 +125,7 @@
QuicConfigPeer::SetReceivedInitialMaxStreamDataBytesOutgoingBidirectional(
session_->config(), kMinimumFlowControlSendWindow);
QuicConfigPeer::SetReceivedMaxUnidirectionalStreams(session_->config(), 10);
+ session_->config()->SetReliableStreamReset(true);
session_->OnConfigNegotiated();
stream_ = new StrictMock<TestStream>(kTestStreamId, session_.get(),
@@ -167,6 +172,23 @@
return true;
}
+ // Use application stream interface for sending data. This will trigger a call
+ // to mock_stream->Writev(_, _) that will have to return QuicConsumedData.
+ QuicConsumedData SendApplicationData(TestStream* stream,
+ absl::string_view data, size_t iov_len,
+ bool fin) {
+ struct iovec iov = {const_cast<char*>(data.data()), iov_len};
+ quiche::QuicheMemSliceStorage storage(
+ &iov, 1,
+ session_->connection()->helper()->GetStreamSendBufferAllocator(), 1024);
+ return stream->WriteMemSlices(storage.ToSpan(), fin);
+ }
+
+ QuicConsumedData SendApplicationData(absl::string_view data, size_t iov_len,
+ bool fin) {
+ return SendApplicationData(stream_, data, iov_len, fin);
+ }
+
protected:
MockQuicConnectionHelper helper_;
MockAlarmFactory alarm_factory_;
@@ -1222,11 +1244,7 @@
// Testing Writev.
EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
.WillOnce(Return(QuicConsumedData(0, false)));
- struct iovec iov = {const_cast<char*>(data.data()), data.length()};
- quiche::QuicheMemSliceStorage storage(
- &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(),
- 1024);
- QuicConsumedData consumed = stream_->WriteMemSlices(storage.ToSpan(), false);
+ QuicConsumedData consumed = SendApplicationData(data, data.length(), false);
// There is no buffered data before, all data should be consumed without
// respecting buffered data upper limit.
@@ -1236,10 +1254,8 @@
EXPECT_FALSE(stream_->CanWriteNewData());
EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0);
- quiche::QuicheMemSliceStorage storage2(
- &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(),
- 1024);
- consumed = stream_->WriteMemSlices(storage2.ToSpan(), false);
+ consumed = SendApplicationData(data, data.length(), false);
+
// No Data can be consumed as buffered data is beyond upper limit.
EXPECT_EQ(0u, consumed.bytes_consumed);
EXPECT_FALSE(consumed.fin_consumed);
@@ -1261,10 +1277,7 @@
EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0);
// All data can be consumed as buffered data is below upper limit.
- quiche::QuicheMemSliceStorage storage3(
- &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(),
- 1024);
- consumed = stream_->WriteMemSlices(storage3.ToSpan(), false);
+ consumed = SendApplicationData(data, data.length(), false);
EXPECT_EQ(data.length(), consumed.bytes_consumed);
EXPECT_FALSE(consumed.fin_consumed);
EXPECT_EQ(data.length() + GetQuicFlag(quic_buffered_data_threshold) - 1,
@@ -1279,21 +1292,13 @@
stream_);
EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
.WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
- struct iovec iov = {const_cast<char*>(data.data()), 5u};
- quiche::QuicheMemSliceStorage storage(
- &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(),
- 1024);
- QuicConsumedData consumed = stream_->WriteMemSlices(storage.ToSpan(), false);
+ QuicConsumedData consumed = SendApplicationData(data, 5, false);
EXPECT_EQ(data.length(), consumed.bytes_consumed);
- struct iovec iov2 = {const_cast<char*>(data.data()), 1u};
- quiche::QuicheMemSliceStorage storage2(
- &iov2, 1,
- session_->connection()->helper()->GetStreamSendBufferAllocator(), 1024);
EXPECT_QUIC_BUG(
{
EXPECT_CALL(*connection_,
CloseConnection(QUIC_STREAM_LENGTH_OVERFLOW, _, _));
- stream_->WriteMemSlices(storage2.ToSpan(), false);
+ SendApplicationData(data, 1, false);
},
"Write too many data via stream");
}
@@ -1942,6 +1947,353 @@
QuicStreamFrame(stream_->id(), true, 0, absl::string_view(data, 100)));
}
+TEST_P(QuicStreamTest, ReliableSizeNotAckedAtTimeOfReset) {
+ Initialize();
+ if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+ return;
+ }
+ char data[100];
+ EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+ .WillOnce(Return(QuicConsumedData(100, false)));
+ SendApplicationData(data, 100, false);
+ EXPECT_TRUE(stream_->SetReliableSize());
+ EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+ stream_->PartialResetWriteSide(
+ QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+ QuicByteCount newly_acked_length = 0;
+ EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1);
+ EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1);
+ stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(),
+ QuicTime::Zero(), &newly_acked_length);
+ std::vector<std::unique_ptr<QuicStream>>* closed_streams =
+ session_->ClosedStreams();
+ EXPECT_TRUE(closed_streams->empty());
+ // Peer sends RST_STREAM in response.
+ QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
+ QUIC_STREAM_CANCELLED, 1234);
+ stream_->OnStreamReset(rst_frame);
+ EXPECT_EQ((*(closed_streams->begin()))->id(), stream_->id());
+ ASSERT_EQ(closed_streams->size(), 1);
+}
+
+TEST_P(QuicStreamTest, ReliableSizeNotAckedAtTimeOfResetAndRetransmitted) {
+ Initialize();
+ if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+ return;
+ }
+ char data[100];
+ EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+ .WillOnce(Return(QuicConsumedData(100, false)));
+ SendApplicationData(data, 100, false);
+ EXPECT_TRUE(stream_->SetReliableSize());
+ // Send 50 more bytes that aren't reliable.
+ EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+ .WillOnce(Return(QuicConsumedData(50, false)));
+ SendApplicationData(data, 50, false);
+ EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+ stream_->PartialResetWriteSide(
+ QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+
+ // Lose all the bytes.
+ stream_->OnStreamFrameLost(0, 150, false);
+ // Cause retransmission of the reliable bytes.
+ EXPECT_CALL(*session_, WritevData(stream_->id(), 100, 0, _, _, _))
+ .WillOnce(Return(QuicConsumedData(100, false)));
+ stream_->OnCanWrite();
+
+ // Ack the reliable bytes, and close.
+ QuicByteCount newly_acked_length = 0;
+ EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1);
+ EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1);
+ stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(),
+ QuicTime::Zero(), &newly_acked_length);
+ std::vector<std::unique_ptr<QuicStream>>* closed_streams =
+ session_->ClosedStreams();
+ EXPECT_TRUE(closed_streams->empty());
+ // Peer sends RST_STREAM in response.
+ QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
+ QUIC_STREAM_CANCELLED, 1234);
+ stream_->OnStreamReset(rst_frame);
+ EXPECT_EQ((*(closed_streams->begin()))->id(), stream_->id());
+ ASSERT_EQ(closed_streams->size(), 1);
+}
+
+TEST_P(QuicStreamTest, ReliableSizeNotAckedAtTimeOfResetThenReadSideReset) {
+ Initialize();
+ if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+ return;
+ }
+ char data[100];
+ EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+ .WillOnce(Return(QuicConsumedData(100, false)));
+ SendApplicationData(data, 100, false);
+ EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+ EXPECT_TRUE(stream_->SetReliableSize());
+ stream_->PartialResetWriteSide(
+ QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+
+ // Peer sends RST_STREAM in response.
+ QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
+ QUIC_STREAM_CANCELLED, 1234);
+ stream_->OnStreamReset(rst_frame);
+ std::vector<std::unique_ptr<QuicStream>>* closed_streams =
+ session_->ClosedStreams();
+ ASSERT_TRUE(closed_streams->empty());
+ QuicByteCount newly_acked_length = 0;
+ EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1);
+ EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1);
+ stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(),
+ QuicTime::Zero(), &newly_acked_length);
+ ASSERT_EQ(closed_streams->size(), 1);
+ EXPECT_EQ((*(closed_streams->begin()))->id(), stream_->id());
+}
+
+TEST_P(QuicStreamTest, ReliableSizeNotAckedAtTimeOfResetThenReadSideFin) {
+ Initialize();
+ if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+ return;
+ }
+ char data[100];
+ EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+ .WillOnce(Return(QuicConsumedData(100, false)));
+ SendApplicationData(data, 100, false);
+ EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+ EXPECT_TRUE(stream_->SetReliableSize());
+ stream_->PartialResetWriteSide(
+ QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+ EXPECT_TRUE(stream_->write_side_closed());
+
+ // Peer sends OOO FIN.
+ stream_->OnStreamFrame(
+ QuicStreamFrame(stream_->id(), true, sizeof(data), ""));
+ std::vector<std::unique_ptr<QuicStream>>* closed_streams =
+ session_->ClosedStreams();
+ ASSERT_TRUE(closed_streams->empty());
+ EXPECT_FALSE(stream_->read_side_closed()); // Missing the data before 100.
+
+ QuicByteCount newly_acked_length = 0;
+ EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1);
+ EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1);
+ stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(),
+ QuicTime::Zero(), &newly_acked_length);
+ ASSERT_TRUE(closed_streams->empty());
+ // The rest of the stream arrives.
+ EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([&]() {
+ // Most classes derived from QuicStream do something like this in
+ // OnDataAvailable. This is how FIN-related state is updated.
+ std::string buffer;
+ stream_->sequencer()->Read(&buffer);
+ if (stream_->sequencer()->IsClosed()) {
+ stream_->OnFinRead();
+ }
+ });
+ stream_->OnStreamFrame(QuicStreamFrame(
+ stream_->id(), false, 0, absl::string_view(data, sizeof(data))));
+ EXPECT_TRUE(stream_->read_side_closed());
+ ASSERT_EQ(closed_streams->size(), 1);
+ EXPECT_EQ((*(closed_streams->begin()))->id(), stream_->id());
+}
+
+TEST_P(QuicStreamTest, ReliableSizeAckedAtTimeOfReset) {
+ Initialize();
+ if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+ return;
+ }
+ EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+ .WillOnce(Return(QuicConsumedData(100, false)));
+ char data[100];
+ SendApplicationData(data, 100, false);
+ QuicByteCount newly_acked_length = 0;
+ stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(),
+ QuicTime::Zero(), &newly_acked_length);
+ EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+ EXPECT_TRUE(stream_->SetReliableSize());
+ EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1);
+ stream_->PartialResetWriteSide(
+ QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+}
+
+TEST_P(QuicStreamTest, BufferedDataInReliableSize) {
+ Initialize();
+ if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+ return;
+ }
+ EXPECT_CALL(*session_, WritevData(stream_->id(), 100, 0, _, _, _))
+ .WillOnce(Return(QuicConsumedData(50, false)));
+ char data[100];
+ // 50 bytes of this will be buffered.
+ SendApplicationData(data, 100, false);
+ EXPECT_EQ(stream_->BufferedDataBytes(), 50);
+ EXPECT_TRUE(stream_->SetReliableSize());
+ EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+ stream_->PartialResetWriteSide(
+ QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+ EXPECT_FALSE(stream_->write_side_closed());
+ EXPECT_CALL(*session_, WritevData(stream_->id(), 50, 50, _, _, _))
+ .WillOnce(Return(QuicConsumedData(50, false)));
+ stream_->OnCanWrite();
+ // Now that the stream has sent 100 bytes, the write side can be closed.
+ EXPECT_TRUE(stream_->write_side_closed());
+ EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1);
+ EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1);
+ QuicByteCount newly_acked_length = 0;
+ stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(),
+ QuicTime::Zero(), &newly_acked_length);
+}
+
+TEST_P(QuicStreamTest, ReliableSizeIsFinOffset) {
+ Initialize();
+ if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+ return;
+ }
+ EXPECT_CALL(*session_, WritevData(_, 100, 0, FIN, _, _))
+ .WillOnce(Return(QuicConsumedData(100, true)));
+ char data[100];
+ SendApplicationData(data, 100, true);
+ // Send STOP_SENDING, but nothing else.
+ EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+ EXPECT_CALL(*session_, MaybeSendRstStreamFrame(_, _, _)).Times(0);
+ EXPECT_TRUE(stream_->SetReliableSize());
+ stream_->PartialResetWriteSide(
+ QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+ // Lose the packet; the stream will not be FINed again.
+ stream_->OnStreamFrameLost(0, 100, true);
+ EXPECT_CALL(*session_,
+ WritevData(stream_->id(), 100, 0, NO_FIN, LOSS_RETRANSMISSION, _))
+ .WillOnce(Return(QuicConsumedData(100, true)));
+ stream_->OnCanWrite();
+}
+
+TEST_P(QuicStreamTest, DataAfterResetStreamAt) {
+ Initialize();
+ if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+ return;
+ }
+ char data[100];
+ EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+ .WillOnce(Return(QuicConsumedData(100, false)));
+ SendApplicationData(data, 100, false);
+ EXPECT_TRUE(stream_->SetReliableSize());
+ EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+ stream_->PartialResetWriteSide(
+ QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+ EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0);
+ EXPECT_QUIC_BUG(SendApplicationData(data, 100, false),
+ "Fin already buffered or RESET_STREAM_AT sent");
+ EXPECT_EQ(stream_->stream_bytes_written(), 100);
+}
+
+TEST_P(QuicStreamTest, SetReliableSizeOnUnidirectionalRead) {
+ Initialize();
+ if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+ return;
+ }
+ QuicStreamId stream_id = QuicUtils::GetFirstUnidirectionalStreamId(
+ connection_->transport_version(), Perspective::IS_CLIENT);
+ TestStream stream(stream_id, session_.get(), READ_UNIDIRECTIONAL);
+ EXPECT_FALSE(stream.SetReliableSize());
+}
+
+// This covers the case where the read side is already closed, that the zombie
+// stream is cleaned up.
+TEST_P(QuicStreamTest, ResetStreamAtUnidirectionalWrite) {
+ Initialize();
+ if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+ return;
+ }
+ const QuicStreamId kId = 3;
+ std::unique_ptr<TestStream> stream =
+ std::make_unique<TestStream>(kId, session_.get(), WRITE_UNIDIRECTIONAL);
+ TestStream* stream_ptr = stream.get();
+ session_->ActivateStream(std::move(stream));
+ char data[100];
+ EXPECT_CALL(*session_, WritevData(kId, _, _, _, _, _))
+ .WillOnce(Return(QuicConsumedData(100, false)));
+ SendApplicationData(stream_ptr, data, 100, false);
+ EXPECT_TRUE(stream_ptr->SetReliableSize());
+ EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+ stream_ptr->PartialResetWriteSide(
+ QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+ EXPECT_CALL(*stream_ptr, OnWriteSideInDataRecvdState());
+ EXPECT_CALL(*connection_, OnStreamReset(kId, _)).Times(1);
+ ;
+ QuicByteCount newly_acked_length = 0;
+ stream_ptr->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(),
+ QuicTime::Zero(), &newly_acked_length);
+ std::vector<std::unique_ptr<QuicStream>>* closed_streams =
+ session_->ClosedStreams();
+ ASSERT_EQ(closed_streams->size(), 1);
+ EXPECT_EQ((*(closed_streams->begin()))->id(), kId);
+}
+
+// This covers the case where the read side is already closed with FIN, that the
+// zombie stream is cleaned up.
+TEST_P(QuicStreamTest, ResetStreamAtReadSideFin) {
+ Initialize();
+ if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+ return;
+ }
+ // Fin the read side.
+ QuicStreamId stream_id = stream_->id();
+ EXPECT_CALL(*stream_, OnDataAvailable()).Times(1);
+ stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, 0, ""));
+ stream_->OnFinRead();
+ char data[100];
+ EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+ .WillOnce(Return(QuicConsumedData(100, false)));
+ SendApplicationData(data, 100, false);
+ EXPECT_TRUE(stream_->SetReliableSize());
+ EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+ stream_->PartialResetWriteSide(
+ QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+ EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState());
+ EXPECT_CALL(*connection_, OnStreamReset(stream_id, _)).Times(1);
+ QuicByteCount newly_acked_length = 0;
+ stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(),
+ QuicTime::Zero(), &newly_acked_length);
+ std::vector<std::unique_ptr<QuicStream>>* closed_streams =
+ session_->ClosedStreams();
+ ASSERT_EQ(closed_streams->size(), 1);
+ EXPECT_EQ((*(closed_streams->begin()))->id(), stream_id);
+}
+
+TEST_P(QuicStreamTest, ResetStreamAtAfterStopSending) {
+ Initialize();
+ if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+ return;
+ }
+ char data[100];
+ EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+ .WillOnce(Return(QuicConsumedData(100, false)));
+ stream_->WriteOrBufferData(absl::string_view(data, 100), false, nullptr);
+ EXPECT_TRUE(stream_->SetReliableSize());
+ EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+ stream_->OnStopSending(
+ QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+}
+
+TEST_P(QuicStreamTest, RejectReliableSizeOldVersion) {
+ Initialize();
+ if (VersionHasIetfQuicFrames(session_->transport_version())) {
+ return;
+ }
+ char data[100];
+ EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+ .WillOnce(Return(QuicConsumedData(100, false)));
+ stream_->WriteOrBufferData(absl::string_view(data, 100), false, nullptr);
+ EXPECT_FALSE(stream_->SetReliableSize());
+}
+
+TEST_P(QuicStreamTest, RejectReliableSizeReadOnlyStream) {
+ Initialize();
+ if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+ return;
+ }
+ auto uni = new StrictMock<TestStream>(6, session_.get(), READ_UNIDIRECTIONAL);
+ session_->ActivateStream(absl::WrapUnique(uni));
+ EXPECT_FALSE(uni->SetReliableSize());
+}
+
} // namespace
} // namespace test
} // namespace quic
diff --git a/quiche/quic/test_tools/quic_test_utils.h b/quiche/quic/test_tools/quic_test_utils.h
index 90d9be4..10f9fb1 100644
--- a/quiche/quic/test_tools/quic_test_utils.h
+++ b/quiche/quic/test_tools/quic_test_utils.h
@@ -36,6 +36,7 @@
#include "quiche/quic/core/quic_path_validator.h"
#include "quiche/quic/core/quic_sent_packet_manager.h"
#include "quiche/quic/core/quic_server_id.h"
+#include "quiche/quic/core/quic_session.h"
#include "quiche/quic/core/quic_time.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/core/quic_utils.h"
@@ -826,6 +827,10 @@
(QuicStreamId stream_id, QuicResetStreamError error,
QuicStreamOffset bytes_written),
(override));
+ MOCK_METHOD(void, MaybeSendResetStreamAtFrame,
+ (QuicStreamId stream_id, QuicResetStreamError error,
+ QuicStreamOffset bytes_written, QuicStreamOffset reliable_size),
+ (override));
MOCK_METHOD(void, MaybeSendStopSendingFrame,
(QuicStreamId stream_id, QuicResetStreamError error), (override));
MOCK_METHOD(void, SendBlocked,
@@ -853,6 +858,8 @@
id, QuicResetStreamError::FromInternal(error), bytes_written);
}
+ ClosedStreams* ClosedStreams() { return QuicSession::closed_streams(); }
+
private:
std::unique_ptr<QuicCryptoStream> crypto_stream_;
};
diff --git a/quiche/quic/test_tools/simple_quic_framer.cc b/quiche/quic/test_tools/simple_quic_framer.cc
index 13b21f6..8ad6656 100644
--- a/quiche/quic/test_tools/simple_quic_framer.cc
+++ b/quiche/quic/test_tools/simple_quic_framer.cc
@@ -358,17 +358,23 @@
SimpleQuicFramer::SimpleQuicFramer()
: framer_(AllSupportedVersions(), QuicTime::Zero(), Perspective::IS_SERVER,
- kQuicDefaultConnectionIdLength) {}
+ kQuicDefaultConnectionIdLength) {
+ framer_.set_process_reset_stream_at(true);
+}
SimpleQuicFramer::SimpleQuicFramer(
const ParsedQuicVersionVector& supported_versions)
: framer_(supported_versions, QuicTime::Zero(), Perspective::IS_SERVER,
- kQuicDefaultConnectionIdLength) {}
+ kQuicDefaultConnectionIdLength) {
+ framer_.set_process_reset_stream_at(true);
+}
SimpleQuicFramer::SimpleQuicFramer(
const ParsedQuicVersionVector& supported_versions, Perspective perspective)
: framer_(supported_versions, QuicTime::Zero(), perspective,
- kQuicDefaultConnectionIdLength) {}
+ kQuicDefaultConnectionIdLength) {
+ framer_.set_process_reset_stream_at(true);
+}
SimpleQuicFramer::~SimpleQuicFramer() {}