Complete processing of RESET_STREAM_AT frames. Negotiate with transport parameters and pass the frame from QuicConnection to QuicStream.

This could serve as an MVP: an endpoint that negotiates the extension and processes received frames, but never sends them, is still compliant. A follow-up CL will add sending.

PiperOrigin-RevId: 679284591
diff --git a/quiche/quic/core/crypto/transport_parameters.cc b/quiche/quic/core/crypto/transport_parameters.cc
index 1aaa820..0456134 100644
--- a/quiche/quic/core/crypto/transport_parameters.cc
+++ b/quiche/quic/core/crypto/transport_parameters.cc
@@ -71,6 +71,8 @@
 
   kMinAckDelay = 0xDE1A,           // draft-iyengar-quic-delayed-ack.
   kVersionInformation = 0xFF73DB,  // draft-ietf-quic-version-negotiation.
+  kReliableStreamReset = 0x17F7586D2CB571,
+  // draft-ietf-quic-reliable-stream-reset.
 };
 
 namespace {
@@ -140,6 +142,8 @@
       return "min_ack_delay_us";
     case TransportParameters::kVersionInformation:
       return "version_information";
+    case TransportParameters::kReliableStreamReset:
+      return "reliable_stream_reset";
   }
   return absl::StrCat("Unknown(", param_id, ")");
 }
@@ -171,6 +175,7 @@
     case TransportParameters::kGoogleQuicVersion:
     case TransportParameters::kMinAckDelay:
     case TransportParameters::kVersionInformation:
+    case TransportParameters::kReliableStreamReset:
       return true;
   }
   return false;
@@ -417,6 +422,9 @@
   if (disable_active_migration) {
     rv += " " + TransportParameterIdToString(kDisableActiveMigration);
   }
+  if (reliable_stream_reset) {
+    rv += " " + TransportParameterIdToString(kReliableStreamReset);
+  }
   if (preferred_address) {
     rv += " " + TransportParameterIdToString(kPreferredAddress) + " " +
           preferred_address->ToString();
@@ -489,6 +497,7 @@
                                  kMinActiveConnectionIdLimitTransportParam,
                                  quiche::kVarInt62MaxValue),
       max_datagram_frame_size(kMaxDatagramFrameSize),
+      reliable_stream_reset(false),
       initial_round_trip_time_us(kInitialRoundTripTime)
 // Important note: any new transport parameters must be added
 // to TransportParameters::AreValid, SerializeTransportParameters and
@@ -521,6 +530,7 @@
       initial_source_connection_id(other.initial_source_connection_id),
       retry_source_connection_id(other.retry_source_connection_id),
       max_datagram_frame_size(other.max_datagram_frame_size),
+      reliable_stream_reset(other.reliable_stream_reset),
       initial_round_trip_time_us(other.initial_round_trip_time_us),
       google_handshake_message(other.google_handshake_message),
       google_connection_options(other.google_connection_options),
@@ -561,6 +571,7 @@
         retry_source_connection_id == rhs.retry_source_connection_id &&
         max_datagram_frame_size.value() ==
             rhs.max_datagram_frame_size.value() &&
+        reliable_stream_reset == rhs.reliable_stream_reset &&
         initial_round_trip_time_us.value() ==
             rhs.initial_round_trip_time_us.value() &&
         google_handshake_message == rhs.google_handshake_message &&
@@ -749,6 +760,7 @@
       kConnectionIdParameterLength +      // initial_source_connection_id
       kConnectionIdParameterLength +      // retry_source_connection_id
       kIntegerParameterLength +           // max_datagram_frame_size
+      kTypeAndValueLength +               // reliable_stream_reset
       kIntegerParameterLength +           // initial_round_trip_time_us
       kTypeAndValueLength +               // google_handshake_message
       kTypeAndValueLength +               // google_connection_options
@@ -770,6 +782,7 @@
       TransportParameters::kMinAckDelay,
       TransportParameters::kActiveConnectionIdLimit,
       TransportParameters::kMaxDatagramFrameSize,
+      TransportParameters::kReliableStreamReset,
       TransportParameters::kGoogleHandshakeMessage,
       TransportParameters::kInitialRoundTripTime,
       TransportParameters::kDisableActiveMigration,
@@ -1021,6 +1034,18 @@
           }
         }
       } break;
