Send-side RESET_STREAM_AT frame. Nothing utilizes the API yet, so not protected.

This API is documented at go/reset-stream-at.

The application calls SetReliableSize() at will, which means that all data in the send buffer at that point will be delivered reliably, even if it is later reset.

If reliable_size is set, then an incoming STOP_SENDING will result in RESET_STREAM_AT.

When the application calls PartialResetWriteSide(), send a RESET_STREAM_AT with the indicated reliable_size. Accept no more data from the application, and notionally acknowledge any sent data beyond reliable_size.
   (a) if reliable_size has been sent, the write side is closed, and if the read side is closed, the stream will when reliable_size is acked.
   (b) if reliable_size has been buffered, the write side will not close until the buffered data up to reliable_size has been sent.

PiperOrigin-RevId: 704756508
diff --git a/quiche/quic/core/quic_session.cc b/quiche/quic/core/quic_session.cc
index b91dbbd..a804c47 100644
--- a/quiche/quic/core/quic_session.cc
+++ b/quiche/quic/core/quic_session.cc
@@ -1089,6 +1089,18 @@
   connection_->OnStreamReset(id, error.internal_code());
 }
 
+void QuicSession::MaybeSendResetStreamAtFrame(QuicStreamId id,
+                                              QuicResetStreamError error,
+                                              QuicStreamOffset bytes_written,
+                                              QuicStreamOffset reliable_size) {
+  QUICHE_DCHECK(connection()->reliable_stream_reset_enabled());
+  if (!connection()->connected()) {
+    return;
+  }
+  control_frame_manager_.WriteOrBufferResetStreamAt(id, error, bytes_written,
+                                                    reliable_size);
+}
+
 void QuicSession::MaybeSendStopSendingFrame(QuicStreamId id,
                                             QuicResetStreamError error) {
   if (!connection()->connected()) {
diff --git a/quiche/quic/core/quic_session.h b/quiche/quic/core/quic_session.h
index b15dbac..23537f0 100644
--- a/quiche/quic/core/quic_session.h
+++ b/quiche/quic/core/quic_session.h
@@ -618,6 +618,12 @@
   virtual void MaybeSendRstStreamFrame(QuicStreamId id,
                                        QuicResetStreamError error,
                                        QuicStreamOffset bytes_written);
+  // Does actual work of sending RESET_STREAM_AT, if the stream type allows.
+  // Also informs the connection so that pending stream frames can be flushed.
+  virtual void MaybeSendResetStreamAtFrame(QuicStreamId id,
+                                           QuicResetStreamError error,
+                                           QuicStreamOffset bytes_written,
+                                           QuicStreamOffset reliable_size);
 
   // Sends a STOP_SENDING frame if the stream type allows.
   virtual void MaybeSendStopSendingFrame(QuicStreamId id,
diff --git a/quiche/quic/core/quic_session_test.cc b/quiche/quic/core/quic_session_test.cc
index 25008ae..452d962 100644
--- a/quiche/quic/core/quic_session_test.cc
+++ b/quiche/quic/core/quic_session_test.cc
@@ -434,6 +434,13 @@
     return num_incoming_streams_created_;
   }
 
+  void EnableReliableStreamReset() {
+    QuicConfig* quic_config = config();
+    ASSERT_TRUE(quic_config != nullptr);
+    quic_config->SetReliableStreamReset(true);
+    connection()->SetFromConfig(*quic_config);
+  }
+
   using QuicSession::ActivateStream;
   using QuicSession::CanOpenNextOutgoingBidirectionalStream;
   using QuicSession::CanOpenNextOutgoingUnidirectionalStream;
@@ -3283,6 +3290,39 @@
   session_.ResetStream(bidirectional, QUIC_STREAM_CANCELLED);
 }
 
+TEST_P(QuicSessionTestServer, AcceptReliableSizeIfNegotiated) {
+  CompleteHandshake();
+  if (!VersionHasIetfQuicFrames(transport_version())) {
+    return;
+  }
+  session_.EnableReliableStreamReset();
+  MockPacketWriter* writer = static_cast<MockPacketWriter*>(
+      QuicConnectionPeer::GetWriter(session_.connection()));
+  TestStream* write_only = session_.CreateOutgoingUnidirectionalStream();
+  EXPECT_CALL(*writer, WritePacket(_, _, _, _, _, _))
+      .WillOnce(Return(WriteResult(WRITE_STATUS_OK, 0)));
+  session_.SendStreamData(write_only);
+  EXPECT_FALSE(write_only->fin_sent());
+  EXPECT_TRUE(write_only->SetReliableSize());
+}
+
+TEST_P(QuicSessionTestServer, RejectReliableSizeNotNegotiated) {
+  if (!VersionHasIetfQuicFrames(transport_version())) {
+    return;
+  }
+  CompleteHandshake();
+  ASSERT_FALSE(session_.connection()->reliable_stream_reset_enabled());
+  TestStream* bidirectional =
+      session_.CreateIncomingStream(GetNthClientInitiatedBidirectionalId(0));
+  MockPacketWriter* writer = static_cast<MockPacketWriter*>(
+      QuicConnectionPeer::GetWriter(session_.connection()));
+  EXPECT_CALL(*writer, WritePacket(_, _, _, _, _, _))
+      .WillOnce(Return(WriteResult(WRITE_STATUS_OK, 0)));
+  session_.SendStreamData(bidirectional);
+  EXPECT_FALSE(bidirectional->fin_sent());
+  EXPECT_FALSE(bidirectional->SetReliableSize());
+}
+
 TEST_P(QuicSessionTestServer, DecryptionKeyAvailableBeforeEncryptionKey) {
   if (connection_->version().handshake_protocol != PROTOCOL_TLS1_3) {
     return;
diff --git a/quiche/quic/core/quic_stream.cc b/quiche/quic/core/quic_stream.cc
index 7f10791..e65e0d7 100644
--- a/quiche/quic/core/quic_stream.cc
+++ b/quiche/quic/core/quic_stream.cc
@@ -379,6 +379,7 @@
       fin_lost_(false),
       fin_received_(fin_received),
       rst_sent_(false),
+      rst_stream_at_sent_(false),
       rst_received_(false),
       stop_sending_sent_(false),
       flow_controller_(std::move(flow_controller)),
@@ -400,7 +401,8 @@
                 : type),
       creation_time_(session->connection()->clock()->ApproximateNow()),
       pending_duration_(pending_duration),
-      perspective_(session->perspective()) {
+      perspective_(session->perspective()),
+      reliable_size_(0) {
   if (type_ == WRITE_UNIDIRECTIONAL) {
     fin_received_ = true;
     CloseReadSide();
@@ -531,7 +533,17 @@
   }
 
   stream_error_ = error;
-  MaybeSendRstStream(error);
+  if (reliable_size_ == 0) {
+    MaybeSendRstStream(error);
+  } else {
+    // The spec is ambiguous as to whether a RESET_STREAM or RESET_STREAM_AT
+    // should be sent in response to a STOP_SENDING frame if the write side has
+    // specified a reliable size. Because STOP_SENDING and RESET_STREAM_AT could
+    // cross in flight, send RESET_STREAM_AT if reliable_size is set, so that
+    // the result of setting reliable_size is consistent. ResetWriteSide() will
+    // check reliable_size and do the right thing.
+    PartialResetWriteSide(error);
+  }
   if (session()->enable_stop_sending_for_zombie_streams() &&
       read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
     QUIC_RELOADABLE_FLAG_COUNT_N(quic_deliver_stop_sending_to_zombie_streams, 3,
@@ -660,8 +672,24 @@
   ResetWithError(QuicResetStreamError::FromInternal(error));
 }
 
+bool QuicStream::SetReliableSize() {
+  if (rst_sent_ || rst_stream_at_sent_) {
+    return false;
+  }
+  if (!session_->connection()->reliable_stream_reset_enabled() ||
+      !VersionHasIetfQuicFrames(transport_version()) ||
+      type_ == READ_UNIDIRECTIONAL) {
+    return false;
+  }
+  reliable_size_ = send_buffer_.stream_offset();
+  return true;
+}
+
 void QuicStream::ResetWithError(QuicResetStreamError error) {
   stream_error_ = error;
+  // The caller has explicitly abandoned reliable delivery of anything in the
+  // stream, so adjust stream state accordingly.
+  reliable_size_ = 0;
   QuicConnection::ScopedPacketFlusher flusher(session()->connection());
   MaybeSendStopSending(error);
   MaybeSendRstStream(error);
@@ -680,6 +708,42 @@
   }
 }
 
+void QuicStream::PartialResetWriteSide(QuicResetStreamError error) {
+  if (reliable_size_ == 0) {
+    QUIC_BUG(quic_bug_reliable_size_not_set)
+        << "QuicStream::PartialResetWriteSide called when reliable_size_ is 0";
+    return;
+  }
+  if (rst_sent_) {
+    QUIC_BUG(quic_bug_reset_stream_at_after_rst_sent)
+        << "QuicStream::PartialResetWriteSide on reset stream";
+    return;
+  }
+  stream_error_ = error;
+  MaybeSendResetStreamAt(error);
+  if (reliable_size_ <= stream_bytes_written()) {
+    // Notionally ack unreliable, previously consumed data so that it's not
+    // retransmitted, and the buffer can free the memory.
+    QuicByteCount newly_acked;
+    send_buffer_.OnStreamDataAcked(
+        reliable_size_, stream_bytes_written() - reliable_size_, &newly_acked);
+    fin_outstanding_ = false;  // Do not wait to close until FIN is acked.
+    fin_lost_ = false;
+    if (!IsWaitingForAcks()) {
+      session_->connection()->OnStreamReset(id_, stream_error_.internal_code());
+    }
+    CloseWriteSide();
+  } else {
+    // If stream_bytes_written() < reliable_size_, then the write side can't
+    // close until buffered data is sent.
+    QUIC_BUG_IF(quic_bug_unexpected_write_side_closed, write_side_closed_)
+        << "Write side closed with unsent reliable data";
+  }
+  if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
+    session()->MaybeCloseZombieStream(id_);
+  }
+}
+
 void QuicStream::SendStopSending(QuicResetStreamError error) {
   stream_error_ = error;
   MaybeSendStopSending(error);
@@ -733,8 +797,9 @@
     return;
   }
 
-  if (fin_buffered_) {
-    QUIC_BUG(quic_bug_10586_3) << "Fin already buffered";
+  if (fin_buffered_ || rst_stream_at_sent_) {
+    QUIC_BUG(quic_bug_10586_3)
+        << "Fin already buffered, or RESET_STREAM_AT sent";
     return;
   }
   if (write_side_closed_) {
@@ -791,7 +856,8 @@
   if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) {
     WriteBufferedData(session()->GetEncryptionLevelToSendApplicationData());
   }
-  if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) {
+  if (!fin_buffered_ && !fin_sent_ && !rst_stream_at_sent_ &&
+      CanWriteNewData()) {
     // Notify upper layer to write new data when buffered data size is below
     // low water mark.
     OnCanWriteNewData();
@@ -834,8 +900,9 @@
     return consumed_data;
   }
 
-  if (fin_buffered_) {
-    QUIC_BUG(quic_bug_10586_7) << "Fin already buffered";
+  if (fin_buffered_ || rst_stream_at_sent_) {
+    QUIC_BUG(quic_bug_10586_7)
+        << "Fin already buffered or RESET_STREAM_AT sent";
     return consumed_data;
   }
 
@@ -940,6 +1007,8 @@
 }
 
 void QuicStream::MaybeSendRstStream(QuicResetStreamError error) {
+  // It is OK to send RESET_STREAM after RESET_STREAM_AT. reliable_size can
+  // always decrease in the spec, so it doesn't check rst_stream_at_sent_.
   if (rst_sent_) {
     return;
   }
@@ -954,9 +1023,31 @@
   CloseWriteSide();
 }
 
+void QuicStream::MaybeSendResetStreamAt(QuicResetStreamError error) {
+  if (!session_->connection()->reliable_stream_reset_enabled() ||
+      !VersionHasIetfQuicFrames(transport_version())) {
+    QUIC_BUG_IF(quic_bug_gquic_calling_reset_stream_at,
+                !VersionHasIetfQuicFrames(transport_version()))
+        << "gQUIC is calling MaybeSendResetStreamAt";
+    MaybeSendRstStream(error);
+    return;
+  }
+  if (rst_sent_ || rst_stream_at_sent_) {
+    return;
+  }
+  // If data has been buffered but not sent, it doesn't normally count towards
+  // final_size. However, if that buffered data is within reliable_size_, it
+  // will have to be sent, and therefore needs to be included in final_size.
+  QuicByteCount final_size = std::max(stream_bytes_written(), reliable_size_);
+  session()->MaybeSendResetStreamAtFrame(id(), error, final_size,
+                                         reliable_size_);
+  rst_stream_at_sent_ = true;
+}
+
 bool QuicStream::HasBufferedData() const {
   QUICHE_DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
-  return send_buffer_.stream_offset() > stream_bytes_written();
+  return (send_buffer_.stream_offset() > stream_bytes_written() &&
+          (!rst_stream_at_sent_ || reliable_size_ > stream_bytes_written()));
 }
 
 ParsedQuicVersion QuicStream::version() const { return session_->version(); }
@@ -977,11 +1068,11 @@
 void QuicStream::OnClose() {
   QUICHE_DCHECK(read_side_closed_ && write_side_closed_);
 
-  if (!fin_sent_ && !rst_sent_) {
+  if (!fin_sent_ && !rst_sent_ && !rst_stream_at_sent_) {
     QUIC_BUG_IF(quic_bug_12570_6, session()->connection()->connected() &&
                                       session()->version().UsesHttp3())
-        << "The stream should've already sent RST in response to "
-           "STOP_SENDING";
+        << "The stream should've already sent RESET_STREAM or RESET_STREAM_AT "
+           "in response to STOP_SENDING";
     // For flow control accounting, tell the peer how many bytes have been
     // written on this stream before termination. Done here if needed, using a
     // RST_STREAM frame.
@@ -1169,6 +1260,9 @@
       !write_side_data_recvd_state_notified_) {
     OnWriteSideInDataRecvdState();
     write_side_data_recvd_state_notified_ = true;
+    if (rst_stream_at_sent_) {
+      session_->connection()->OnStreamReset(id_, stream_error_.internal_code());
+    }
   }
   if (notify_ack_listener_earlier_ && new_data_acked) {
     QUIC_RELOADABLE_FLAG_COUNT_N(quic_notify_ack_listener_earlier, 1, 3);
@@ -1291,6 +1385,20 @@
 
   // Size of buffered data.
   QuicByteCount write_length = BufferedDataBytes();
+  // Do not send data beyond reliable_size_.
+  // TODO(martinduke): This code could be simpler if partial reset directly
+  // deleted data from the buffer, instead of notionally acking it. Since unsent
+  // data can't be acked, it's still in the buffer and has to be explicitly not
+  // sent.
+  if (rst_stream_at_sent_ &&
+      stream_bytes_written() + write_length > reliable_size_) {
+    if (reliable_size_ <= stream_bytes_written()) {
+      QUIC_BUG(quic_bug_reliable_size_already_sent)
+          << "Call to WriteBufferedData after reliable_size_ has been sent.";
+      return;
+    }
+    write_length = reliable_size_ - stream_bytes_written();
+  }
 
   // A FIN with zero data payload should not be flow control blocked.
   bool fin_with_zero_data = (fin_buffered_ && write_length == 0);
@@ -1366,6 +1474,11 @@
   if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) {
     busy_counter_ = 0;
   }
+  if (rst_stream_at_sent_ && stream_bytes_written() >= reliable_size_) {
+    // If data up to reliable_size_ has been sent, the write side can finally
+    // close.
+    CloseWriteSide();
+  }
 }
 
 uint64_t QuicStream::BufferedDataBytes() const {
diff --git a/quiche/quic/core/quic_stream.h b/quiche/quic/core/quic_stream.h
index f4a9f58..5604e66 100644
--- a/quiche/quic/core/quic_stream.h
+++ b/quiche/quic/core/quic_stream.h
@@ -181,9 +181,21 @@
   // interface.
   void Reset(QuicRstStreamErrorCode error);
 
-  // Reset() sends both RESET_STREAM and STOP_SENDING; the two methods below
-  // allow to send only one of those.
+  // Record the current offset as the reliable size to be delivered if a partial
+  // reset is called. Returns false if a RST_STREAM or RESET_STREAM_AT has
+  // already been sent, the stream is receive-only, or the connection does not
+  // support RESET_STREAM_AT.
+  bool SetReliableSize();
+
+  // Send a RESET_STREAM_AT with a reliable size that had earlier been set by
+  // SetReliableSize(). Does not send STOP_SENDING and does not close the read
+  // side. Will trigger QUIC_BUG if reliable_size_ is zero.
+  void PartialResetWriteSide(QuicResetStreamError error);
+  // TODO(rch): Delete this function once Envoy has migrated to
+  // PartialResetWriteSide.
   void ResetWriteSide(QuicResetStreamError error);
+  // Reset() sends both RESET_STREAM and STOP_SENDING; this allows the caller to
+  // send only STOP_SENDING.
   void SendStopSending(QuicResetStreamError error);
 
   // Called by the subclass or the sequencer to close the entire connection from
@@ -470,6 +482,8 @@
 
   // Send RESET_STREAM if it hasn't been sent yet.
   void MaybeSendRstStream(QuicResetStreamError error);
+  // Send RESET_STREAM_AT if neither it nor RESET_STREAM has been sent yet.
+  void MaybeSendResetStreamAt(QuicResetStreamError error);
 
   // Convenience wrappers for two methods above.
   void MaybeSendRstStream(QuicRstStreamErrorCode error) {
@@ -596,10 +610,11 @@
   // StreamFrame with the FIN set.
   bool fin_received_;
 
-  // True if an RST_STREAM has been sent to the session.
-  // In combination with fin_sent_, used to ensure that a FIN and/or a
-  // RST_STREAM is always sent to terminate the stream.
+  // True if an RST_STREAM or RESET_STREAM_AT has been sent to the session.
+  // In combination with fin_sent_, used to ensure that a FIN, RST_STREAM, or
+  // RESET_STREAM_AT is always sent to terminate the stream.
   bool rst_sent_;
+  bool rst_stream_at_sent_;
 
   // True if this stream has received a RST_STREAM frame.
   bool rst_received_;
@@ -660,6 +675,10 @@
 
   const bool notify_ack_listener_earlier_ =
       GetQuicReloadableFlag(quic_notify_ack_listener_earlier);
+
+  // If the stream is reset, outgoing data up to reliable_size_will be
+  // delivered (and acknowledged) before the write side of the stream is closed.
+  QuicStreamOffset reliable_size_;
 };
 
 }  // namespace quic
diff --git a/quiche/quic/core/quic_stream_test.cc b/quiche/quic/core/quic_stream_test.cc
index d738fd6..c7bd13d 100644
--- a/quiche/quic/core/quic_stream_test.cc
+++ b/quiche/quic/core/quic_stream_test.cc
@@ -4,6 +4,7 @@
 
 #include "quiche/quic/core/quic_stream.h"
 
+#include <cmath>
 #include <cstddef>
 #include <memory>
 #include <optional>
@@ -21,6 +22,7 @@
 #include "quiche/quic/core/quic_connection.h"
 #include "quiche/quic/core/quic_constants.h"
 #include "quiche/quic/core/quic_error_codes.h"
+#include "quiche/quic/core/quic_stream_sequencer.h"
 #include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/core/quic_utils.h"
 #include "quiche/quic/core/quic_versions.h"
@@ -91,6 +93,8 @@
     ASSERT_EQ(num_bytes, QuicStreamPeer::sequencer(this)->Readv(&iov, 1));
   }
 
+  QuicStreamSequencer* sequencer() { return QuicStream::sequencer(); }
+
  private:
   std::string data_;
 };
@@ -121,6 +125,7 @@
     QuicConfigPeer::SetReceivedInitialMaxStreamDataBytesOutgoingBidirectional(
         session_->config(), kMinimumFlowControlSendWindow);
     QuicConfigPeer::SetReceivedMaxUnidirectionalStreams(session_->config(), 10);
+    session_->config()->SetReliableStreamReset(true);
     session_->OnConfigNegotiated();
 
     stream_ = new StrictMock<TestStream>(kTestStreamId, session_.get(),
@@ -167,6 +172,23 @@
     return true;
   }
 
+  // Use application stream interface for sending data. This will trigger a call
+  // to mock_stream->Writev(_, _) that will have to return QuicConsumedData.
+  QuicConsumedData SendApplicationData(TestStream* stream,
+                                       absl::string_view data, size_t iov_len,
+                                       bool fin) {
+    struct iovec iov = {const_cast<char*>(data.data()), iov_len};
+    quiche::QuicheMemSliceStorage storage(
+        &iov, 1,
+        session_->connection()->helper()->GetStreamSendBufferAllocator(), 1024);
+    return stream->WriteMemSlices(storage.ToSpan(), fin);
+  }
+
+  QuicConsumedData SendApplicationData(absl::string_view data, size_t iov_len,
+                                       bool fin) {
+    return SendApplicationData(stream_, data, iov_len, fin);
+  }
+
  protected:
   MockQuicConnectionHelper helper_;
   MockAlarmFactory alarm_factory_;
@@ -1222,11 +1244,7 @@
   // Testing Writev.
   EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
       .WillOnce(Return(QuicConsumedData(0, false)));
-  struct iovec iov = {const_cast<char*>(data.data()), data.length()};
-  quiche::QuicheMemSliceStorage storage(
-      &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(),
-      1024);
-  QuicConsumedData consumed = stream_->WriteMemSlices(storage.ToSpan(), false);
+  QuicConsumedData consumed = SendApplicationData(data, data.length(), false);
 
   // There is no buffered data before, all data should be consumed without
   // respecting buffered data upper limit.
@@ -1236,10 +1254,8 @@
   EXPECT_FALSE(stream_->CanWriteNewData());
 
   EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0);
-  quiche::QuicheMemSliceStorage storage2(
-      &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(),
-      1024);
-  consumed = stream_->WriteMemSlices(storage2.ToSpan(), false);
+  consumed = SendApplicationData(data, data.length(), false);
+
   // No Data can be consumed as buffered data is beyond upper limit.
   EXPECT_EQ(0u, consumed.bytes_consumed);
   EXPECT_FALSE(consumed.fin_consumed);
@@ -1261,10 +1277,7 @@
 
   EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0);
   // All data can be consumed as buffered data is below upper limit.
-  quiche::QuicheMemSliceStorage storage3(
-      &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(),
-      1024);
-  consumed = stream_->WriteMemSlices(storage3.ToSpan(), false);
+  consumed = SendApplicationData(data, data.length(), false);
   EXPECT_EQ(data.length(), consumed.bytes_consumed);
   EXPECT_FALSE(consumed.fin_consumed);
   EXPECT_EQ(data.length() + GetQuicFlag(quic_buffered_data_threshold) - 1,
@@ -1279,21 +1292,13 @@
                                         stream_);
   EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
       .WillOnce(Invoke(session_.get(), &MockQuicSession::ConsumeData));
-  struct iovec iov = {const_cast<char*>(data.data()), 5u};
-  quiche::QuicheMemSliceStorage storage(
-      &iov, 1, session_->connection()->helper()->GetStreamSendBufferAllocator(),
-      1024);
-  QuicConsumedData consumed = stream_->WriteMemSlices(storage.ToSpan(), false);
+  QuicConsumedData consumed = SendApplicationData(data, 5, false);
   EXPECT_EQ(data.length(), consumed.bytes_consumed);
-  struct iovec iov2 = {const_cast<char*>(data.data()), 1u};
-  quiche::QuicheMemSliceStorage storage2(
-      &iov2, 1,
-      session_->connection()->helper()->GetStreamSendBufferAllocator(), 1024);
   EXPECT_QUIC_BUG(
       {
         EXPECT_CALL(*connection_,
                     CloseConnection(QUIC_STREAM_LENGTH_OVERFLOW, _, _));
-        stream_->WriteMemSlices(storage2.ToSpan(), false);
+        SendApplicationData(data, 1, false);
       },
       "Write too many data via stream");
 }
