Avoid invoking visitor callbacks for a stream once it has been marked for reset.

This CL causes OgHttp2Session to check its marked-for-RST_STREAM streams set
before delivering callbacks to the visitor for receive-level events (under
ProcessBytes()). This change better aligns oghttp2 with nghttp2 behavior and
will be helpful for content-length validation.

Note that nghttp2 still delivers control frames and extension frames to the
visitor. For oghttp2, this CL tentatively unifies control frames, METADATA
frames, and DATA frames under "do not deliver" after the stream has been marked
for reset.

This CL also helps with Envoy integration testing as mentioned by ++birenroy@:
  - Before: http://screen/BSFjdLUB5dtueKU
  - After: http://screen/8pr9g596Ph2ewtK

PiperOrigin-RevId: 421834399
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index f177d5f..8cad0c2 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -2464,6 +2464,101 @@
                                             spdy::SpdyFrameType::RST_STREAM}));
 }
 
+TEST(NgHttp2AdapterTest, ServerErrorWhileHandlingHeadersDropsFrames) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
+
+  const std::string frames = TestFrameSequence()
+                                 .ClientPreface()
+                                 .Headers(1,
+                                          {{":method", "POST"},
+                                           {":scheme", "https"},
+                                           {":authority", "example.com"},
+                                           {":path", "/this/is/request/one"},
+                                           {"accept", "some bogus value!"}},
+                                          /*fin=*/false)
+                                 .WindowUpdate(1, 2000)
+                                 .Data(1, "This is the request body.")
+                                 .Metadata(1, "This is the request metadata.")
+                                 .RstStream(1, Http2ErrorCode::CANCEL)
+                                 .WindowUpdate(0, 2000)
+                                 .Headers(3,
+                                          {{":method", "GET"},
+                                           {":scheme", "https"},
+                                           {":authority", "example.com"},
+                                           {":path", "/this/is/request/two"}},
+                                          /*fin=*/false)
+                                 .Metadata(3, "This is the request metadata.",
+                                           /*multiple_frames=*/true)
+                                 .Serialize();
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 4));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, _, _)).Times(4);
+  EXPECT_CALL(visitor, OnHeaderForStream(1, "accept", "some bogus value!"))
+      .WillOnce(testing::Return(Http2VisitorInterface::HEADER_RST_STREAM));
+  // For the RST_STREAM-marked stream, the control frames and METADATA frame but
+  // not the DATA frame are delivered to the visitor.
+  EXPECT_CALL(visitor, OnFrameHeader(1, 4, WINDOW_UPDATE, 0));
+  EXPECT_CALL(visitor, OnWindowUpdate(1, 2000));
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, kMetadataFrameType, 4));
+  EXPECT_CALL(visitor, OnBeginMetadataForStream(1, _));
+  EXPECT_CALL(visitor, OnMetadataForStream(1, _));
+  EXPECT_CALL(visitor, OnMetadataEndForStream(1));
+  EXPECT_CALL(visitor, OnFrameHeader(1, 4, RST_STREAM, 0));
+  EXPECT_CALL(visitor, OnRstStream(1, Http2ErrorCode::CANCEL));
+  EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::CANCEL));
+  EXPECT_CALL(visitor, OnFrameHeader(0, 4, WINDOW_UPDATE, 0));
+  EXPECT_CALL(visitor, OnWindowUpdate(0, 2000));
+  EXPECT_CALL(visitor, OnFrameHeader(3, _, HEADERS, 4));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(3));
+  EXPECT_CALL(visitor, OnHeaderForStream(3, _, _)).Times(4);
+  EXPECT_CALL(visitor, OnEndHeadersForStream(3));
+  EXPECT_CALL(visitor, OnFrameHeader(3, _, kMetadataFrameType, 0));
+  EXPECT_CALL(visitor, OnBeginMetadataForStream(3, _));
+  EXPECT_CALL(visitor, OnMetadataForStream(3, "This is the re"))
+      .WillOnce(testing::DoAll(testing::InvokeWithoutArgs([&adapter]() {
+                                 adapter->SubmitRst(
+                                     3, Http2ErrorCode::REFUSED_STREAM);
+                               }),
+                               testing::Return(true)));
+  // The rest of the metadata is still delivered to the visitor.
+  EXPECT_CALL(visitor, OnFrameHeader(3, _, kMetadataFrameType, 4));
+  EXPECT_CALL(visitor, OnBeginMetadataForStream(3, _));
+  EXPECT_CALL(visitor, OnMetadataForStream(3, "quest metadata."));
+  EXPECT_CALL(visitor, OnMetadataEndForStream(3));
+
+  const int64_t result = adapter->ProcessBytes(frames);
+  EXPECT_EQ(frames.size(), static_cast<size_t>(result));
+
+  EXPECT_TRUE(adapter->want_write());
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(RST_STREAM, 1, 4, 0x0));
+  EXPECT_CALL(visitor,
+              OnFrameSent(RST_STREAM, 1, 4, 0x0,
+                          static_cast<int>(Http2ErrorCode::INTERNAL_ERROR)));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(RST_STREAM, 3, 4, 0x0));
+  EXPECT_CALL(visitor,
+              OnFrameSent(RST_STREAM, 3, 4, 0x0,
+                          static_cast<int>(Http2ErrorCode::REFUSED_STREAM)));
+  EXPECT_CALL(visitor, OnCloseStream(3, Http2ErrorCode::REFUSED_STREAM));
+
+  int send_result = adapter->Send();
+  // Some bytes should have been serialized.
+  EXPECT_EQ(0, send_result);
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS,
+                                            spdy::SpdyFrameType::RST_STREAM,
+                                            spdy::SpdyFrameType::RST_STREAM}));
+}
+
 TEST(NgHttp2AdapterTest, ServerConnectionErrorWhileHandlingHeaders) {
   DataSavingVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index de3a3e2..f9434dd 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -321,8 +321,6 @@
   EXPECT_CALL(visitor,
               OnInvalidFrame(
                   1, Http2VisitorInterface::InvalidFrameError::kHttpMessaging));
-  // NOTE: nghttp2 does not deliver the OnEndStream event.
-  EXPECT_CALL(visitor, OnEndStream(1));
 
   const int64_t stream_result = adapter->ProcessBytes(stream_frames);
   EXPECT_EQ(stream_frames.size(), static_cast<size_t>(stream_result));
@@ -384,10 +382,6 @@
   EXPECT_CALL(
       visitor,
       OnInvalidFrame(1, Http2VisitorInterface::InvalidFrameError::kHttpHeader));
-  // TODO(b/210728715): Stop delivering events for streams with errors.
-  EXPECT_CALL(visitor, OnFrameHeader(1, _, DATA, 0));
-  EXPECT_CALL(visitor, OnBeginDataForStream(1, 2));
-  EXPECT_CALL(visitor, OnDataForStream(1, "hi"));
 
   const int64_t stream_result = adapter->ProcessBytes(stream_frames);
   EXPECT_EQ(stream_frames.size(), static_cast<size_t>(stream_result));
@@ -891,9 +885,6 @@
             adapter->SubmitRst(1, Http2ErrorCode::REFUSED_STREAM);
           }),
           testing::Return(Http2VisitorInterface::HEADER_RST_STREAM)));
