Update QuicStream to handle RESET_STREAM_AT frames.

When a RESET_STREAM_AT frame is received, the stream records its reliable_size and stores a copy of the frame.

If the stream has already consumed at least reliable_size bytes, it processes the copy as if it were a regular RST_STREAM.

Otherwise, QuicStream checks every time data is consumed to see whether the reliable size has been reached, and when it has, processes the reset unless the FIN bit has already been consumed.

This CL covers only the QuicStream changes. Other components will follow in later CLs.

Protected by FLAGS_quic_reloadable_flag_quic_reliable_stream_reset.

PiperOrigin-RevId: 675988866
diff --git a/quiche/quic/core/quic_stream_test.cc b/quiche/quic/core/quic_stream_test.cc
index 6e345cf..d738fd6 100644
--- a/quiche/quic/core/quic_stream_test.cc
+++ b/quiche/quic/core/quic_stream_test.cc
@@ -4,6 +4,7 @@
 
 #include "quiche/quic/core/quic_stream.h"
 
+#include <cstddef>
 #include <memory>
 #include <optional>
 #include <string>
@@ -15,6 +16,7 @@
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/crypto/null_encrypter.h"
 #include "quiche/quic/core/frames/quic_connection_close_frame.h"
+#include "quiche/quic/core/frames/quic_reset_stream_at_frame.h"
 #include "quiche/quic/core/frames/quic_rst_stream_frame.h"
 #include "quiche/quic/core/quic_connection.h"
 #include "quiche/quic/core/quic_constants.h"
@@ -80,6 +82,15 @@
   using QuicStream::WriteMemSlices;
   using QuicStream::WriteOrBufferData;
 
+  void ConsumeData(size_t num_bytes) {  // Reads num_bytes via the sequencer.
+    char buffer[1024];  // Scratch space; the bytes read are not inspected.
+    ASSERT_GT(ABSL_ARRAYSIZE(buffer), num_bytes);  // Must fit in buffer.
+    struct iovec iov;
+    iov.iov_base = buffer;
+    iov.iov_len = num_bytes;
+    ASSERT_EQ(num_bytes, QuicStreamPeer::sequencer(this)->Readv(&iov, 1));
+  }
+
  private:
   std::string data_;
 };
@@ -344,6 +355,34 @@
             session_->flow_controller()->highest_received_byte_offset());
 }
 
+TEST_P(PendingStreamTest, ResetStreamAt) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;  // Only runs for versions with IETF QUIC frames.
+  }
+
+  PendingStream pending(kTestPendingStreamId, session_.get());
+
+  QuicResetStreamAtFrame rst(0, kTestPendingStreamId, QUIC_STREAM_CANCELLED,
+                             100, 3);
+  pending.OnResetStreamAtFrame(rst);  // Delivered before activation.
+  QuicStreamFrame frame(kTestPendingStreamId, false, 2, ".");
+  pending.OnStreamFrame(frame);  // One byte at offset 2; 0-1 still missing.
+
+  auto stream = new TestStream(&pending, session_.get(), false);
+  session_->ActivateStream(absl::WrapUnique(stream));  // Session owns stream.
+
+  EXPECT_FALSE(stream->rst_received());  // Reset not yet applied.
+  EXPECT_FALSE(stream->read_side_closed());
+  EXPECT_CALL(*stream, OnDataAvailable()).WillOnce([&]() {
+    stream->ConsumeData(3);
+  });
+  QuicStreamFrame frame2(kTestPendingStreamId, false, 0, "..");
+  stream->OnStreamFrame(frame2);  // Fills offsets 0-1; callback consumes 3.
+  EXPECT_TRUE(stream->read_side_closed());
+  EXPECT_TRUE(stream->rst_received());
+}
+
 TEST_P(QuicStreamTest, WriteAllData) {
   Initialize();
 
@@ -1751,6 +1790,158 @@
   EXPECT_TRUE(rst_sent());
 }
 
