Update QuicStream to handle RESET_STREAM_AT frames.

When received, the stream will make a note of the reliable_size and store a copy of the RESET_STREAM_AT.

If the stream has already consumed at least reliable_size bytes, processes the copy as if it were a regular RST_STREAM.

If not already consumed, QuicStream checks every time data is consumed to see if the reliable size has been exceeded, and when it is, processes the reset unless the FIN bit has already been consumed.

This CL is just the QuicStream. Other components will follow in later CLs.

Protected by FLAGS_quic_reloadable_flag_quic_reliable_stream_reset.

PiperOrigin-RevId: 675988866
diff --git a/quiche/quic/core/frames/quic_reset_stream_at_frame.h b/quiche/quic/core/frames/quic_reset_stream_at_frame.h
index 95e7ee9..a7ec5b3 100644
--- a/quiche/quic/core/frames/quic_reset_stream_at_frame.h
+++ b/quiche/quic/core/frames/quic_reset_stream_at_frame.h
@@ -8,7 +8,9 @@
 #include <cstdint>
 #include <ostream>
 
+#include "quiche/quic/core/frames/quic_rst_stream_frame.h"
 #include "quiche/quic/core/quic_constants.h"
+#include "quiche/quic/core/quic_error_codes.h"
 #include "quiche/quic/core/quic_types.h"
 #include "quiche/common/platform/api/quiche_export.h"
 
@@ -42,6 +44,14 @@
   // The RESET_STREAM is active only after the application reads up to
   // `reliable_offset` bytes.
   QuicStreamOffset reliable_offset = 0;
+
+  // Reverts this to a RST_STREAM frame. If the reliable_offset is not zero,
+  // that information is lost.
+  QuicRstStreamFrame ToRstStream() const {
+    return QuicRstStreamFrame(control_frame_id, stream_id,
+                              static_cast<QuicRstStreamErrorCode>(error),
+                              final_offset);
+  }
 };
 
 }  // namespace quic
diff --git a/quiche/quic/core/quic_stream.cc b/quiche/quic/core/quic_stream.cc
index e927ee2..9430a36 100644
--- a/quiche/quic/core/quic_stream.cc
+++ b/quiche/quic/core/quic_stream.cc
@@ -12,6 +12,7 @@
 
 #include "absl/strings/str_cat.h"
 #include "absl/strings/string_view.h"
+#include "quiche/quic/core/frames/quic_reset_stream_at_frame.h"
 #include "quiche/quic/core/quic_error_codes.h"
 #include "quiche/quic/core/quic_flow_controller.h"
 #include "quiche/quic/core/quic_session.h"
@@ -252,6 +253,26 @@
   }
 }
 
+void PendingStream::OnResetStreamAtFrame(const QuicResetStreamAtFrame& frame) {
+  if (frame.reliable_offset > sequencer()->close_offset()) {
+    OnUnrecoverableError(
+        QUIC_STREAM_MULTIPLE_OFFSET,
+        absl::StrCat(
+            "Stream ", id_,
+            " received reliable reset with offset: ", frame.reliable_offset,
+            " greater than the FIN offset: ", sequencer()->close_offset()));
+    return;
+  }
+  if (buffered_reset_stream_at_.has_value() &&
+      (frame.reliable_offset > buffered_reset_stream_at_->reliable_offset)) {
+    // Ignore a reliable reset that raises the reliable size. It might have
+    // arrived out of sequence.
+    return;
+  }
+  buffered_reset_stream_at_ = frame;
+  sequencer_.OnReliableReset(frame.reliable_offset);
+}
+
 void PendingStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
   QUICHE_DCHECK(is_bidirectional_);
   flow_controller_.UpdateSendWindowOffset(frame.max_data);
@@ -302,6 +323,7 @@
           (session->GetClock()->ApproximateNow() - pending->creation_time())) {
   QUICHE_DCHECK(session->version().HasIetfQuicFrames());
   sequencer_.set_stream(this);
+  buffered_reset_stream_at_ = pending->buffered_reset_stream_at();
 }
 
 namespace {
@@ -563,6 +585,29 @@
   CloseReadSide();
 }
 