@@ -1942,6 +1947,353 @@
       QuicStreamFrame(stream_->id(), true, 0, absl::string_view(data, 100)));
 }
 
+TEST_P(QuicStreamTest, ReliableSizeNotAckedAtTimeOfReset) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  char data[100];
+  EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+      .WillOnce(Return(QuicConsumedData(100, false)));
+  SendApplicationData(data, 100, false);
+  EXPECT_TRUE(stream_->SetReliableSize());
+  EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+  stream_->PartialResetWriteSide(
+      QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+  QuicByteCount newly_acked_length = 0;
+  EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1);
+  EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1);
+  stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(),
+                              QuicTime::Zero(), &newly_acked_length);
+  std::vector<std::unique_ptr<QuicStream>>* closed_streams =
+      session_->ClosedStreams();
+  EXPECT_TRUE(closed_streams->empty());
+  // Peer sends RST_STREAM in response.
+  QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
+                               QUIC_STREAM_CANCELLED, 1234);
+  stream_->OnStreamReset(rst_frame);
+  EXPECT_EQ((*(closed_streams->begin()))->id(), stream_->id());
+  ASSERT_EQ(closed_streams->size(), 1);
+}
+
+TEST_P(QuicStreamTest, ReliableSizeNotAckedAtTimeOfResetAndRetransmitted) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  char data[100];
+  EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+      .WillOnce(Return(QuicConsumedData(100, false)));
+  SendApplicationData(data, 100, false);
+  EXPECT_TRUE(stream_->SetReliableSize());
+  // Send 50 more bytes that aren't reliable.
+  EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+      .WillOnce(Return(QuicConsumedData(50, false)));
+  SendApplicationData(data, 50, false);
+  EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+  stream_->PartialResetWriteSide(
+      QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+
+  // Lose all the bytes.
+  stream_->OnStreamFrameLost(0, 150, false);
+  // Cause retransmission of the reliable bytes.
+  EXPECT_CALL(*session_, WritevData(stream_->id(), 100, 0, _, _, _))
+      .WillOnce(Return(QuicConsumedData(100, false)));
+  stream_->OnCanWrite();
+
+  // Ack the reliable bytes, and close.
+  QuicByteCount newly_acked_length = 0;
+  EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1);
+  EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1);
+  stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(),
+                              QuicTime::Zero(), &newly_acked_length);
+  std::vector<std::unique_ptr<QuicStream>>* closed_streams =
+      session_->ClosedStreams();
+  EXPECT_TRUE(closed_streams->empty());
+  // Peer sends RST_STREAM in response.
+  QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
+                               QUIC_STREAM_CANCELLED, 1234);
+  stream_->OnStreamReset(rst_frame);
+  EXPECT_EQ((*(closed_streams->begin()))->id(), stream_->id());
+  ASSERT_EQ(closed_streams->size(), 1);
+}
+
+TEST_P(QuicStreamTest, ReliableSizeNotAckedAtTimeOfResetThenReadSideReset) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  char data[100];
+  EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+      .WillOnce(Return(QuicConsumedData(100, false)));
+  SendApplicationData(data, 100, false);
+  EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+  EXPECT_TRUE(stream_->SetReliableSize());
+  stream_->PartialResetWriteSide(
+      QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+
+  // Peer sends RST_STREAM in response.
+  QuicRstStreamFrame rst_frame(kInvalidControlFrameId, stream_->id(),
+                               QUIC_STREAM_CANCELLED, 1234);
+  stream_->OnStreamReset(rst_frame);
+  std::vector<std::unique_ptr<QuicStream>>* closed_streams =
+      session_->ClosedStreams();
+  ASSERT_TRUE(closed_streams->empty());
+  QuicByteCount newly_acked_length = 0;
+  EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1);
+  EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1);
+  stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(),
+                              QuicTime::Zero(), &newly_acked_length);
+  ASSERT_EQ(closed_streams->size(), 1);
+  EXPECT_EQ((*(closed_streams->begin()))->id(), stream_->id());
+}
+
+TEST_P(QuicStreamTest, ReliableSizeNotAckedAtTimeOfResetThenReadSideFin) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  char data[100];
+  EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+      .WillOnce(Return(QuicConsumedData(100, false)));
+  SendApplicationData(data, 100, false);
+  EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+  EXPECT_TRUE(stream_->SetReliableSize());
+  stream_->PartialResetWriteSide(
+      QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+  EXPECT_TRUE(stream_->write_side_closed());
+
+  // Peer sends OOO FIN.
+  stream_->OnStreamFrame(
+      QuicStreamFrame(stream_->id(), true, sizeof(data), ""));
+  std::vector<std::unique_ptr<QuicStream>>* closed_streams =
+      session_->ClosedStreams();
+  ASSERT_TRUE(closed_streams->empty());
+  EXPECT_FALSE(stream_->read_side_closed());  // Missing the data before 100.
+
+  QuicByteCount newly_acked_length = 0;
+  EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1);
+  EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1);
+  stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(),
+                              QuicTime::Zero(), &newly_acked_length);
+  ASSERT_TRUE(closed_streams->empty());
+  // The rest of the stream arrives.
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([&]() {
+    // Most classes derived from QuicStream do something like this in
+    // OnDataAvailable. This is how FIN-related state is updated.
+    std::string buffer;
+    stream_->sequencer()->Read(&buffer);
+    if (stream_->sequencer()->IsClosed()) {
+      stream_->OnFinRead();
+    }
+  });
+  stream_->OnStreamFrame(QuicStreamFrame(
+      stream_->id(), false, 0, absl::string_view(data, sizeof(data))));
+  EXPECT_TRUE(stream_->read_side_closed());
+  ASSERT_EQ(closed_streams->size(), 1);
+  EXPECT_EQ((*(closed_streams->begin()))->id(), stream_->id());
+}
+
+TEST_P(QuicStreamTest, ReliableSizeAckedAtTimeOfReset) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+      .WillOnce(Return(QuicConsumedData(100, false)));
+  char data[100];
+  SendApplicationData(data, 100, false);
+  QuicByteCount newly_acked_length = 0;
+  stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(),
+                              QuicTime::Zero(), &newly_acked_length);
+  EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+  EXPECT_TRUE(stream_->SetReliableSize());
+  EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1);
+  stream_->PartialResetWriteSide(
+      QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+}
+
+TEST_P(QuicStreamTest, BufferedDataInReliableSize) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  EXPECT_CALL(*session_, WritevData(stream_->id(), 100, 0, _, _, _))
+      .WillOnce(Return(QuicConsumedData(50, false)));
+  char data[100];
+  // 50 bytes of this will be buffered.
+  SendApplicationData(data, 100, false);
+  EXPECT_EQ(stream_->BufferedDataBytes(), 50);
+  EXPECT_TRUE(stream_->SetReliableSize());
+  EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+  stream_->PartialResetWriteSide(
+      QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+  EXPECT_FALSE(stream_->write_side_closed());
+  EXPECT_CALL(*session_, WritevData(stream_->id(), 50, 50, _, _, _))
+      .WillOnce(Return(QuicConsumedData(50, false)));
+  stream_->OnCanWrite();
+  // Now that the stream has sent 100 bytes, the write side can be closed.
+  EXPECT_TRUE(stream_->write_side_closed());
+  EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState()).Times(1);
+  EXPECT_CALL(*connection_, OnStreamReset(stream_->id(), _)).Times(1);
+  QuicByteCount newly_acked_length = 0;
+  stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(),
+                              QuicTime::Zero(), &newly_acked_length);
+}
+
+TEST_P(QuicStreamTest, ReliableSizeIsFinOffset) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  EXPECT_CALL(*session_, WritevData(_, 100, 0, FIN, _, _))
+      .WillOnce(Return(QuicConsumedData(100, true)));
+  char data[100];
+  SendApplicationData(data, 100, true);
+  // Send STOP_SENDING, but nothing else.
+  EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+  EXPECT_CALL(*session_, MaybeSendRstStreamFrame(_, _, _)).Times(0);
+  EXPECT_TRUE(stream_->SetReliableSize());
+  stream_->PartialResetWriteSide(
+      QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+  // Lose the packet; the stream will not be FINed again.
+  stream_->OnStreamFrameLost(0, 100, true);
+  EXPECT_CALL(*session_,
+              WritevData(stream_->id(), 100, 0, NO_FIN, LOSS_RETRANSMISSION, _))
+      .WillOnce(Return(QuicConsumedData(100, true)));
+  stream_->OnCanWrite();
+}
+
+TEST_P(QuicStreamTest, DataAfterResetStreamAt) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  char data[100];
+  EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+      .WillOnce(Return(QuicConsumedData(100, false)));
+  SendApplicationData(data, 100, false);
+  EXPECT_TRUE(stream_->SetReliableSize());
+  EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+  stream_->PartialResetWriteSide(
+      QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+  EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(0);
+  EXPECT_QUIC_BUG(SendApplicationData(data, 100, false),
+                  "Fin already buffered or RESET_STREAM_AT sent");
+  EXPECT_EQ(stream_->stream_bytes_written(), 100);
+}
+
+TEST_P(QuicStreamTest, SetReliableSizeOnUnidirectionalRead) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  QuicStreamId stream_id = QuicUtils::GetFirstUnidirectionalStreamId(
+      connection_->transport_version(), Perspective::IS_CLIENT);
+  TestStream stream(stream_id, session_.get(), READ_UNIDIRECTIONAL);
+  EXPECT_FALSE(stream.SetReliableSize());
+}
+
+// This covers the case where the read side is already closed, that the zombie
+// stream is cleaned up.
+TEST_P(QuicStreamTest, ResetStreamAtUnidirectionalWrite) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  const QuicStreamId kId = 3;
+  std::unique_ptr<TestStream> stream =
+      std::make_unique<TestStream>(kId, session_.get(), WRITE_UNIDIRECTIONAL);
+  TestStream* stream_ptr = stream.get();
+  session_->ActivateStream(std::move(stream));
+  char data[100];
+  EXPECT_CALL(*session_, WritevData(kId, _, _, _, _, _))
+      .WillOnce(Return(QuicConsumedData(100, false)));
+  SendApplicationData(stream_ptr, data, 100, false);
+  EXPECT_TRUE(stream_ptr->SetReliableSize());
+  EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+  stream_ptr->PartialResetWriteSide(
+      QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+  EXPECT_CALL(*stream_ptr, OnWriteSideInDataRecvdState());
+  EXPECT_CALL(*connection_, OnStreamReset(kId, _)).Times(1);
+  ;
+  QuicByteCount newly_acked_length = 0;
+  stream_ptr->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(),
+                                 QuicTime::Zero(), &newly_acked_length);
+  std::vector<std::unique_ptr<QuicStream>>* closed_streams =
+      session_->ClosedStreams();
+  ASSERT_EQ(closed_streams->size(), 1);
+  EXPECT_EQ((*(closed_streams->begin()))->id(), kId);
+}
+
+// This covers the case where the read side is already closed with FIN, that the
+// zombie stream is cleaned up.
+TEST_P(QuicStreamTest, ResetStreamAtReadSideFin) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  // Fin the read side.
+  QuicStreamId stream_id = stream_->id();
+  EXPECT_CALL(*stream_, OnDataAvailable()).Times(1);
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, 0, ""));
+  stream_->OnFinRead();
+  char data[100];
+  EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+      .WillOnce(Return(QuicConsumedData(100, false)));
+  SendApplicationData(data, 100, false);
+  EXPECT_TRUE(stream_->SetReliableSize());
+  EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+  stream_->PartialResetWriteSide(
+      QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+  EXPECT_CALL(*stream_, OnWriteSideInDataRecvdState());
+  EXPECT_CALL(*connection_, OnStreamReset(stream_id, _)).Times(1);
+  QuicByteCount newly_acked_length = 0;
+  stream_->OnStreamFrameAcked(0, 100, false, QuicTime::Delta::Zero(),
+                              QuicTime::Zero(), &newly_acked_length);
+  std::vector<std::unique_ptr<QuicStream>>* closed_streams =
+      session_->ClosedStreams();
+  ASSERT_EQ(closed_streams->size(), 1);
+  EXPECT_EQ((*(closed_streams->begin()))->id(), stream_id);
+}
+
+TEST_P(QuicStreamTest, ResetStreamAtAfterStopSending) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  char data[100];
+  EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+      .WillOnce(Return(QuicConsumedData(100, false)));
+  stream_->WriteOrBufferData(absl::string_view(data, 100), false, nullptr);
+  EXPECT_TRUE(stream_->SetReliableSize());
+  EXPECT_CALL(*session_, MaybeSendResetStreamAtFrame(_, _, _, _)).Times(1);
+  stream_->OnStopSending(
+      QuicResetStreamError::FromInternal(QUIC_STREAM_CANCELLED));
+}
+
+TEST_P(QuicStreamTest, RejectReliableSizeOldVersion) {
+  Initialize();
+  if (VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  char data[100];
+  EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _))
+      .WillOnce(Return(QuicConsumedData(100, false)));
+  stream_->WriteOrBufferData(absl::string_view(data, 100), false, nullptr);
+  EXPECT_FALSE(stream_->SetReliableSize());
+}
+
+TEST_P(QuicStreamTest, RejectReliableSizeReadOnlyStream) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  auto uni = new StrictMock<TestStream>(6, session_.get(), READ_UNIDIRECTIONAL);
+  session_->ActivateStream(absl::WrapUnique(uni));
+  EXPECT_FALSE(uni->SetReliableSize());
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic
diff --git a/quiche/quic/test_tools/quic_test_utils.h b/quiche/quic/test_tools/quic_test_utils.h
index 90d9be4..10f9fb1 100644
--- a/quiche/quic/test_tools/quic_test_utils.h
+++ b/quiche/quic/test_tools/quic_test_utils.h
@@ -36,6 +36,7 @@
 #include "quiche/quic/core/quic_path_validator.h"
 #include "quiche/quic/core/quic_sent_packet_manager.h"
 #include "quiche/quic/core/quic_server_id.h"
