Allow visitor_.OnFrameSent() to propagate a fatal send error, and do not retry sending on fatal send errors.

This CL makes OgHttp2Session::AfterFrameSent() return a bool to reflect an
error value from visitor_.OnFrameSent(), which is treated as a fatal connection
error and propagated to OgHttp2Session::Send(). This propagation is helpful for
compatibility with nghttp2 and Envoy.

For reference, the only (practically considered) error code nghttp2 returns for
failed sends is a fatal error:
https://nghttp2.org/documentation/nghttp2_session_send.html

This CL also makes a change to not let OgHttp2Session::Send() send data when it
has previously returned an error value. This is not consistent with nghttp2,
but it feels right and does not negatively affect current codec_impl_tests.

With this CL, codec_impl_tests Sponge (ProtocolErrorForTest now passes):
http://sponge2/279619fd-ba65-4ba9-ad96-367b4606cfc8

PiperOrigin-RevId: 416424214
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index 6410b97..fc5dbd2 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -1992,6 +1992,118 @@
   EXPECT_EQ(result, NGHTTP2_ERR_CALLBACK_FAILURE);
 }
 
+TEST(NgHttp2AdapterTest, ConnectionErrorOnControlFrameSent) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
+
+  const std::string frames =
+      TestFrameSequence().ClientPreface().Ping(42).Serialize();
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+  // PING
+  EXPECT_CALL(visitor, OnFrameHeader(0, _, PING, 0));
+  EXPECT_CALL(visitor, OnPing(42, false));
+
+  const int64_t read_result = adapter->ProcessBytes(frames);
+  EXPECT_EQ(static_cast<size_t>(read_result), frames.size());
+
+  EXPECT_TRUE(adapter->want_write());
+
+  // SETTINGS ack
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0))
+      .WillOnce(testing::Return(-902));
+  EXPECT_CALL(visitor, OnConnectionError(ConnectionError::kSendError));
+
+  int send_result = adapter->Send();
+  EXPECT_LT(send_result, 0);
+
+  // Apparently nghttp2 retries sending the frames that had failed before.
+  EXPECT_TRUE(adapter->want_write());
+
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(PING, 0, _, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(PING, 0, _, 0x1, 0));
+  send_result = adapter->Send();
+  EXPECT_EQ(send_result, 0);
+
+  EXPECT_FALSE(adapter->want_write());
+}
+
+TEST(NgHttp2AdapterTest, ConnectionErrorOnDataFrameSent) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
+
+  const std::string frames = TestFrameSequence()
+                                 .ClientPreface()
+                                 .Headers(1,
+                                          {{":method", "GET"},
+                                           {":scheme", "https"},
+                                           {":authority", "example.com"},
+                                           {":path", "/this/is/request/one"}},
+                                          /*fin=*/true)
+                                 .Serialize();
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+  // Stream 1
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 0x5));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, _, _)).Times(4);
+  EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+  EXPECT_CALL(visitor, OnEndStream(1));
+
+  const int64_t read_result = adapter->ProcessBytes(frames);
+  EXPECT_EQ(static_cast<size_t>(read_result), frames.size());
+
+  auto body = absl::make_unique<TestDataFrameSource>(visitor, true);
+  body->AppendPayload("Here is some data, which will lead to a fatal error");
+  TestDataFrameSource* body_ptr = body.get();
+  int submit_result = adapter->SubmitResponse(
+      1, ToHeaders({{":status", "200"}}), std::move(body));
+  ASSERT_EQ(0, submit_result);
+
+  EXPECT_TRUE(adapter->want_write());
+
+  // SETTINGS ack
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+  // Stream 1, with doomed DATA
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, 1, _, 0x4));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, 1, _, 0x4, 0));
+  EXPECT_CALL(visitor, OnFrameSent(DATA, 1, _, 0x0, 0))
+      .WillOnce(testing::Return(-902));
+  EXPECT_CALL(visitor, OnConnectionError(ConnectionError::kSendError));
+
+  int send_result = adapter->Send();
+  EXPECT_LT(send_result, 0);
+
+  // The test data source got a signal that the first chunk of data was sent
+  // successfully, so discarded that data internally. However, due to the send
+  // error, the next Send() from nghttp2 will try to send that exact same data
+  // again. Without this line appending the exact same data back to the data
+  // source, the test crashes. It is not clear how the data source would know to
+  // not discard the data, unless told by the session? This is not intuitive.
+  body_ptr->AppendPayload(
+      "Here is some data, which will lead to a fatal error");
+
+  // Apparently nghttp2 retries sending the frames that had failed before.
+  EXPECT_TRUE(adapter->want_write());
+  EXPECT_CALL(visitor, OnFrameSent(DATA, 1, _, 0x0, 0));
+
+  send_result = adapter->Send();
+  EXPECT_EQ(send_result, 0);
+
+  EXPECT_FALSE(adapter->want_write());
+}
+
 TEST(NgHttp2AdapterTest, ServerConstruction) {
   testing::StrictMock<MockHttp2Visitor> visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index 50dce2c..8d62da9 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -2164,6 +2164,101 @@
                             SpdyFrameType::PING}));
 }
 