+void QuicStream::OnResetStreamAtFrame(const QuicResetStreamAtFrame& frame) {
+  if (frame.reliable_offset > sequencer()->close_offset()) {
+    OnUnrecoverableError(
+        QUIC_STREAM_MULTIPLE_OFFSET,
+        absl::StrCat(
+            "Stream ", id_,
+            " received reliable reset with offset: ", frame.reliable_offset,
+            " greater than the FIN offset: ", sequencer()->close_offset()));
+    return;
+  }
+  if (buffered_reset_stream_at_.has_value() &&
+      (frame.reliable_offset > buffered_reset_stream_at_->reliable_offset)) {
+    // Ignore a reliable reset that raises the reliable size. It might have
+    // arrived out of sequence.
+    return;
+  }
+  buffered_reset_stream_at_ = frame;
+  MaybeCloseStreamWithBufferedReset();
+  if (!rst_received_) {
+    sequencer_.OnReliableReset(frame.reliable_offset);
+  }
+}
+
 void QuicStream::OnConnectionClosed(const QuicConnectionCloseFrame& frame,
                                     ConnectionCloseSource /*source*/) {
   if (read_side_closed_ && write_side_closed_) {
@@ -1023,6 +1068,7 @@
   if (stream_contributes_to_connection_flow_control_) {
     connection_flow_controller_->AddBytesConsumed(bytes);
   }
+  MaybeCloseStreamWithBufferedReset();
 }
 
 bool QuicStream::MaybeConfigSendWindowOffset(QuicStreamOffset new_offset,
@@ -1387,6 +1433,14 @@
   return true;
 }
 
+void QuicStream::MaybeCloseStreamWithBufferedReset() {
+  if (buffered_reset_stream_at_.has_value() && !sequencer_.IsClosed() &&
+      NumBytesConsumed() >= buffered_reset_stream_at_->reliable_offset) {
+    OnStreamReset(buffered_reset_stream_at_->ToRstStream());
+    buffered_reset_stream_at_ = std::nullopt;
+  }
+}
+
 void QuicStream::OnDeadlinePassed() { Reset(QUIC_STREAM_TTL_EXPIRED); }
 
 bool QuicStream::IsFlowControlBlocked() const {
diff --git a/quiche/quic/core/quic_stream.h b/quiche/quic/core/quic_stream.h
index 43cacc0..4185686 100644
--- a/quiche/quic/core/quic_stream.h
+++ b/quiche/quic/core/quic_stream.h
@@ -26,6 +26,7 @@
 #include "absl/strings/string_view.h"
 #include "absl/types/span.h"
 #include "quiche/quic/core/frames/quic_connection_close_frame.h"
+#include "quiche/quic/core/frames/quic_reset_stream_at_frame.h"
 #include "quiche/quic/core/frames/quic_rst_stream_frame.h"
 #include "quiche/quic/core/quic_error_codes.h"
 #include "quiche/quic/core/quic_flow_controller.h"
@@ -82,6 +83,8 @@
   // If the final offset violates flow control, the connection will be closed.
   void OnRstStreamFrame(const QuicRstStreamFrame& frame);
 
+  void OnResetStreamAtFrame(const QuicResetStreamAtFrame& frame);
+
   void OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame);
 
   void OnStopSending(QuicResetStreamError stop_sending_error_code);
@@ -104,6 +107,10 @@
 
   QuicTime creation_time() const { return creation_time_; }
 
+  std::optional<QuicResetStreamAtFrame> buffered_reset_stream_at() const {
+    return buffered_reset_stream_at_;
+  }
+
  private:
   friend class QuicStream;
 
@@ -138,6 +145,9 @@
   std::optional<QuicResetStreamError> stop_sending_error_code_;
   // The time when this pending stream is created.
   const QuicTime creation_time_;
+
+  // When RESET_STREAM_AT arrives,buffer it for when reliable_size is consumed.
+  std::optional<QuicResetStreamAtFrame> buffered_reset_stream_at_;
 };
 
 class QUICHE_EXPORT QuicStream : public QuicStreamSequencer::StreamInterface {
@@ -195,6 +205,9 @@
   // Called by the session when the endpoint receives a RST_STREAM from the
   // peer.
   virtual void OnStreamReset(const QuicRstStreamFrame& frame);
+  // Called by the session when the endpoint receives a RESET_STREAM_AT from the
+  // peer.
+  virtual void OnResetStreamAtFrame(const QuicResetStreamAtFrame& frame);
 
   // Called by the session when the endpoint receives or sends a connection
   // close, and should immediately close the stream.
@@ -523,6 +536,10 @@
   // Returns true if deadline_ has passed.
   bool HasDeadlinePassed() const;
 
+  // If we've received a RST_STREAM_AT and have processed all remaining data,
+  // then process buffered_reset_stream_at_.
+  void MaybeCloseStreamWithBufferedReset();
+
   QuicStreamSequencer sequencer_;
   QuicStreamId id_;
   // Pointer to the owning QuicSession object.
@@ -622,6 +639,9 @@
   // before being moved to this QuicStream.
   const QuicTime::Delta pending_duration_;
 
+  // When RESET_STREAM_AT arrives,buffer it for when reliable_size is consumed.
+  std::optional<QuicResetStreamAtFrame> buffered_reset_stream_at_;
+
   Perspective perspective_;
 };
 