+#include "quiche/quic/core/quic_session.h"
 #include "quiche/quic/core/quic_time.h"
 #include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/core/quic_utils.h"
@@ -826,6 +827,10 @@
               (QuicStreamId stream_id, QuicResetStreamError error,
                QuicStreamOffset bytes_written),
               (override));
+  MOCK_METHOD(void, MaybeSendResetStreamAtFrame,
+              (QuicStreamId stream_id, QuicResetStreamError error,
+               QuicStreamOffset bytes_written, QuicStreamOffset reliable_size),
+              (override));
   MOCK_METHOD(void, MaybeSendStopSendingFrame,
               (QuicStreamId stream_id, QuicResetStreamError error), (override));
   MOCK_METHOD(void, SendBlocked,
@@ -853,6 +858,8 @@
         id, QuicResetStreamError::FromInternal(error), bytes_written);
   }
 
+  ClosedStreams* ClosedStreams() { return QuicSession::closed_streams(); }
+
  private:
   std::unique_ptr<QuicCryptoStream> crypto_stream_;
 };
diff --git a/quiche/quic/test_tools/simple_quic_framer.cc b/quiche/quic/test_tools/simple_quic_framer.cc
index 13b21f6..8ad6656 100644
--- a/quiche/quic/test_tools/simple_quic_framer.cc
+++ b/quiche/quic/test_tools/simple_quic_framer.cc
@@ -358,17 +358,23 @@
 
 SimpleQuicFramer::SimpleQuicFramer()
     : framer_(AllSupportedVersions(), QuicTime::Zero(), Perspective::IS_SERVER,
-              kQuicDefaultConnectionIdLength) {}
+              kQuicDefaultConnectionIdLength) {
+  framer_.set_process_reset_stream_at(true);
+}
 
 SimpleQuicFramer::SimpleQuicFramer(
     const ParsedQuicVersionVector& supported_versions)
     : framer_(supported_versions, QuicTime::Zero(), Perspective::IS_SERVER,
-              kQuicDefaultConnectionIdLength) {}
+              kQuicDefaultConnectionIdLength) {
+  framer_.set_process_reset_stream_at(true);
+}
 
 SimpleQuicFramer::SimpleQuicFramer(
     const ParsedQuicVersionVector& supported_versions, Perspective perspective)
     : framer_(supported_versions, QuicTime::Zero(), perspective,
-              kQuicDefaultConnectionIdLength) {}
+              kQuicDefaultConnectionIdLength) {
+  framer_.set_process_reset_stream_at(true);
+}
 
 SimpleQuicFramer::~SimpleQuicFramer() {}