+      // reliable_stream_reset
+      case TransportParameters::kReliableStreamReset: {
+        if (in.reliable_stream_reset) {
+          if (!writer.WriteVarInt62(
+                  TransportParameters::kReliableStreamReset) ||
+              !writer.WriteVarInt62(/* transport parameter length */ 0)) {
+            QUIC_BUG(Failed to write reliable_stream_reset)
+                << "Failed to write reliable_stream_reset for " << in;
+            return false;
+          }
+        }
+      } break;
       // preferred_address
       case TransportParameters::kPreferredAddress: {
         if (in.preferred_address) {
@@ -1438,6 +1463,13 @@
         parse_success =
             out->initial_round_trip_time_us.Read(&value_reader, error_details);
         break;
+      case TransportParameters::kReliableStreamReset:
+        if (out->reliable_stream_reset) {
+          *error_details = "Received a second reliable_stream_reset";
+          return false;
+        }
+        out->reliable_stream_reset = true;
+        break;
       case TransportParameters::kGoogleConnectionOptions: {
         if (out->google_connection_options.has_value()) {
           *error_details = "Received a second google_connection_options";
@@ -1612,8 +1644,11 @@
     return false;
   }
   uint8_t disable_active_migration = in.disable_active_migration ? 1 : 0;
+  uint8_t reliable_stream_reset = in.reliable_stream_reset ? 1 : 0;
   if (!EVP_DigestUpdate(hash_ctx.get(), &disable_active_migration,
                         sizeof(disable_active_migration)) ||
+      (reliable_stream_reset &&
+       !EVP_DigestUpdate(hash_ctx.get(), "ResetStreamAt", 13)) ||
       !EVP_DigestFinal(hash_ctx.get(), out->data() + 1, nullptr)) {
     QUIC_BUG(quic_bug_10743_29)
         << "Unexpected failure of EVP_Digest functions when hashing "
diff --git a/quiche/quic/core/crypto/transport_parameters.h b/quiche/quic/core/crypto/transport_parameters.h
index 496de82..9255793 100644
--- a/quiche/quic/core/crypto/transport_parameters.h
+++ b/quiche/quic/core/crypto/transport_parameters.h
@@ -248,6 +248,9 @@
   // the sender accepts. See draft-ietf-quic-datagram.
   IntegerParameter max_datagram_frame_size;
 
+  // Indicates support for the RESET_STREAM_AT frame.
+  bool reliable_stream_reset;
+
   // Google-specific transport parameter that carries an estimate of the
   // initial round-trip time in microseconds.
   IntegerParameter initial_round_trip_time_us;
diff --git a/quiche/quic/core/crypto/transport_parameters_test.cc b/quiche/quic/core/crypto/transport_parameters_test.cc
index b6d4b37..4fb1fed 100644
--- a/quiche/quic/core/crypto/transport_parameters_test.cc
+++ b/quiche/quic/core/crypto/transport_parameters_test.cc
@@ -38,6 +38,7 @@
 const uint64_t kFakeInitialMaxStreamsBidi = 21;
 const uint64_t kFakeInitialMaxStreamsUni = 22;
 const bool kFakeDisableMigration = true;
+const bool kFakeReliableStreamReset = true;
 const uint64_t kFakeInitialRoundTripTime = 53;
 const uint8_t kFakePreferredStatelessResetTokenData[16] = {
     0x80, 0x81, 0x82, 0x83, 0x84, 0x85, 0x86, 0x87,
@@ -191,6 +192,8 @@
   new_params.version_information = CreateFakeVersionInformation();
   orig_params.disable_active_migration = true;
   new_params.disable_active_migration = true;
+  orig_params.reliable_stream_reset = true;
+  new_params.reliable_stream_reset = true;
   EXPECT_EQ(orig_params, new_params);
   EXPECT_TRUE(orig_params == new_params);
   EXPECT_FALSE(orig_params != new_params);
@@ -285,6 +288,7 @@
   orig_params.max_ack_delay.set_value(kMaxAckDelayForTest);
   orig_params.min_ack_delay_us.set_value(kMinAckDelayUsForTest);
   orig_params.disable_active_migration = kFakeDisableMigration;
+  orig_params.reliable_stream_reset = kFakeReliableStreamReset;
   orig_params.preferred_address = CreateFakePreferredAddress();
   orig_params.active_connection_id_limit.set_value(
       kActiveConnectionIdLimitForTest);
@@ -325,6 +329,7 @@
   orig_params.max_ack_delay.set_value(kMaxAckDelayForTest);
   orig_params.min_ack_delay_us.set_value(kMinAckDelayUsForTest);
   orig_params.disable_active_migration = kFakeDisableMigration;
+  orig_params.reliable_stream_reset = kFakeReliableStreamReset;
   orig_params.active_connection_id_limit.set_value(
       kActiveConnectionIdLimitForTest);
   orig_params.initial_source_connection_id =
@@ -376,6 +381,7 @@
   orig_params.max_ack_delay.set_value(kMaxAckDelayForTest);
   orig_params.min_ack_delay_us.set_value(kMinAckDelayUsForTest);
   orig_params.disable_active_migration = kFakeDisableMigration;
+  orig_params.reliable_stream_reset = kFakeReliableStreamReset;
   orig_params.preferred_address = CreateFakePreferredAddress();
   orig_params.active_connection_id_limit.set_value(
       kActiveConnectionIdLimitForTest);
@@ -556,6 +562,9 @@
       // disable_active_migration
       0x0c,  // parameter id
       0x00,  // length
+      // reliable_stream_reset
+      0xc0, 0x17, 0xf7, 0x58, 0x6d, 0x2c, 0xb5, 0x71,  // parameter id
+      0x00,  // length
       // active_connection_id_limit
       0x0e,  // parameter id
       0x01,  // length
@@ -631,6 +640,7 @@
   EXPECT_EQ(kMaxAckDelayForTest, new_params.max_ack_delay.value());
   EXPECT_EQ(kMinAckDelayUsForTest, new_params.min_ack_delay_us.value());
   EXPECT_EQ(kFakeDisableMigration, new_params.disable_active_migration);
+  EXPECT_EQ(kFakeReliableStreamReset, new_params.reliable_stream_reset);
   EXPECT_EQ(kActiveConnectionIdLimitForTest,
             new_params.active_connection_id_limit.value());
   ASSERT_TRUE(new_params.initial_source_connection_id.has_value());
@@ -803,6 +813,9 @@
       // disable_active_migration
       0x0c,  // parameter id
       0x00,  // length
+      // reliable_stream_reset
+      0xc0, 0x17, 0xf7, 0x58, 0x6d, 0x2c, 0xb5, 0x71,  // parameter id
+      0x00,  // length
       // preferred_address
       0x0d,  // parameter id
       0x31,  // length
@@ -897,6 +910,7 @@
   EXPECT_EQ(kMaxAckDelayForTest, new_params.max_ack_delay.value());
   EXPECT_EQ(kMinAckDelayUsForTest, new_params.min_ack_delay_us.value());
   EXPECT_EQ(kFakeDisableMigration, new_params.disable_active_migration);
+  EXPECT_EQ(kFakeReliableStreamReset, new_params.reliable_stream_reset);
   ASSERT_NE(nullptr, new_params.preferred_address.get());
   EXPECT_EQ(CreateFakeV4SocketAddress(),
             new_params.preferred_address->ipv4_socket_address);
@@ -1028,6 +1042,7 @@
   orig_params.max_ack_delay.set_value(kMaxAckDelayForTest);
   orig_params.min_ack_delay_us.set_value(kMinAckDelayUsForTest);
   orig_params.disable_active_migration = kFakeDisableMigration;
+  orig_params.reliable_stream_reset = kFakeReliableStreamReset;
   orig_params.active_connection_id_limit.set_value(
       kActiveConnectionIdLimitForTest);
   orig_params.initial_source_connection_id =
@@ -1072,6 +1087,7 @@
   orig_params.max_ack_delay.set_value(kMaxAckDelayForTest);
   orig_params.min_ack_delay_us.set_value(kMinAckDelayUsForTest);
   orig_params.disable_active_migration = kFakeDisableMigration;
+  orig_params.reliable_stream_reset = kFakeReliableStreamReset;
   orig_params.active_connection_id_limit.set_value(
       kActiveConnectionIdLimitForTest);
   orig_params.initial_source_connection_id =
@@ -1130,6 +1146,7 @@
     original_params_.max_ack_delay.set_value(kMaxAckDelayForTest);
     original_params_.min_ack_delay_us.set_value(kMinAckDelayUsForTest);
     original_params_.disable_active_migration = kFakeDisableMigration;
+    original_params_.reliable_stream_reset = kFakeReliableStreamReset;
     original_params_.preferred_address = CreateFakePreferredAddress();
     original_params_.active_connection_id_limit.set_value(
         kActiveConnectionIdLimitForTest);
diff --git a/quiche/quic/core/quic_config.cc b/quiche/quic/core/quic_config.cc
index 480afcb..47027c5 100644
--- a/quiche/quic/core/quic_config.cc
+++ b/quiche/quic/core/quic_config.cc
@@ -1034,6 +1034,7 @@
   SetAckDelayExponentToSend(kDefaultAckDelayExponent);
   SetMaxPacketSizeToSend(kMaxIncomingPacketSize);
   SetMaxDatagramFrameSizeToSend(kMaxAcceptedDatagramFrameSize);
+  SetReliableStreamReset(false);
 }
 
 void QuicConfig::ToHandshakeMessage(
@@ -1278,6 +1279,8 @@
     params->google_handshake_message = google_handshake_message_to_send_;
   }
 
+  params->reliable_stream_reset = reliable_stream_reset_;
+
   params->custom_parameters = custom_transport_parameters_to_send_;
 
   return true;
@@ -1413,6 +1416,10 @@
 
   received_custom_transport_parameters_ = params.custom_parameters;
 
+  if (reliable_stream_reset_) {
+    reliable_stream_reset_ = params.reliable_stream_reset;
+  }
+
   if (!is_resumption) {
     negotiated_ = true;
   }
@@ -1481,4 +1488,12 @@
          GetQuicFlag(quic_always_support_server_preferred_address);
 }
 
+void QuicConfig::SetReliableStreamReset(bool reliable_stream_reset) {
+  reliable_stream_reset_ = reliable_stream_reset;
+}
+
+bool QuicConfig::SupportsReliableStreamReset() const {
+  return reliable_stream_reset_;
+}
+
 }  // namespace quic
diff --git a/quiche/quic/core/quic_config.h b/quiche/quic/core/quic_config.h
index 34c143c..dd72974 100644
--- a/quiche/quic/core/quic_config.h
+++ b/quiche/quic/core/quic_config.h
@@ -441,6 +441,10 @@
   // quic_always_support_server_preferred_address.
   bool SupportsServerPreferredAddress(Perspective perspective) const;
 
+  // Setter and getter for reliable stream reset (RESET_STREAM_AT) support.
+  void SetReliableStreamReset(bool reliable_stream_reset);
+  bool SupportsReliableStreamReset() const;
+
   // Original destination connection ID.
   void SetOriginalConnectionIdToSend(
       const QuicConnectionId& original_destination_connection_id);
@@ -706,6 +710,9 @@
   // Google internal handshake message.
   std::optional<std::string> google_handshake_message_to_send_;
   std::optional<std::string> received_google_handshake_message_;
+
+  // Support for RESET_STREAM_AT frame.
+  bool reliable_stream_reset_;
 };
 
 }  // namespace quic