diff --git a/quiche/quic/core/quic_stream_sequencer.cc b/quiche/quic/core/quic_stream_sequencer.cc
index 540c8c7..c790110 100644
--- a/quiche/quic/core/quic_stream_sequencer.cc
+++ b/quiche/quic/core/quic_stream_sequencer.cc
@@ -32,6 +32,7 @@
       buffered_frames_(kStreamReceiveWindowLimit),
       highest_offset_(0),
       close_offset_(std::numeric_limits<QuicStreamOffset>::max()),
+      reliable_offset_(0),
       blocked_(false),
       num_frames_received_(0),
       num_duplicate_frames_received_(0),
@@ -73,6 +74,10 @@
   OnFrameData(frame.offset, frame.data_length, frame.data_buffer);
 }
 
+void QuicStreamSequencer::OnReliableReset(QuicStreamOffset reliable_size) {
+  reliable_offset_ = reliable_size;
+}
+
 void QuicStreamSequencer::OnFrameData(QuicStreamOffset byte_offset,
                                       size_t data_len,
                                       const char* data_buffer) {
@@ -151,6 +156,15 @@
     return false;
   }
 
+  if (offset < reliable_offset_) {
+    stream_->OnUnrecoverableError(
+        QUIC_STREAM_MULTIPLE_OFFSET,
+        absl::StrCat(
+            "Stream ", stream_->id(), " received fin with offset: ", offset,
+            ", which reduces current reliable offset: ", reliable_offset_));
+    return false;
+  }
+
   close_offset_ = offset;
 
   MaybeCloseStream();
diff --git a/quiche/quic/core/quic_stream_sequencer.h b/quiche/quic/core/quic_stream_sequencer.h
index 2d5bfcf..3451b43 100644
--- a/quiche/quic/core/quic_stream_sequencer.h
+++ b/quiche/quic/core/quic_stream_sequencer.h
@@ -75,6 +75,10 @@
   // will be buffered.
   void OnCryptoFrame(const QuicCryptoFrame& frame);
 
+  // Notify the sequencer of a RESET_STREAM_AT so it can verify that any FIN is
+  // consistent.
+  void OnReliableReset(QuicStreamOffset reliable_size);
+
   // Once data is buffered, it's up to the stream to read it when the stream
   // can handle more data.  The following three functions make that possible.
 
@@ -197,6 +201,9 @@
   // have been processed, the sequencer will be closed.
   QuicStreamOffset close_offset_;
 
+  // The offset at which the stream can be reset.
+  QuicStreamOffset reliable_offset_;
+
   // If true, the sequencer is blocked from passing data to the stream and will
   // buffer all new incoming data until FlushBufferedFrames is called.
   bool blocked_;