-  EXPECT_CALL(visitor, OnFrameHeader(1, _, DATA, 0));
-  EXPECT_CALL(visitor, OnBeginDataForStream(1, _));
-  EXPECT_CALL(visitor, OnDataForStream(1, "This is the response body."));
 
   const int64_t stream_result = adapter->ProcessBytes(stream_frames);
   EXPECT_EQ(stream_frames.size(), static_cast<size_t>(stream_result));
@@ -3111,11 +3102,7 @@
   EXPECT_CALL(visitor, OnHeaderForStream(1, ":path", "/this/is/request/one"));
   EXPECT_CALL(visitor, OnHeaderForStream(1, "accept", "some bogus value!"))
       .WillOnce(testing::Return(Http2VisitorInterface::HEADER_RST_STREAM));
-  EXPECT_CALL(visitor, OnFrameHeader(1, 4, WINDOW_UPDATE, 0));
-  EXPECT_CALL(visitor, OnWindowUpdate(1, 2000));
-  EXPECT_CALL(visitor, OnFrameHeader(1, _, DATA, 0));
-  EXPECT_CALL(visitor, OnBeginDataForStream(1, _));
-  EXPECT_CALL(visitor, OnDataForStream(1, "This is the request body."));
+  // Stream WINDOW_UPDATE and DATA frames are not delivered to the visitor.
   EXPECT_CALL(visitor, OnFrameHeader(0, 4, WINDOW_UPDATE, 0));
   EXPECT_CALL(visitor, OnWindowUpdate(0, 2000));
 
@@ -3143,6 +3130,94 @@
                                             spdy::SpdyFrameType::RST_STREAM}));
 }
 