diff --git a/quiche/quic/core/quic_config_test.cc b/quiche/quic/core/quic_config_test.cc
index 4cf30e9..c706285 100644
--- a/quiche/quic/core/quic_config_test.cc
+++ b/quiche/quic/core/quic_config_test.cc
@@ -476,6 +476,7 @@
   config_.SetRetrySourceConnectionIdToSend(TestConnectionId(0x3333));
   config_.SetMinAckDelayMs(kDefaultMinAckDelayTimeMs);
   config_.SetGoogleHandshakeMessageToSend(kFakeGoogleHandshakeMessage);
+  config_.SetReliableStreamReset(true);
 
   QuicIpAddress host;
   host.FromString("127.0.3.1");
@@ -536,6 +537,8 @@
                 &params.preferred_address->stateless_reset_token.front()),
             new_stateless_reset_token);
   EXPECT_EQ(kFakeGoogleHandshakeMessage, params.google_handshake_message);
+
+  EXPECT_TRUE(params.reliable_stream_reset);
 }
 
 TEST_P(QuicConfigTest, DNATPreferredAddress) {
diff --git a/quiche/quic/core/quic_connection.cc b/quiche/quic/core/quic_connection.cc
index 1251c49..8ab8459 100644
--- a/quiche/quic/core/quic_connection.cc
+++ b/quiche/quic/core/quic_connection.cc
@@ -32,6 +32,7 @@
 #include "quiche/quic/core/crypto/crypto_utils.h"
 #include "quiche/quic/core/crypto/quic_decrypter.h"
 #include "quiche/quic/core/crypto/quic_encrypter.h"
+#include "quiche/quic/core/frames/quic_reset_stream_at_frame.h"
 #include "quiche/quic/core/quic_bandwidth.h"
 #include "quiche/quic/core/quic_config.h"
 #include "quiche/quic/core/quic_connection_id.h"
@@ -578,6 +579,9 @@
       multi_port_migration_enabled_ = true;
     }
   }