diff --git a/quiche/quic/core/quic_stream_test.cc b/quiche/quic/core/quic_stream_test.cc
index 6e345cf..d738fd6 100644
--- a/quiche/quic/core/quic_stream_test.cc
+++ b/quiche/quic/core/quic_stream_test.cc
@@ -4,6 +4,7 @@
 
 #include "quiche/quic/core/quic_stream.h"
 
+#include <cstddef>
 #include <memory>
 #include <optional>
 #include <string>
@@ -15,6 +16,7 @@
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/crypto/null_encrypter.h"
 #include "quiche/quic/core/frames/quic_connection_close_frame.h"
+#include "quiche/quic/core/frames/quic_reset_stream_at_frame.h"
 #include "quiche/quic/core/frames/quic_rst_stream_frame.h"
 #include "quiche/quic/core/quic_connection.h"
 #include "quiche/quic/core/quic_constants.h"
@@ -80,6 +82,15 @@
   using QuicStream::WriteMemSlices;
   using QuicStream::WriteOrBufferData;
 
+  void ConsumeData(size_t num_bytes) {
+    char buffer[1024];
+    ASSERT_GT(ABSL_ARRAYSIZE(buffer), num_bytes);
+    struct iovec iov;
+    iov.iov_base = buffer;
+    iov.iov_len = num_bytes;
+    ASSERT_EQ(num_bytes, QuicStreamPeer::sequencer(this)->Readv(&iov, 1));
+  }
+
  private:
   std::string data_;
 };
@@ -344,6 +355,34 @@
             session_->flow_controller()->highest_received_byte_offset());
 }
 
+TEST_P(PendingStreamTest, ResetStreamAt) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+
+  PendingStream pending(kTestPendingStreamId, session_.get());
+
+  QuicResetStreamAtFrame rst(0, kTestPendingStreamId, QUIC_STREAM_CANCELLED,
+                             100, 3);
+  pending.OnResetStreamAtFrame(rst);
+  QuicStreamFrame frame(kTestPendingStreamId, false, 2, ".");
+  pending.OnStreamFrame(frame);
+
+  auto stream = new TestStream(&pending, session_.get(), false);
+  session_->ActivateStream(absl::WrapUnique(stream));
+
+  EXPECT_FALSE(stream->rst_received());
+  EXPECT_FALSE(stream->read_side_closed());
+  EXPECT_CALL(*stream, OnDataAvailable()).WillOnce([&]() {
+    stream->ConsumeData(3);
+  });
+  QuicStreamFrame frame2(kTestPendingStreamId, false, 0, "..");
+  stream->OnStreamFrame(frame2);
+  EXPECT_TRUE(stream->read_side_closed());
+  EXPECT_TRUE(stream->rst_received());
+}
+
 TEST_P(QuicStreamTest, WriteAllData) {
   Initialize();
 
@@ -1751,6 +1790,158 @@
   EXPECT_TRUE(rst_sent());
 }
 