+TEST(OgHttp2AdapterServerTest, ServerErrorWhileHandlingHeadersDropsFrames) {
+  DataSavingVisitor visitor;
+  OgHttp2Adapter::Options options{.perspective = Perspective::kServer};
+  auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+  const std::string frames = TestFrameSequence()
+                                 .ClientPreface()
+                                 .Headers(1,
+                                          {{":method", "POST"},
+                                           {":scheme", "https"},
+                                           {":authority", "example.com"},
+                                           {":path", "/this/is/request/one"},
+                                           {"accept", "some bogus value!"}},
+                                          /*fin=*/false)
+                                 .WindowUpdate(1, 2000)
+                                 .Data(1, "This is the request body.")
+                                 .Metadata(1, "This is the request metadata.")
+                                 .RstStream(1, Http2ErrorCode::CANCEL)
+                                 .WindowUpdate(0, 2000)
+                                 .Headers(3,
+                                          {{":method", "GET"},
+                                           {":scheme", "https"},
+                                           {":authority", "example.com"},
+                                           {":path", "/this/is/request/two"}},
+                                          /*fin=*/false)
+                                 .Metadata(3, "This is the request metadata.",
+                                           /*multiple_frames=*/true)
+                                 .Serialize();
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 4));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, _, _)).Times(4);
+  EXPECT_CALL(visitor, OnHeaderForStream(1, "accept", "some bogus value!"))
+      .WillOnce(testing::Return(Http2VisitorInterface::HEADER_RST_STREAM));
+  // Frames for the RST_STREAM-marked stream are not delivered to the visitor.
+  // Note: nghttp2 still delivers control frames and metadata for the stream.
+  EXPECT_CALL(visitor, OnFrameHeader(0, 4, WINDOW_UPDATE, 0));
+  EXPECT_CALL(visitor, OnWindowUpdate(0, 2000));
+  EXPECT_CALL(visitor, OnFrameHeader(3, _, HEADERS, 4));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(3));
+  EXPECT_CALL(visitor, OnHeaderForStream(3, _, _)).Times(4);
+  EXPECT_CALL(visitor, OnEndHeadersForStream(3));
+  EXPECT_CALL(visitor, OnFrameHeader(3, _, kMetadataFrameType, 0));
+  EXPECT_CALL(visitor, OnBeginMetadataForStream(3, _));
+  EXPECT_CALL(visitor, OnMetadataForStream(3, "This is the re"))
+      .WillOnce(testing::DoAll(testing::InvokeWithoutArgs([&adapter]() {
+                                 adapter->SubmitRst(
+                                     3, Http2ErrorCode::REFUSED_STREAM);
+                               }),
+                               testing::Return(true)));
+  // The rest of the metadata is not delivered to the visitor.
+
+  const int64_t result = adapter->ProcessBytes(frames);
+  EXPECT_EQ(frames.size(), static_cast<size_t>(result));
+
+  EXPECT_TRUE(adapter->want_write());
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(RST_STREAM, 1, 4, 0x0));
+  EXPECT_CALL(visitor,
+              OnFrameSent(RST_STREAM, 1, 4, 0x0,
+                          static_cast<int>(Http2ErrorCode::INTERNAL_ERROR)));
+  EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::HTTP2_NO_ERROR));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(RST_STREAM, 3, 4, 0x0));
+  EXPECT_CALL(visitor,
+              OnFrameSent(RST_STREAM, 3, 4, 0x0,
+                          static_cast<int>(Http2ErrorCode::REFUSED_STREAM)));
+  EXPECT_CALL(visitor, OnCloseStream(3, Http2ErrorCode::HTTP2_NO_ERROR));
+
+  int send_result = adapter->Send();
+  // Some bytes should have been serialized.
+  EXPECT_EQ(0, send_result);
+  // SETTINGS ack
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS,
+                                            spdy::SpdyFrameType::SETTINGS,
+                                            spdy::SpdyFrameType::RST_STREAM,
+                                            spdy::SpdyFrameType::RST_STREAM}));
+}
+
 TEST(OgHttp2AdapterServerTest, ServerConnectionErrorWhileHandlingHeaders) {
   DataSavingVisitor visitor;
   OgHttp2Adapter::Options options{.perspective = Perspective::kServer};
@@ -3943,7 +4018,6 @@
   EXPECT_CALL(visitor,
               OnInvalidFrame(
                   1, Http2VisitorInterface::InvalidFrameError::kHttpMessaging));
-  EXPECT_CALL(visitor, OnEndStream(1));
 
   int64_t stream_result = adapter->ProcessBytes(stream1_frames);
   EXPECT_EQ(static_cast<size_t>(stream_result), stream1_frames.size());
@@ -3993,7 +4067,6 @@
   EXPECT_CALL(visitor,
               OnInvalidFrame(
                   3, Http2VisitorInterface::InvalidFrameError::kHttpMessaging));
-  EXPECT_CALL(visitor, OnEndStream(3));
 
   stream_result = adapter->ProcessBytes(stream3_frames);
   EXPECT_EQ(static_cast<size_t>(stream_result), stream3_frames.size());
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 94b728d..6e17f5c 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -519,6 +519,7 @@
       iter->second.half_closed_local = true;
     }
     if (frame->frame_type() == spdy::SpdyFrameType::RST_STREAM) {
+      // TODO(diannahu): Condition on existence in the stream map?
       streams_reset_.insert(frame->stream_id());
     }
   }
@@ -990,6 +991,9 @@
                                     uint8_t flags) {
   highest_received_stream_id_ = std::max(static_cast<Http2StreamId>(stream_id),
                                          highest_received_stream_id_);
+  if (streams_reset_.contains(stream_id)) {
+    return;
+  }
   const bool result = visitor_.OnFrameHeader(stream_id, length, type, flags);
   if (!result) {
     fatal_visitor_callback_failure_ = true;
@@ -999,7 +1003,7 @@
 
 void OgHttp2Session::OnDataFrameHeader(spdy::SpdyStreamId stream_id,
                                        size_t length, bool /*fin*/) {
-  if (!stream_map_.contains(stream_id)) {
+  if (!stream_map_.contains(stream_id) || streams_reset_.contains(stream_id)) {
     // The stream does not exist; it could be an error or a benign close, e.g.,
     // getting data for a stream this connection recently closed.
     if (static_cast<Http2StreamId>(stream_id) > highest_processed_stream_id_) {
@@ -1023,7 +1027,7 @@
   // Count the data against flow control, even if the stream is unknown.
   MarkDataBuffered(stream_id, len);
 
-  if (!stream_map_.contains(stream_id)) {
+  if (!stream_map_.contains(stream_id) || streams_reset_.contains(stream_id)) {
     // If the stream was unknown due to a protocol error, the visitor was
     // informed in OnDataFrameHeader().
     return;
@@ -1041,7 +1045,9 @@
   auto iter = stream_map_.find(stream_id);
   if (iter != stream_map_.end()) {
     iter->second.half_closed_remote = true;
-    visitor_.OnEndStream(stream_id);
+    if (!streams_reset_.contains(stream_id)) {
+      visitor_.OnEndStream(stream_id);
+    }
   }
   auto queued_frames_iter = queued_frames_.find(stream_id);
   const bool no_queued_frames = queued_frames_iter == queued_frames_.end() ||
@@ -1108,6 +1114,9 @@
                         ConnectionError::kWrongFrameSequence);
     return;
   }
+  if (streams_reset_.contains(stream_id)) {
+    return;
+  }
   visitor_.OnRstStream(stream_id, TranslateErrorCode(error_code));
   // TODO(birenroy): Consider whether there are outbound frames queued for the
   // stream.
@@ -1260,6 +1269,9 @@
         return;
       }
     } else {
+      if (streams_reset_.contains(stream_id)) {
+        return;
+      }
       if (it->second.send_window == 0) {
         // The stream was blocked on flow control.
         QUICHE_VLOG(1) << "Marking stream " << stream_id << " ready to write.";
@@ -1345,6 +1357,9 @@
 
 bool OgHttp2Session::OnFrameHeader(spdy::SpdyStreamId stream_id, size_t length,
                                    uint8_t type, uint8_t flags) {
+  if (streams_reset_.contains(stream_id)) {
+    return false;
+  }
   if (type == kMetadataFrameType) {
     QUICHE_DCHECK_EQ(metadata_length_, 0u);
     visitor_.OnBeginMetadataForStream(stream_id, length);
@@ -1365,6 +1380,9 @@
 }
 
 void OgHttp2Session::OnFramePayload(const char* data, size_t len) {
+  if (streams_reset_.contains(metadata_stream_id_)) {
+    return;
+  }
   if (metadata_length_ > 0) {
     QUICHE_DCHECK_LE(len, metadata_length_);
     const bool payload_success = visitor_.OnMetadataForStream(
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index 17919f7..3cdcd12 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -437,6 +437,8 @@
 
   WindowManager connection_window_manager_;
 
+  // Tracks the streams that have been marked for reset. A stream is removed
+  // from this set once it is closed.
   absl::flat_hash_set<Http2StreamId> streams_reset_;
 
   // The number of frames currently queued per stream.