+
+  reliable_stream_reset_ = config.SupportsReliableStreamReset();
+  framer_.set_process_reset_stream_at(reliable_stream_reset_);
 }
 
 void QuicConnection::AddDispatcherSentPackets(
@@ -2066,10 +2070,15 @@
   if (!UpdatePacketContent(RESET_STREAM_AT_FRAME)) {
     return false;
   }
-
-  // TODO(b/278878322): implement.
+  if (!reliable_stream_reset_) {
+    CloseConnection(IETF_QUIC_PROTOCOL_VIOLATION,
+                    "Received RESET_STREAM_AT while not negotiated.",
+                    ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+    return false;
+  }
 
   MaybeUpdateAckTimeout();
+  visitor_->OnResetStreamAt(frame);
   return true;
 }
 
diff --git a/quiche/quic/core/quic_connection.h b/quiche/quic/core/quic_connection.h
index b31e7d0..c76527b 100644
--- a/quiche/quic/core/quic_connection.h
+++ b/quiche/quic/core/quic_connection.h
@@ -35,6 +35,7 @@
 #include "quiche/quic/core/frames/quic_max_streams_frame.h"
 #include "quiche/quic/core/frames/quic_new_connection_id_frame.h"
 #include "quiche/quic/core/frames/quic_reset_stream_at_frame.h"
+#include "quiche/quic/core/frames/quic_rst_stream_frame.h"
 #include "quiche/quic/core/quic_alarm.h"
 #include "quiche/quic/core/quic_alarm_factory.h"
 #include "quiche/quic/core/quic_blocked_writer_interface.h"
@@ -108,6 +109,7 @@
 
   // Called when the stream is reset by the peer.
   virtual void OnRstStream(const QuicRstStreamFrame& frame) = 0;
+  virtual void OnResetStreamAt(const QuicResetStreamAtFrame& frame) = 0;
 
   // Called when the connection is going away according to the peer.
   virtual void OnGoAway(const QuicGoAwayFrame& frame) = 0;
@@ -2517,6 +2519,9 @@
   // might be different from the next codepoint in per_packet_options_.
   QuicEcnCodepoint last_ecn_codepoint_sent_ = ECN_NOT_ECT;
 
+  // If true, RESET_STREAM_AT frames are accepted rather than rejected.
+  bool reliable_stream_reset_ = false;
+
   const bool quic_limit_new_streams_per_loop_2_ =
       GetQuicReloadableFlag(quic_limit_new_streams_per_loop_2);
 
diff --git a/quiche/quic/core/quic_connection_test.cc b/quiche/quic/core/quic_connection_test.cc
index b4f67c5..a79d5ac 100644
--- a/quiche/quic/core/quic_connection_test.cc
+++ b/quiche/quic/core/quic_connection_test.cc
@@ -26,6 +26,7 @@
 #include "quiche/quic/core/frames/quic_connection_close_frame.h"
 #include "quiche/quic/core/frames/quic_new_connection_id_frame.h"
 #include "quiche/quic/core/frames/quic_path_response_frame.h"
+#include "quiche/quic/core/frames/quic_reset_stream_at_frame.h"
 #include "quiche/quic/core/frames/quic_rst_stream_frame.h"
 #include "quiche/quic/core/quic_connection_id.h"
 #include "quiche/quic/core/quic_constants.h"
@@ -15460,6 +15461,11 @@
   if (!version().HasIetfQuicFrames()) {
     return;
   }
+  EXPECT_CALL(*send_algorithm_, SetFromConfig(_, _));
+  QuicConfig config;
+  config.SetReliableStreamReset(true);
+  connection_.SetFromConfig(config);
+
   EXPECT_CALL(connection_id_generator_,
               GenerateNextConnectionId(TestConnectionId(12)))
       .WillOnce(Return(TestConnectionId(456)));
@@ -15468,6 +15474,7 @@
       .WillOnce(Return(TestConnectionId(789)));
   EXPECT_CALL(visitor_, SendNewConnectionId(_)).Times(2);
   EXPECT_CALL(visitor_, OnRstStream(_));
+  EXPECT_CALL(visitor_, OnResetStreamAt(_));
   EXPECT_CALL(visitor_, OnWindowUpdateFrame(_));
   EXPECT_CALL(visitor_, OnBlockedFrame(_));
   EXPECT_CALL(visitor_, OnHandshakeDoneReceived());
@@ -17557,6 +17564,36 @@
   EXPECT_EQ(connection_.ecn_codepoint(), ECN_NOT_ECT);
 }
 
+TEST_P(QuicConnectionTest, RejectResetStreamAtIfNotNegotiated) {
+  if (!version().HasIetfQuicFrames()) {
+    return;
+  }
+  EXPECT_CALL(*send_algorithm_, SetFromConfig(_, _));
+  QuicConfig config;
+  config.SetReliableStreamReset(false);
+  connection_.SetFromConfig(config);
+  connection_.SetDefaultEncryptionLevel(ENCRYPTION_FORWARD_SECURE);
+
+  EXPECT_CALL(visitor_, OnConnectionClosed(_, _)).Times(1);
+  connection_.OnResetStreamAtFrame(QuicResetStreamAtFrame());
+}
+
+TEST_P(QuicConnectionTest, ResetStreamAt) {
+  if (!version().HasIetfQuicFrames()) {
+    return;
+  }
+  EXPECT_CALL(*send_algorithm_, SetFromConfig(_, _));
+  QuicConfig config;
+  config.SetReliableStreamReset(true);
+  connection_.SetFromConfig(config);
+  connection_.SetDefaultEncryptionLevel(ENCRYPTION_FORWARD_SECURE);
+
+  EXPECT_CALL(visitor_, OnResetStreamAt(QuicResetStreamAtFrame(
+                            0, 0, QUIC_STREAM_NO_ERROR, 20, 10)))
+      .Times(1);
+  connection_.OnResetStreamAtFrame(QuicResetStreamAtFrame(0, 0, 0, 20, 10));
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic
diff --git a/quiche/quic/core/quic_session.cc b/quiche/quic/core/quic_session.cc
index bba06b9..66d8935 100644
--- a/quiche/quic/core/quic_session.cc
+++ b/quiche/quic/core/quic_session.cc
@@ -19,11 +19,13 @@
 #include "absl/strings/str_cat.h"
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/frames/quic_ack_frequency_frame.h"
+#include "quiche/quic/core/frames/quic_reset_stream_at_frame.h"
 #include "quiche/quic/core/frames/quic_window_update_frame.h"
 #include "quiche/quic/core/quic_connection.h"
 #include "quiche/quic/core/quic_connection_context.h"
 #include "quiche/quic/core/quic_error_codes.h"
 #include "quiche/quic/core/quic_flow_controller.h"
+#include "quiche/quic/core/quic_stream.h"
 #include "quiche/quic/core/quic_stream_priority.h"
 #include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/core/quic_utils.h"
@@ -439,6 +441,21 @@
   ClosePendingStream(stream_id);
 }
 