+TEST_P(QuicStreamTest, ResetWhenOffsetReached) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 400, 100);
+  stream_->OnResetStreamAtFrame(rst);  // Nothing happens.
+
+  // Send data to reach reliable_offset.
+  char data[100];
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
+    stream_->ConsumeData(99);
+  });
+  stream_->OnStreamFrame(
+      QuicStreamFrame(stream_->id(), false, 0, absl::string_view(data, 99)));
+  EXPECT_FALSE(stream_->rst_received());
+  EXPECT_FALSE(stream_->read_side_closed());
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
+    stream_->ConsumeData(1);
+  });
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 99,
+                                         absl::string_view(data + 99, 1)));
+  EXPECT_TRUE(stream_->rst_received());
+  EXPECT_TRUE(stream_->read_side_closed());
+}
+
+TEST_P(QuicStreamTest, ResetWhenOffsetReachedOutOfOrder) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 400, 100);
+  stream_->OnResetStreamAtFrame(rst);  // Nothing happens.
+
+  // Send data to reach reliable_offset.
+  char data[100];
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 99,
+                                         absl::string_view(data + 99, 1)));
+  EXPECT_FALSE(stream_->rst_received());
+  EXPECT_FALSE(stream_->read_side_closed());
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
+    stream_->ConsumeData(100);
+  });
+  stream_->OnStreamFrame(
+      QuicStreamFrame(stream_->id(), false, 0, absl::string_view(data, 99)));
+  EXPECT_TRUE(stream_->rst_received());
+  EXPECT_TRUE(stream_->read_side_closed());
+}
+
+TEST_P(QuicStreamTest, HigherReliableSizeIgnored) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 400, 100);
+  stream_->OnResetStreamAtFrame(rst);  // Nothing happens.
+  QuicResetStreamAtFrame rst2(0, stream_->id(), QUIC_STREAM_CANCELLED, 400,
+                              200);
+  stream_->OnResetStreamAtFrame(rst2);  // Ignored.
+
+  // Send data to reach reliable_offset.
+  char data[100];
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
+    stream_->ConsumeData(99);
+  });
+  stream_->OnStreamFrame(
+      QuicStreamFrame(stream_->id(), false, 0, absl::string_view(data, 99)));
+  EXPECT_FALSE(stream_->rst_received());
+  EXPECT_FALSE(stream_->read_side_closed());
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
+    stream_->ConsumeData(1);
+  });
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 99,
+                                         absl::string_view(data + 99, 1)));
+  EXPECT_TRUE(stream_->rst_received());
+  EXPECT_TRUE(stream_->read_side_closed());
+}
+
+TEST_P(QuicStreamTest, InstantReset) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  char data[100];
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
+    stream_->ConsumeData(100);
+  });
+  stream_->OnStreamFrame(
+      QuicStreamFrame(stream_->id(), false, 0, absl::string_view(data, 100)));
+  QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 400, 100);
+  EXPECT_FALSE(stream_->rst_received());
+  EXPECT_FALSE(stream_->read_side_closed());
+  stream_->OnResetStreamAtFrame(rst);
+  EXPECT_TRUE(stream_->rst_received());
+  EXPECT_TRUE(stream_->read_side_closed());
+}
+
+TEST_P(QuicStreamTest, ResetIgnoredDueToFin) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  char data[100];
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
+    stream_->ConsumeData(98);
+  });
+  stream_->OnStreamFrame(
+      QuicStreamFrame(stream_->id(), false, 0, absl::string_view(data, 98)));
+  QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 100, 99);
+  stream_->OnResetStreamAtFrame(rst);  // Nothing happens.
+  // There is no call to OnFinRead() because the stream is responsible for
+  // doing that.
+  EXPECT_FALSE(stream_->rst_received());
+  EXPECT_FALSE(stream_->read_side_closed());
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
+    stream_->ConsumeData(2);
+    stream_->OnFinRead();
+  });
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, 98,
+                                         absl::string_view(data + 98, 2)));
+  EXPECT_FALSE(stream_->rst_received());
+  EXPECT_TRUE(stream_->read_side_closed());
+}
+
+TEST_P(QuicStreamTest, ReliableOffsetBeyondFin) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  char data[100];
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, 98,
+                                         absl::string_view(data + 98, 2)));
+  EXPECT_CALL(*connection_, CloseConnection(QUIC_STREAM_MULTIPLE_OFFSET, _, _))
+      .Times(1);
+  QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 101, 101);
+  stream_->OnResetStreamAtFrame(rst);  // Nothing happens.
+}
+
+TEST_P(QuicStreamTest, FinBeforeReliableOffset) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;
+  }
+  QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 101, 101);
+  stream_->OnResetStreamAtFrame(rst);  // Nothing happens.
+  char data[100];
+  EXPECT_CALL(*connection_, CloseConnection(QUIC_STREAM_MULTIPLE_OFFSET, _, _))
+      .Times(1);
+  stream_->OnStreamFrame(
+      QuicStreamFrame(stream_->id(), true, 0, absl::string_view(data, 100)));
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic