Allow OgHttp2Session to blackhole data on connection errors.

This CL adds an option to OgHttp2Session::Options that will cause
OgHttp2Session to behave more similarly to nghttp2 when encountering
connection-level errors while processing data. Specifically:
  1. When encountering connection errors, send a GOAWAY but mark the entire
     input as consumed [1], i.e., be a sink.
  2. When a connection-error GOAWAY is sent, be a sink for subsequent
     ProcessBytes() calls [2].
  3. Return errors only for bad client magic or user callback failures [3].

Each of the pre-existing affected oghttp2 tests in this CL with nghttp2
analogues now have aligned ProcessBytes() return values.

Having this option enabled in Envoy is a prerequisite for codec_impl_test
LargeServerBodyFlushTimeoutAfterGoaway to pass.

[1] e.g., http://google3/third_party/nghttp2/src/lib/nghttp2_session.c;l=5428-5435;rcl=314948637.
    There are more examples of this pattern throughout nghttp2_session_mem_recv().
[2] http://google3/third_party/nghttp2/src/lib/nghttp2_session.c;l=5382-5384;rcl=314948637
[3] https://nghttp2.org/documentation/nghttp2_session_mem_recv.html. There are technically two other possible negative return values, but I don't think Envoy considers NGHTTP2_ERR_NOMEM, and Envoy is also skeptical about NGHTTP2_ERR_FLOODED: http://google3/third_party/envoy/src/source/common/http/http2/codec_impl.cc;l=844-851;rcl=415656746, http://google3/third_party/envoy/src/source/common/http/http2/codec_impl.cc;l=1726-1732;rcl=415656746.

PiperOrigin-RevId: 417432491
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index b2494e5..b5bb2a3 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -3910,6 +3910,35 @@
   EXPECT_FALSE(adapter->want_write());
 }
 
+// Verifies that a connection-level processing error results in repeatedly
+// returning a positive value for ProcessBytes() to mark all data as consumed.
+TEST(NgHttp2AdapterTest, ConnectionErrorWithBlackholeSinkingData) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
+
+  const std::string frames =
+      TestFrameSequence().ClientPreface().WindowUpdate(1, 42).Serialize();
+
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  EXPECT_CALL(visitor, OnFrameHeader(1, 4, WINDOW_UPDATE, 0));
+  EXPECT_CALL(visitor, OnInvalidFrame(1, _));
+
+  const int64_t result = adapter->ProcessBytes(frames);
+  EXPECT_EQ(static_cast<size_t>(result), frames.size());
+
+  // Ask the connection to process more bytes. Because the option is enabled,
+  // the data should be marked as consumed.
+  const std::string next_frame = TestFrameSequence().Ping(42).Serialize();
+  const int64_t next_result = adapter->ProcessBytes(next_frame);
+  EXPECT_EQ(static_cast<size_t>(next_result), next_frame.size());
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace adapter
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index adc46c0..ae277b6 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -1670,7 +1670,7 @@
   EXPECT_CALL(visitor, OnConnectionError(ConnectionError::kInvalidPushPromise));
 
   const int64_t read_result = adapter->ProcessBytes(frames);
-  EXPECT_LT(read_result, 0);
+  EXPECT_EQ(static_cast<size_t>(read_result), frames.size());
 
   EXPECT_TRUE(adapter->want_write());
 
@@ -1742,7 +1742,7 @@
   EXPECT_CALL(visitor, OnConnectionError(ConnectionError::kInvalidNewStreamId));
 
   const int64_t read_result = adapter->ProcessBytes(frames);
-  EXPECT_LT(read_result, 0);
+  EXPECT_EQ(static_cast<size_t>(read_result), frames.size());
 
   EXPECT_TRUE(adapter->want_write());
 
@@ -2503,7 +2503,7 @@
   EXPECT_CALL(visitor, OnConnectionError(ConnectionError::kParseError));
 
   int64_t result = adapter->ProcessBytes(frames);
-  EXPECT_LT(result, 0);
+  EXPECT_EQ(static_cast<size_t>(result), frames.size());
 
   EXPECT_TRUE(adapter->want_write());
 
@@ -3422,7 +3422,7 @@
   EXPECT_CALL(visitor, OnConnectionError(ConnectionError::kInvalidNewStreamId));
 
   const int64_t result = adapter->ProcessBytes(frames);
-  EXPECT_LT(result, 0);
+  EXPECT_EQ(static_cast<size_t>(result), frames.size());
 
   EXPECT_EQ(3, adapter->GetHighestReceivedStreamId());
 
@@ -3467,7 +3467,7 @@
   EXPECT_CALL(visitor, OnConnectionError(ConnectionError::kWrongFrameSequence));
 
   const int64_t result = adapter->ProcessBytes(frames);
-  EXPECT_LT(result, 0);
+  EXPECT_EQ(static_cast<size_t>(result), frames.size());
 
   EXPECT_EQ(1, adapter->GetHighestReceivedStreamId());
 
@@ -3514,7 +3514,7 @@
   EXPECT_CALL(visitor, OnConnectionError(ConnectionError::kWrongFrameSequence));
 
   const int64_t result = adapter->ProcessBytes(frames);
-  EXPECT_LT(result, 0);
+  EXPECT_EQ(static_cast<size_t>(result), frames.size());
 
   EXPECT_EQ(1, adapter->GetHighestReceivedStreamId());
 
@@ -3562,7 +3562,7 @@
   EXPECT_CALL(visitor, OnConnectionError(ConnectionError::kWrongFrameSequence));
 
   const int64_t result = adapter->ProcessBytes(frames);
-  EXPECT_LT(result, 0);
+  EXPECT_EQ(static_cast<size_t>(result), frames.size());
 
   EXPECT_EQ(1, adapter->GetHighestReceivedStreamId());
 
@@ -3649,14 +3649,13 @@
   EXPECT_CALL(
       visitor,
       OnInvalidFrame(3, Http2VisitorInterface::InvalidFrameError::kProtocol));
-  // The oghttp2 stack also signals the connection error via OnConnectionError()
-  // and a negative ProcessBytes() return value.
+  // The oghttp2 stack also signals the error via OnConnectionError().
   EXPECT_CALL(visitor,
               OnConnectionError(Http2VisitorInterface::ConnectionError::
                                     kExceededMaxConcurrentStreams));
 
   const int64_t stream_result = adapter->ProcessBytes(stream_frames);
-  EXPECT_LT(stream_result, 0);
+  EXPECT_EQ(static_cast<size_t>(stream_result), stream_frames.size());
 
   // The server should send a GOAWAY for this error, even though
   // OnInvalidFrame() returns true.
@@ -3958,6 +3957,69 @@
   EXPECT_FALSE(adapter->want_write());
 }
 
+// Verifies that a connection-level processing error results in repeatedly
+// returning a positive value for ProcessBytes() to mark all data as consumed
+// when the blackhole option is enabled.
+TEST(OgHttp2AdapterServerTest, ConnectionErrorWithBlackholingData) {
+  DataSavingVisitor visitor;
+  OgHttp2Adapter::Options options{.perspective = Perspective::kServer,
+                                  .blackhole_data_on_connection_error = true};
+  auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+  const std::string frames =
+      TestFrameSequence().ClientPreface().WindowUpdate(1, 42).Serialize();
+
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  EXPECT_CALL(visitor, OnFrameHeader(1, 4, WINDOW_UPDATE, 0));
+  EXPECT_CALL(visitor, OnConnectionError(ConnectionError::kWrongFrameSequence));
+
+  const int64_t result = adapter->ProcessBytes(frames);
+  EXPECT_EQ(static_cast<size_t>(result), frames.size());
+
+  // Ask the connection to process more bytes. Because the option is enabled,
+  // the data should be marked as consumed.
+  const std::string next_frame = TestFrameSequence().Ping(42).Serialize();
+  const int64_t next_result = adapter->ProcessBytes(next_frame);
+  EXPECT_EQ(static_cast<size_t>(next_result), next_frame.size());
+}
+
+// Verifies that a connection-level processing error results in returning a
+// negative value for ProcessBytes() when the blackhole option is disabled.
+TEST(OgHttp2AdapterServerTest, ConnectionErrorWithoutBlackholingData) {
+  DataSavingVisitor visitor;
+  OgHttp2Adapter::Options options{.perspective = Perspective::kServer,
+                                  .blackhole_data_on_connection_error = false};
+  auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+  const std::string frames =
+      TestFrameSequence().ClientPreface().WindowUpdate(1, 42).Serialize();
+
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  EXPECT_CALL(visitor, OnFrameHeader(1, 4, WINDOW_UPDATE, 0));
+  EXPECT_CALL(visitor, OnConnectionError(ConnectionError::kWrongFrameSequence));
+
+  const int64_t result = adapter->ProcessBytes(frames);
+  EXPECT_LT(result, 0);
+
+  // Ask the connection to process more bytes. Because the option is disabled,
+  // ProcessBytes() should continue to return an error.
+  const std::string next_frame = TestFrameSequence().Ping(42).Serialize();
+  const int64_t next_result = adapter->ProcessBytes(next_frame);
+  EXPECT_LT(next_result, 0);
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace adapter
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 781c49f..028eb73 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -408,6 +408,10 @@
   processing_bytes_ = true;
   RunOnExit r{[this]() { processing_bytes_ = false; }};
 
+  if (options_.blackhole_data_on_connection_error && latched_error_) {
+    return bytes.size();
+  }
+
   int64_t preface_consumed = 0;
   if (!remaining_preface_.empty()) {
     QUICHE_VLOG(2) << "Preface bytes remaining: " << remaining_preface_.size();
@@ -440,7 +444,11 @@
   }
   if (latched_error_ || result < 0) {
     QUICHE_VLOG(2) << "ProcessBytes encountered an error.";
-    return ProcessBytesError::kUnspecified;
+    if (options_.blackhole_data_on_connection_error) {
+      return bytes.size() + preface_consumed;
+    } else {
+      return ProcessBytesError::kUnspecified;
+    }
   }
   return result + preface_consumed;
 }
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index f7c7779..7bd2385 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -52,6 +52,10 @@
     // has indicated the end of data. If false, the server will assume that
     // submitting trailers indicates the end of data.
     bool trailers_require_end_data = false;
+    // Whether to mark all input data as consumed upon encountering a connection
+    // error while processing bytes. If true, subsequent processing will also
+    // mark all input data as consumed.
+    bool blackhole_data_on_connection_error = true;
   };
 
   OgHttp2Session(Http2VisitorInterface& visitor, Options options);