+TEST_P(QuicStreamTest, ResetWhenOffsetReached) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;  // Only runs for versions with IETF QUIC frames.
+  }
+  QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 400, 100);
+  stream_->OnResetStreamAtFrame(rst);  // Nothing happens.
+
+  // Consume 99 bytes (no reset yet); consuming the 100th applies the reset.
+  char data[100];  // Left uninitialized; the contents are never inspected.
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
+    stream_->ConsumeData(99);
+  });
+  stream_->OnStreamFrame(
+      QuicStreamFrame(stream_->id(), false, 0, absl::string_view(data, 99)));
+  EXPECT_FALSE(stream_->rst_received());
+  EXPECT_FALSE(stream_->read_side_closed());
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
+    stream_->ConsumeData(1);
+  });
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 99,
+                                         absl::string_view(data + 99, 1)));
+  EXPECT_TRUE(stream_->rst_received());
+  EXPECT_TRUE(stream_->read_side_closed());
+}
+
+TEST_P(QuicStreamTest, ResetWhenOffsetReachedOutOfOrder) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;  // Only runs for versions with IETF QUIC frames.
+  }
+  QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 400, 100);
+  stream_->OnResetStreamAtFrame(rst);  // Nothing happens.
+
+  // Deliver byte 99 first, then bytes 0-98; all 100 are consumed in one read.
+  char data[100];  // Left uninitialized; the contents are never inspected.
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 99,
+                                         absl::string_view(data + 99, 1)));
+  EXPECT_FALSE(stream_->rst_received());
+  EXPECT_FALSE(stream_->read_side_closed());
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
+    stream_->ConsumeData(100);
+  });
+  stream_->OnStreamFrame(
+      QuicStreamFrame(stream_->id(), false, 0, absl::string_view(data, 99)));
+  EXPECT_TRUE(stream_->rst_received());
+  EXPECT_TRUE(stream_->read_side_closed());
+}
+
+TEST_P(QuicStreamTest, HigherReliableSizeIgnored) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;  // Only runs for versions with IETF QUIC frames.
+  }
+  QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 400, 100);
+  stream_->OnResetStreamAtFrame(rst);  // Nothing happens.
+  QuicResetStreamAtFrame rst2(0, stream_->id(), QUIC_STREAM_CANCELLED, 400,
+                              200);
+  stream_->OnResetStreamAtFrame(rst2);  // Ignored: 200 is higher than 100.
+
+  // The reset is still applied once 100 bytes (the first frame's value) are
+  char data[100];  // consumed. data is left uninitialized; never inspected.
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
+    stream_->ConsumeData(99);
+  });
+  stream_->OnStreamFrame(
+      QuicStreamFrame(stream_->id(), false, 0, absl::string_view(data, 99)));
+  EXPECT_FALSE(stream_->rst_received());
+  EXPECT_FALSE(stream_->read_side_closed());
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
+    stream_->ConsumeData(1);
+  });
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 99,
+                                         absl::string_view(data + 99, 1)));
+  EXPECT_TRUE(stream_->rst_received());
+  EXPECT_TRUE(stream_->read_side_closed());
+}
+
+TEST_P(QuicStreamTest, InstantReset) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;  // Only runs for versions with IETF QUIC frames.
+  }
+  char data[100];  // Left uninitialized; the contents are never inspected.
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
+    stream_->ConsumeData(100);
+  });
+  stream_->OnStreamFrame(
+      QuicStreamFrame(stream_->id(), false, 0, absl::string_view(data, 100)));
+  QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 400, 100);
+  EXPECT_FALSE(stream_->rst_received());
+  EXPECT_FALSE(stream_->read_side_closed());
+  stream_->OnResetStreamAtFrame(rst);  // 100 bytes were already consumed.
+  EXPECT_TRUE(stream_->rst_received());
+  EXPECT_TRUE(stream_->read_side_closed());
+}
+
+TEST_P(QuicStreamTest, ResetIgnoredDueToFin) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;  // Only runs for versions with IETF QUIC frames.
+  }
+  char data[100];  // Left uninitialized; the contents are never inspected.
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
+    stream_->ConsumeData(98);
+  });
+  stream_->OnStreamFrame(
+      QuicStreamFrame(stream_->id(), false, 0, absl::string_view(data, 98)));
+  QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 100, 99);
+  stream_->OnResetStreamAtFrame(rst);  // Nothing happens: 98 < 99 consumed.
+  // There is no call to OnFinRead() because the stream is responsible for
+  // doing that.
+  EXPECT_FALSE(stream_->rst_received());
+  EXPECT_FALSE(stream_->read_side_closed());
+  EXPECT_CALL(*stream_, OnDataAvailable()).WillOnce([this]() {
+    stream_->ConsumeData(2);  // Consumes through the end of the FIN frame.
+    stream_->OnFinRead();
+  });
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, 98,
+                                         absl::string_view(data + 98, 2)));
+  EXPECT_FALSE(stream_->rst_received());  // Reset was never processed.
+  EXPECT_TRUE(stream_->read_side_closed());
+}
+
+TEST_P(QuicStreamTest, ReliableOffsetBeyondFin) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;  // Only runs for versions with IETF QUIC frames.
+  }
+  char data[100];  // Left uninitialized; the contents are never inspected.
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, 98,
+                                         absl::string_view(data + 98, 2)));
+  EXPECT_CALL(*connection_, CloseConnection(QUIC_STREAM_MULTIPLE_OFFSET, _, _))
+      .Times(1);  // The FIN frame ended at offset 100; rst below says 101.
+  QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 101, 101);
+  stream_->OnResetStreamAtFrame(rst);  // Expected to close the connection.
+}
+
+TEST_P(QuicStreamTest, FinBeforeReliableOffset) {
+  Initialize();
+  if (!VersionHasIetfQuicFrames(session_->transport_version())) {
+    return;  // Only runs for versions with IETF QUIC frames.
+  }
+  QuicResetStreamAtFrame rst(0, stream_->id(), QUIC_STREAM_CANCELLED, 101, 101);
+  stream_->OnResetStreamAtFrame(rst);  // Nothing happens.
+  char data[100];  // Left uninitialized; the contents are never inspected.
+  EXPECT_CALL(*connection_, CloseConnection(QUIC_STREAM_MULTIPLE_OFFSET, _, _))
+      .Times(1);  // The FIN frame below ends at offset 100; rst said 101.
+  stream_->OnStreamFrame(
+      QuicStreamFrame(stream_->id(), true, 0, absl::string_view(data, 100)));
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic