Skips sending queued frames for streams that have been reset.

PiperOrigin-RevId: 421406008
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index 4e010b1..f177d5f 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -3980,6 +3980,75 @@
   EXPECT_FALSE(adapter->want_write());
 }
 
+TEST(NgHttp2AdapterTest, SkipsSendingFramesForRejectedStream) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
+
+  const std::string initial_frames =
+      TestFrameSequence()
+          .ClientPreface()
+          .Headers(1,
+                   {{":method", "GET"},
+                    {":scheme", "http"},
+                    {":authority", "example.com"},
+                    {":path", "/this/is/request/one"}},
+                   /*fin=*/true)
+          .Serialize();
+
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 0x5));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, _, _)).Times(4);
+  EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+  EXPECT_CALL(visitor, OnEndStream(1));
+
+  const int64_t initial_result = adapter->ProcessBytes(initial_frames);
+  EXPECT_EQ(static_cast<size_t>(initial_result), initial_frames.size());
+
+  auto body = absl::make_unique<TestDataFrameSource>(visitor, true);
+  body->AppendPayload("Here is some data, which will be completely ignored!");
+
+  int submit_result = adapter->SubmitResponse(
+      1, ToHeaders({{":status", "200"}}), std::move(body));
+  ASSERT_EQ(0, submit_result);
+
+  auto source = absl::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
+      {{"query-cost", "is too darn high"}, {"secret-sauce", "hollandaise"}})));
+  adapter->SubmitMetadata(1, 16384u, std::move(source));
+
+  adapter->SubmitWindowUpdate(1, 1024);
+  adapter->SubmitRst(1, Http2ErrorCode::INTERNAL_ERROR);
+
+  // Server initial SETTINGS and SETTINGS ack.
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+
+  // nghttp2 apparently allows extension frames to be sent on reset streams.
+  EXPECT_CALL(visitor, OnBeforeFrameSent(kMetadataFrameType, 1, _, 0x4));
+  EXPECT_CALL(visitor, OnFrameSent(kMetadataFrameType, 1, _, 0x4, 0));
+
+  // The server sends a RST_STREAM for the offending stream.
+  // The response HEADERS, DATA and WINDOW_UPDATE are all ignored.
+  EXPECT_CALL(visitor, OnBeforeFrameSent(RST_STREAM, 1, _, 0x0));
+  EXPECT_CALL(visitor,
+              OnFrameSent(RST_STREAM, 1, _, 0x0,
+                          static_cast<int>(Http2ErrorCode::INTERNAL_ERROR)));
+  EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::INTERNAL_ERROR));
+
+  int send_result = adapter->Send();
+  EXPECT_EQ(0, send_result);
+  EXPECT_THAT(
+      visitor.data(),
+      EqualsFrames({spdy::SpdyFrameType::SETTINGS,
+                    static_cast<spdy::SpdyFrameType>(kMetadataFrameType),
+                    spdy::SpdyFrameType::RST_STREAM}));
+}
+
 TEST(NgHttp2AdapterTest, ServerStartsShutdown) {
   DataSavingVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index e9b863c..de3a3e2 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -2258,25 +2258,33 @@
       {{HEADER_TABLE_SIZE, 128}, {MAX_FRAME_SIZE, 128 << 10}});
   EXPECT_TRUE(adapter_->want_write());
 
-  adapter_->SubmitPriorityForStream(3, 1, 255, true);
-  adapter_->SubmitRst(3, Http2ErrorCode::CANCEL);
+  const Http2StreamId accepted_stream = 3;
+  const Http2StreamId rejected_stream = 7;
+  adapter_->SubmitPriorityForStream(accepted_stream, 1, 255, true);
+  adapter_->SubmitRst(rejected_stream, Http2ErrorCode::CANCEL);
   adapter_->SubmitPing(42);
   adapter_->SubmitGoAway(13, Http2ErrorCode::HTTP2_NO_ERROR, "");
-  adapter_->SubmitWindowUpdate(3, 127);
+  adapter_->SubmitWindowUpdate(accepted_stream, 127);
   EXPECT_TRUE(adapter_->want_write());
 
   EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
   EXPECT_CALL(http2_visitor_, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
-  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(PRIORITY, 3, _, 0x0));
-  EXPECT_CALL(http2_visitor_, OnFrameSent(PRIORITY, 3, _, 0x0, 0));
-  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(RST_STREAM, 3, _, 0x0));
-  EXPECT_CALL(http2_visitor_, OnFrameSent(RST_STREAM, 3, _, 0x0, 0x8));
+  EXPECT_CALL(http2_visitor_,
+              OnBeforeFrameSent(PRIORITY, accepted_stream, _, 0x0));
+  EXPECT_CALL(http2_visitor_,
+              OnFrameSent(PRIORITY, accepted_stream, _, 0x0, 0));
+  EXPECT_CALL(http2_visitor_,
+              OnBeforeFrameSent(RST_STREAM, rejected_stream, _, 0x0));
+  EXPECT_CALL(http2_visitor_,
+              OnFrameSent(RST_STREAM, rejected_stream, _, 0x0, 0x8));
   EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(PING, 0, _, 0x0));
   EXPECT_CALL(http2_visitor_, OnFrameSent(PING, 0, _, 0x0, 0));
   EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(GOAWAY, 0, _, 0x0));
   EXPECT_CALL(http2_visitor_, OnFrameSent(GOAWAY, 0, _, 0x0, 0));
-  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(WINDOW_UPDATE, 3, _, 0x0));
-  EXPECT_CALL(http2_visitor_, OnFrameSent(WINDOW_UPDATE, 3, _, 0x0, 0));
+  EXPECT_CALL(http2_visitor_,
+              OnBeforeFrameSent(WINDOW_UPDATE, accepted_stream, _, 0x0));
+  EXPECT_CALL(http2_visitor_,
+              OnFrameSent(WINDOW_UPDATE, accepted_stream, _, 0x0, 0));
 
   int result = adapter_->Send();
   EXPECT_EQ(0, result);
@@ -4060,6 +4068,72 @@
   EXPECT_FALSE(adapter->want_write());
 }
 
+TEST(OgHttp2AdapterServerTest, SkipsSendingFramesForRejectedStream) {
+  DataSavingVisitor visitor;
+  OgHttp2Adapter::Options options{.perspective = Perspective::kServer};
+  auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+  const std::string initial_frames =
+      TestFrameSequence()
+          .ClientPreface()
+          .Headers(1,
+                   {{":method", "GET"},
+                    {":scheme", "http"},
+                    {":authority", "example.com"},
+                    {":path", "/this/is/request/one"}},
+                   /*fin=*/true)
+          .Serialize();
+
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 0x5));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, _, _)).Times(4);
+  EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+  EXPECT_CALL(visitor, OnEndStream(1));
+
+  const int64_t initial_result = adapter->ProcessBytes(initial_frames);
+  EXPECT_EQ(static_cast<size_t>(initial_result), initial_frames.size());
+
+  auto body = absl::make_unique<TestDataFrameSource>(visitor, true);
+  body->AppendPayload("Here is some data, which will be completely ignored!");
+
+  int submit_result = adapter->SubmitResponse(
+      1, ToHeaders({{":status", "200"}}), std::move(body));
+  ASSERT_EQ(0, submit_result);
+
+  auto source = absl::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
+      {{"query-cost", "is too darn high"}, {"secret-sauce", "hollandaise"}})));
+  adapter->SubmitMetadata(1, 16384u, std::move(source));
+
+  adapter->SubmitWindowUpdate(1, 1024);
+  adapter->SubmitRst(1, Http2ErrorCode::INTERNAL_ERROR);
+
+  // Server initial SETTINGS and SETTINGS ack.
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+
+  // The server sends a RST_STREAM for the offending stream.
+  // The response HEADERS, DATA and WINDOW_UPDATE are all ignored.
+  EXPECT_CALL(visitor, OnBeforeFrameSent(RST_STREAM, 1, _, 0x0));
+  EXPECT_CALL(visitor,
+              OnFrameSent(RST_STREAM, 1, _, 0x0,
+                          static_cast<int>(Http2ErrorCode::INTERNAL_ERROR)));
+  EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::HTTP2_NO_ERROR));
+
+  int send_result = adapter->Send();
+  EXPECT_EQ(0, send_result);
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS,
+                                            spdy::SpdyFrameType::SETTINGS,
+                                            spdy::SpdyFrameType::RST_STREAM}));
+}
+
 TEST(OgHttpAdapterServerTest, ServerStartsShutdown) {
   DataSavingVisitor visitor;
   OgHttp2Adapter::Options options{.perspective = Perspective::kServer};
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index bc8100f..94b728d 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -520,9 +520,6 @@
     }
     if (frame->frame_type() == spdy::SpdyFrameType::RST_STREAM) {
       streams_reset_.insert(frame->stream_id());
-    } else if (iter != stream_map_.end()) {
-      // Enqueue RST_STREAM NO_ERROR if appropriate.
-      r.emplace([this, iter]() { MaybeFinWithRstStream(iter); });
     }
   }
   if (frame->stream_id() != 0) {
@@ -637,6 +634,17 @@
     // DATA frames should never be queued.
     QUICHE_DCHECK_NE(c.frame_type(), 0);
 
+    const bool stream_reset =
+        c.stream_id() != 0 && streams_reset_.count(c.stream_id()) > 0;
+    if (stream_reset &&
+        c.frame_type() != static_cast<uint8_t>(FrameType::RST_STREAM)) {
+      // The stream has been reset, so any other remaining frames can be
+      // skipped.
+      // TODO(birenroy): inform the visitor of frames that are skipped.
+      DecrementQueuedFrameCount(c.stream_id(), c.frame_type());
+      frames_.pop_front();
+      continue;
+    }
     // Frames can't accurately report their own length; the actual serialized
     // length must be used instead.
     spdy::SpdySerializedFrame frame = framer_.SerializeFrame(*frame_ptr);
@@ -695,14 +703,28 @@
     }
     return true;
   }