+void QuicSession::PendingStreamOnResetStreamAt(
+    const QuicResetStreamAtFrame& frame) {
+  QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
+  QuicStreamId stream_id = frame.stream_id;
+
+  PendingStream* pending = GetOrCreatePendingStream(stream_id);
+
+  if (!pending) {
+    HandleRstOnValidNonexistentStream(frame.ToRstStream());
+    return;
+  }
+
+  pending->OnResetStreamAtFrame(frame);
+}
+
 void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
   QuicStreamId stream_id = frame.stream_id;
   if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
@@ -476,6 +493,40 @@
   stream->OnStreamReset(frame);
 }
 
+void QuicSession::OnResetStreamAt(const QuicResetStreamAtFrame& frame) {
+  QUICHE_DCHECK(VersionHasIetfQuicFrames(transport_version()));
+  QuicStreamId stream_id = frame.stream_id;
+  if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
+    connection()->CloseConnection(
+        QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
+        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+    return;
+  }
+
+  if (VersionHasIetfQuicFrames(transport_version()) &&
+      QuicUtils::GetStreamType(stream_id, perspective(),
+                               IsIncomingStream(stream_id),
+                               version()) == WRITE_UNIDIRECTIONAL) {
+    connection()->CloseConnection(
+        QUIC_INVALID_STREAM_ID, "Received RESET_STREAM for a write-only stream",
+        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+    return;
+  }
+
+  if (ShouldProcessFrameByPendingStream(RESET_STREAM_AT_FRAME, stream_id)) {
+    PendingStreamOnResetStreamAt(frame);
+    return;
+  }
+
+  QuicStream* stream = GetOrCreateStream(stream_id);
+
+  if (!stream) {
+    HandleRstOnValidNonexistentStream(frame.ToRstStream());
+    return;  // Errors are handled by GetOrCreateStream.
+  }
+  stream->OnResetStreamAtFrame(frame);
+}
+
 void QuicSession::OnGoAway(const QuicGoAwayFrame& /*frame*/) {
   QUIC_BUG_IF(quic_bug_12435_1, version().UsesHttp3())
       << "gQUIC GOAWAY received on version " << version();
diff --git a/quiche/quic/core/quic_session.h b/quiche/quic/core/quic_session.h
index 1b94b70..8be519a 100644
--- a/quiche/quic/core/quic_session.h
+++ b/quiche/quic/core/quic_session.h
@@ -20,6 +20,7 @@
 #include "absl/types/span.h"
 #include "quiche/quic/core/crypto/tls_connection.h"
 #include "quiche/quic/core/frames/quic_ack_frequency_frame.h"
+#include "quiche/quic/core/frames/quic_reset_stream_at_frame.h"
 #include "quiche/quic/core/frames/quic_stop_sending_frame.h"
 #include "quiche/quic/core/frames/quic_window_update_frame.h"
 #include "quiche/quic/core/handshaker_delegate_interface.h"
@@ -136,6 +137,7 @@
   void OnStreamFrame(const QuicStreamFrame& frame) override;
   void OnCryptoFrame(const QuicCryptoFrame& frame) override;
   void OnRstStream(const QuicRstStreamFrame& frame) override;
+  void OnResetStreamAt(const QuicResetStreamAtFrame& frame) override;
   void OnGoAway(const QuicGoAwayFrame& frame) override;
   void OnMessageReceived(absl::string_view message) override;
   void OnHandshakeDoneReceived() override;
@@ -957,6 +959,7 @@
   // Creates or gets pending strea, feed it with |frame|, and closes the pending
   // stream.
   void PendingStreamOnRstStream(const QuicRstStreamFrame& frame);
+  void PendingStreamOnResetStreamAt(const QuicResetStreamAtFrame& frame);
 
   // Creates or gets pending stream, feeds it with |frame|, and records the
   // max_data in the pending stream.
diff --git a/quiche/quic/core/quic_session_test.cc b/quiche/quic/core/quic_session_test.cc
index 03c83e4..7f54074 100644
--- a/quiche/quic/core/quic_session_test.cc
+++ b/quiche/quic/core/quic_session_test.cc
@@ -21,8 +21,11 @@
 #include "quiche/quic/core/crypto/null_encrypter.h"
 #include "quiche/quic/core/crypto/transport_parameters.h"
 #include "quiche/quic/core/frames/quic_max_streams_frame.h"
+#include "quiche/quic/core/frames/quic_reset_stream_at_frame.h"
+#include "quiche/quic/core/quic_constants.h"
 #include "quiche/quic/core/quic_crypto_stream.h"
 #include "quiche/quic/core/quic_data_writer.h"
+#include "quiche/quic/core/quic_error_codes.h"
 #include "quiche/quic/core/quic_packets.h"
 #include "quiche/quic/core/quic_stream.h"
 #include "quiche/quic/core/quic_types.h"
@@ -1609,6 +1612,23 @@
   session_.OnRstStream(rst1);
 }
 
+TEST_P(QuicSessionTestServer, OnResetStreamAtInvalidStreamId) {
+  if (connection_->version().handshake_protocol != PROTOCOL_TLS1_3) {
+    // This test requires IETF QUIC.
+    return;
+  }
+  // Deliver a RESET_STREAM_AT frame that carries the invalid stream ID.
+  QuicResetStreamAtFrame rst1(
+      kInvalidControlFrameId,
+      QuicUtils::GetInvalidStreamId(connection_->transport_version()),
+      QUIC_ERROR_PROCESSING_STREAM, 10, 0);
+  EXPECT_CALL(*connection_,
+              CloseConnection(
+                  QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
+                  ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET));
+  session_.OnResetStreamAt(rst1);
+}
+
 TEST_P(QuicSessionTestServer, HandshakeUnblocksFlowControlBlockedStream) {
   if (connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
     // This test requires Google QUIC crypto because it assumes streams start