+TEST_F(OgHttp2AdapterTest, ConnectionErrorOnControlFrameSent) {
+  const std::string frames =
+      TestFrameSequence().ClientPreface().Ping(42).Serialize();
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(http2_visitor_, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(http2_visitor_, OnSettingsStart());
+  EXPECT_CALL(http2_visitor_, OnSettingsEnd());
+  // PING
+  EXPECT_CALL(http2_visitor_, OnFrameHeader(0, _, PING, 0));
+  EXPECT_CALL(http2_visitor_, OnPing(42, false));
+
+  const int64_t read_result = adapter_->ProcessBytes(frames);
+  EXPECT_EQ(static_cast<size_t>(read_result), frames.size());
+
+  EXPECT_TRUE(adapter_->want_write());
+
+  // Server preface (SETTINGS)
+  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+  EXPECT_CALL(http2_visitor_, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+  // SETTINGS ack
+  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(http2_visitor_, OnFrameSent(SETTINGS, 0, 0, 0x1, 0))
+      .WillOnce(testing::Return(-902));
+  EXPECT_CALL(http2_visitor_, OnConnectionError(ConnectionError::kSendError));
+
+  int send_result = adapter_->Send();
+  EXPECT_LT(send_result, 0);
+
+  EXPECT_FALSE(adapter_->want_write());
+
+  send_result = adapter_->Send();
+  EXPECT_LT(send_result, 0);
+}
+
+TEST_F(OgHttp2AdapterTest, ConnectionErrorOnDataFrameSent) {
+  const std::string frames = TestFrameSequence()
+                                 .ClientPreface()
+                                 .Headers(1,
+                                          {{":method", "GET"},
+                                           {":scheme", "https"},
+                                           {":authority", "example.com"},
+                                           {":path", "/this/is/request/one"}},
+                                          /*fin=*/true)
+                                 .Serialize();
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(http2_visitor_, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(http2_visitor_, OnSettingsStart());
+  EXPECT_CALL(http2_visitor_, OnSettingsEnd());
+  // Stream 1
+  EXPECT_CALL(http2_visitor_, OnFrameHeader(1, _, HEADERS, 0x5));
+  EXPECT_CALL(http2_visitor_, OnBeginHeadersForStream(1));
+  EXPECT_CALL(http2_visitor_, OnHeaderForStream(1, _, _)).Times(4);
+  EXPECT_CALL(http2_visitor_, OnEndHeadersForStream(1));
+  EXPECT_CALL(http2_visitor_, OnEndStream(1));
+
+  const int64_t read_result = adapter_->ProcessBytes(frames);
+  EXPECT_EQ(static_cast<size_t>(read_result), frames.size());
+
+  auto body = absl::make_unique<TestDataFrameSource>(http2_visitor_, true);
+  body->AppendPayload("Here is some data, which will lead to a fatal error");
+  TestDataFrameSource* body_ptr = body.get();
+  int submit_result = adapter_->SubmitResponse(
+      1, ToHeaders({{":status", "200"}}), std::move(body));
+  ASSERT_EQ(0, submit_result);
+
+  EXPECT_TRUE(adapter_->want_write());
+
+  // Server preface (SETTINGS)
+  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+  EXPECT_CALL(http2_visitor_, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+  // SETTINGS ack
+  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(http2_visitor_, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+  // Stream 1, with doomed DATA
+  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(HEADERS, 1, _, 0x4));
+  EXPECT_CALL(http2_visitor_, OnFrameSent(HEADERS, 1, _, 0x4, 0));
+  EXPECT_CALL(http2_visitor_, OnFrameSent(DATA, 1, _, 0x0, 0))
+      .WillOnce(testing::Return(-902));
+  EXPECT_CALL(http2_visitor_, OnConnectionError(ConnectionError::kSendError));
+
+  int send_result = adapter_->Send();
+  EXPECT_LT(send_result, 0);
+
+  body_ptr->AppendPayload("After the fatal error, data will be sent no more");
+
+  EXPECT_FALSE(adapter_->want_write());
+
+  send_result = adapter_->Send();
+  EXPECT_LT(send_result, 0);
+}
+
 TEST(OgHttp2AdapterServerTest, ClientSendsContinuation) {
   DataSavingVisitor visitor;
   OgHttp2Adapter::Options options{.perspective = Perspective::kServer};
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 346035a..0519cee 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -477,6 +477,10 @@
   sending_ = true;
   RunOnExit r{[this]() { sending_ = false; }};
 
+  if (fatal_send_error_) {
+    return kSendError;
+  }
+
   MaybeSetupPreface();
 
   SendResult continue_writing = SendQueuedFrames();
@@ -495,7 +499,12 @@
   if (continue_writing == SendResult::SEND_OK) {
     continue_writing = SendQueuedFrames();
   }
-  return continue_writing == SendResult::SEND_ERROR ? kSendError : 0;
+  if (continue_writing == SendResult::SEND_ERROR) {
+    fatal_send_error_ = true;
+    return kSendError;
+  } else {
+    return 0;
+  }
 }
 
 bool OgHttp2Session::HasReadyStream() const {
@@ -564,10 +573,16 @@
       // Write blocked.
       return SendResult::SEND_BLOCKED;
     } else {
-      AfterFrameSent(c.frame_type(), c.stream_id(), frame_payload_length,
-                     c.flags(), c.error_code());
-
       frames_.pop_front();
+
+      const bool ok =
+          AfterFrameSent(c.frame_type(), c.stream_id(), frame_payload_length,
+                         c.flags(), c.error_code());
+      if (!ok) {
+        LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
+                            ConnectionError::kSendError);
+        return SendResult::SEND_ERROR;
+      }
       if (static_cast<size_t>(result) < frame.size()) {
         // The frame was partially written, so the rest must be buffered.
         buffered_data_.append(frame.data() + result, frame.size() - result);
@@ -578,11 +593,14 @@
   return SendResult::SEND_OK;
 }
 
-void OgHttp2Session::AfterFrameSent(uint8_t frame_type, uint32_t stream_id,
+bool OgHttp2Session::AfterFrameSent(uint8_t frame_type, uint32_t stream_id,
                                     size_t payload_length, uint8_t flags,
                                     uint32_t error_code) {
-  visitor_.OnFrameSent(frame_type, stream_id, payload_length, flags,
-                       error_code);
+  int result = visitor_.OnFrameSent(frame_type, stream_id, payload_length,
+                                    flags, error_code);
+  if (result < 0) {
+    return false;
+  }
   if (stream_id == 0) {
     const bool is_settings_ack =
         static_cast<FrameType>(frame_type) == FrameType::SETTINGS &&
@@ -592,7 +610,7 @@
           encoder_header_table_capacity_when_acking_.value());
       encoder_header_table_capacity_when_acking_ = absl::nullopt;
     }
-    return;
+    return true;
   }
   auto iter = queued_frames_.find(stream_id);
   if (frame_type != 0) {
@@ -602,6 +620,7 @@
     // TODO(birenroy): Consider passing through `error_code` here.
     CloseStreamIfReady(frame_type, stream_id);
   }
+  return true;
 }
 
 OgHttp2Session::SendResult OgHttp2Session::WriteForStream(
@@ -683,7 +702,13 @@
         state.half_closed_local = true;
         MaybeFinWithRstStream(it);
       }
-      AfterFrameSent(/* DATA */ 0, stream_id, length, fin ? 0x1 : 0x0, 0);
+      const bool ok =
+          AfterFrameSent(/* DATA */ 0, stream_id, length, fin ? 0x1 : 0x0, 0);
+      if (!ok) {
+        LatchErrorAndNotify(Http2ErrorCode::INTERNAL_ERROR,
+                            ConnectionError::kSendError);
+        return SendResult::SEND_ERROR;
+      }
       if (!stream_map_.contains(stream_id)) {
         // Note: the stream may have been closed if `fin` is true.
         break;
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index 74dedac..aea6749 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -124,8 +124,9 @@
     return !received_goaway_ && !decoder_.HasError();
   }
   bool want_write() const override {
-    return !frames_.empty() || !buffered_data_.empty() ||
-           !connection_metadata_.empty() || HasReadyStream();
+    return !fatal_send_error_ &&
+           (!frames_.empty() || !buffered_data_.empty() ||
+            !connection_metadata_.empty() || HasReadyStream());
   }
   int GetRemoteWindowSize() const override { return connection_send_window_; }
 
@@ -299,7 +300,8 @@
   // Serializes and sends queued frames.
   SendResult SendQueuedFrames();
 
-  void AfterFrameSent(uint8_t frame_type, uint32_t stream_id,
+  // Returns false if a fatal connection error occurred.
+  bool AfterFrameSent(uint8_t frame_type, uint32_t stream_id,
                       size_t payload_length, uint8_t flags,
                       uint32_t error_code);
 
@@ -452,6 +454,9 @@
   // Replace this with a stream ID, for multiple GOAWAY support.
   bool queued_goaway_ = false;
   bool latched_error_ = false;
+
+  // True if a fatal sending error has occurred.
+  bool fatal_send_error_ = false;
 };
 
 }  // namespace adapter