-  auto iter = queued_frames_.find(stream_id);
-  if (frame_type != FrameType::DATA) {
-    --iter->second;
+
+  const bool contains_fin =
+      (frame_type == FrameType::DATA || frame_type == FrameType::HEADERS) &&
+      (flags & 0x01) == 0x01;
+  auto it = stream_map_.find(stream_id);
+  const bool still_open_remote =
+      it != stream_map_.end() && !it->second.half_closed_remote;
+  if (contains_fin && still_open_remote &&
+      options_.rst_stream_no_error_when_incomplete &&
+      options_.perspective == Perspective::kServer) {
+    // Since the peer has not yet ended the stream, this endpoint should
+    // send a RST_STREAM NO_ERROR. See RFC 7540 Section 8.1.
+    frames_.push_front(absl::make_unique<spdy::SpdyRstStreamIR>(
+        stream_id, spdy::SpdyErrorCode::ERROR_CODE_NO_ERROR));
+    auto queued_result = queued_frames_.insert({stream_id, 1});
+    if (!queued_result.second) {
+      ++(queued_result.first->second);
+    }
+    it->second.half_closed_remote = true;
   }
-  if (iter->second == 0) {
-    // TODO(birenroy): Consider passing through `error_code` here.
-    CloseStreamIfReady(frame_type_int, stream_id);
-  }
+
+  DecrementQueuedFrameCount(stream_id, frame_type_int);
   return true;
 }
 
@@ -715,6 +737,15 @@
     return SendResult::SEND_OK;
   }
   StreamState& state = it->second;
+  auto reset_it = streams_reset_.find(stream_id);
+  if (reset_it != streams_reset_.end()) {
+    // The stream has been reset; there's no point in sending DATA or trailing
+    // HEADERS.
+    state.outbound_body = nullptr;
+    state.trailers = nullptr;
+    state.outbound_metadata.clear();
+    return SendResult::SEND_OK;
+  }
   SendResult connection_can_write = SendResult::SEND_OK;
   if (!state.outbound_metadata.empty()) {
     connection_can_write = SendMetadata(stream_id, state.outbound_metadata);
@@ -1604,5 +1635,22 @@
   }
 }
 
+void OgHttp2Session::DecrementQueuedFrameCount(uint32_t stream_id,
+                                               uint8_t frame_type) {
+  auto iter = queued_frames_.find(stream_id);
+  if (iter == queued_frames_.end()) {
+    QUICHE_LOG(ERROR) << "Unable to find a queued frame count for stream "
+                      << stream_id;
+    return;
+  }
+  if (static_cast<FrameType>(frame_type) != FrameType::DATA) {
+    --iter->second;
+  }
+  if (iter->second == 0) {
+    // TODO(birenroy): Consider passing through `error_code` here.
+    CloseStreamIfReady(frame_type, stream_id);
+  }
+}
+
 }  // namespace adapter
 }  // namespace http2
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index eaef9bd..17919f7 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -383,6 +383,8 @@
   // Handles the potential end of received metadata for the given `stream_id`.
   void MaybeHandleMetadataEndForStream(Http2StreamId stream_id);
 
+  void DecrementQueuedFrameCount(uint32_t stream_id, uint8_t frame_type);
+
   // Receives events when inbound frames are parsed.
   Http2VisitorInterface& visitor_;