Avoid sending frames after sending an immediate GOAWAY in OgHttp2Session.

This CL makes the following related changes to OgHttp2Session:
  - Add a queued_immediate_goaway_ field, set if the enqueued GOAWAY is the
    result of a connection-level error (i.e., if latched_error_ is true).
  - Make EnqueueFrame() a no-op if queued_immediate_goaway_ is true.
  - Avoid writing for streams if queued_immediate_goaway_ is true.

These changes allow OgHttp2Session to avoid sending either control frames or
DATA frames once a GOAWAY is sent.

This CL also fixes a potential memory leak in NgHttp2Adapter with submitted
METADATA that is now also not sent after an immediate GOAWAY. In particular,
MetadataSource now has a new interface method OnFailure() that
SelfDeletingMetadataSource() can override to self-delete.
MetadataSource::OnFailure() is now called in
nghttp2_on_frame_not_send_callback() (newly set to call new OnFrameNotSent()).

This CL marks the triumphant passing of codec_impl_test
LargeServerBodyFlushTimeoutAfterGoaway
(http://sponge2/cc0358b2-2881-4502-aa95-6cf2cd9b6de5).

PiperOrigin-RevId: 417475633
diff --git a/http2/adapter/data_source.h b/http2/adapter/data_source.h
index 63448b3..dd4601f 100644
--- a/http2/adapter/data_source.h
+++ b/http2/adapter/data_source.h
@@ -48,6 +48,10 @@
   // to indicate an error, as well as a boolean indicating whether the metadata
   // has been completely copied.
   virtual std::pair<int64_t, bool> Pack(uint8_t* dest, size_t dest_len) = 0;
+
+  // This method is called when transmission of the metadata for this source
+  // fails in a non-recoverable way.
+  virtual void OnFailure() = 0;
 };
 
 }  // namespace adapter
diff --git a/http2/adapter/nghttp2_adapter.cc b/http2/adapter/nghttp2_adapter.cc
index 821eb8c..a6ba0b2 100644
--- a/http2/adapter/nghttp2_adapter.cc
+++ b/http2/adapter/nghttp2_adapter.cc
@@ -35,6 +35,11 @@
     return result;
   }
 
+  void OnFailure() override {
+    source_->OnFailure();
+    delete this;
+  }
+
  private:
   std::unique_ptr<MetadataSource> source_;
 };
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index b5bb2a3..4e0400a 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -3939,6 +3939,99 @@
   EXPECT_EQ(static_cast<size_t>(next_result), next_frame.size());
 }
 
+TEST(NgHttp2AdapterTest, ServerDoesNotSendFramesAfterImmediateGoAway) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
+
+  const std::string frames = TestFrameSequence()
+                                 .ClientPreface()
+                                 .Headers(1,
+                                          {{":method", "GET"},
+                                           {":scheme", "https"},
+                                           {":authority", "example.com"},
+                                           {":path", "/this/is/request/one"}},
+                                          /*fin=*/true)
+                                 .Serialize();
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+  // Stream 1
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 0x5));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, _, _)).Times(4);
+  EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+  EXPECT_CALL(visitor, OnEndStream(1));
+
+  const int64_t read_result = adapter->ProcessBytes(frames);
+  EXPECT_EQ(static_cast<size_t>(read_result), frames.size());
+
+  // Submit a response for the stream.
+  auto body = absl::make_unique<TestDataFrameSource>(visitor, true);
+  body->AppendPayload("This data is doomed to never be written.");
+  int submit_result = adapter->SubmitResponse(
+      1, ToHeaders({{":status", "200"}}), std::move(body));
+  ASSERT_EQ(0, submit_result);
+
+  // Submit a SETTINGS frame.
+  adapter->SubmitSettings({});
+
+  // Submit a WINDOW_UPDATE frame.
+  adapter->SubmitWindowUpdate(kConnectionStreamId, 42);
+
+  // Submit another SETTINGS frame.
+  adapter->SubmitSettings({});
+
+  // Submit some metadata.
+  auto source = absl::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
+      {{"query-cost", "is too darn high"}, {"secret-sauce", "hollandaise"}})));
+  adapter->SubmitMetadata(1, 16384u, std::move(source));
+
+  EXPECT_TRUE(adapter->want_write());
+
+  // Trigger a connection error. Only the response headers will be written.
+  const std::string connection_error_frames =
+      TestFrameSequence().WindowUpdate(3, 42).Serialize();
+
+  EXPECT_CALL(visitor, OnFrameHeader(3, 4, WINDOW_UPDATE, 0));
+  EXPECT_CALL(visitor, OnInvalidFrame(3, _));
+
+  const int64_t result = adapter->ProcessBytes(connection_error_frames);
+  EXPECT_EQ(static_cast<size_t>(result), connection_error_frames.size());
+
+  EXPECT_TRUE(adapter->want_write());
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(GOAWAY, 0, _, 0x0));
+  EXPECT_CALL(visitor,
+              OnFrameSent(GOAWAY, 0, _, 0x0,
+                          static_cast<int>(Http2ErrorCode::PROTOCOL_ERROR)));
+
+  int send_result = adapter->Send();
+  // Some bytes should have been serialized.
+  EXPECT_EQ(0, send_result);
+  // The GOAWAY apparently causes the other frames to be dropped except for the
+  // non-ack SETTINGS frames; nghttp2 sends non-ack SETTINGS frames because they
+  // could be the initial SETTINGS frame. However, nghttp2 still allows sending
+  // multiple non-ack SETTINGS, which feels non-ideal.
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS,
+                                            spdy::SpdyFrameType::SETTINGS,
+                                            spdy::SpdyFrameType::GOAWAY}));
+  visitor.Clear();
+
+  // Try to submit more frames for writing. They should not be written.
+  adapter->SubmitPing(42);
+  EXPECT_FALSE(adapter->want_write());
+  send_result = adapter->Send();
+  EXPECT_EQ(0, send_result);
+  EXPECT_THAT(visitor.data(), testing::IsEmpty());
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace adapter
diff --git a/http2/adapter/nghttp2_callbacks.cc b/http2/adapter/nghttp2_callbacks.cc
index dc6df1d..2d05bee 100644
--- a/http2/adapter/nghttp2_callbacks.cc
+++ b/http2/adapter/nghttp2_callbacks.cc
@@ -207,6 +207,21 @@
                               frame->hd.length, frame->hd.flags, error_code);
 }
 
+int OnFrameNotSent(nghttp2_session* /* session */, const nghttp2_frame* frame,
+                   int /* lib_error_code */, void* /* user_data */) {
+  if (frame->hd.type == kMetadataFrameType) {
+    auto* source = static_cast<MetadataSource*>(frame->ext.payload);
+    if (source == nullptr) {
+      QUICHE_BUG(not_sent_payload_is_nullptr)
+          << "Extension frame payload for stream " << frame->hd.stream_id
+          << " is null!";
+    } else {
+      source->OnFailure();
+    }
+  }
+  return 0;
+}
+
 int OnInvalidFrameReceived(nghttp2_session* /* session */,
                            const nghttp2_frame* frame, int lib_error_code,
                            void* user_data) {
@@ -313,10 +328,11 @@
   nghttp2_session_callbacks_set_before_frame_send_callback(callbacks,
                                                            &OnBeforeFrameSent);
   nghttp2_session_callbacks_set_on_frame_send_callback(callbacks, &OnFrameSent);
+  nghttp2_session_callbacks_set_on_frame_not_send_callback(callbacks,
+                                                           &OnFrameNotSent);
   nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(
       callbacks, &OnInvalidFrameReceived);
   nghttp2_session_callbacks_set_error_callback2(callbacks, &OnError);
-  // on_frame_not_send_callback <- just ignored
   nghttp2_session_callbacks_set_send_data_callback(
       callbacks, &DataFrameSourceSendCallback);
   nghttp2_session_callbacks_set_pack_extension_callback(
diff --git a/http2/adapter/nghttp2_callbacks.h b/http2/adapter/nghttp2_callbacks.h
index 293c6d6..82b7cf1 100644
--- a/http2/adapter/nghttp2_callbacks.h
+++ b/http2/adapter/nghttp2_callbacks.h
@@ -48,6 +48,10 @@
 int OnFrameSent(nghttp2_session* session, const nghttp2_frame* frame,
                 void* user_data);
 
+// Invoked when a non-DATA frame is not sent because of an error.
+int OnFrameNotSent(nghttp2_session* session, const nghttp2_frame* frame,
+                   int lib_error_code, void* user_data);
+
 // Invoked when an invalid frame is received.
 int OnInvalidFrameReceived(nghttp2_session* session, const nghttp2_frame* frame,
                            int lib_error_code, void* user_data);
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index ae277b6..15d1e3a 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -4020,6 +4020,110 @@
   EXPECT_LT(next_result, 0);
 }
 
+TEST(OgHttp2AdapterServerTest, ServerDoesNotSendFramesAfterImmediateGoAway) {
+  DataSavingVisitor visitor;
+  OgHttp2Adapter::Options options{.perspective = Perspective::kServer};
+  auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+  const std::string frames = TestFrameSequence()
+                                 .ClientPreface()
+                                 .Headers(1,
+                                          {{":method", "GET"},
+                                           {":scheme", "https"},
+                                           {":authority", "example.com"},
+                                           {":path", "/this/is/request/one"}},
+                                          /*fin=*/true)
+                                 .Serialize();
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+  // Stream 1
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 0x5));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, _, _)).Times(4);
+  EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+  EXPECT_CALL(visitor, OnEndStream(1));
+
+  const int64_t read_result = adapter->ProcessBytes(frames);
+  EXPECT_EQ(static_cast<size_t>(read_result), frames.size());
+
+  // Submit a response for the stream.
+  auto body = absl::make_unique<TestDataFrameSource>(visitor, true);
+  body->AppendPayload("This data is doomed to never be written.");
+  int submit_result = adapter->SubmitResponse(
+      1, ToHeaders({{":status", "200"}}), std::move(body));
+  ASSERT_EQ(0, submit_result);
+
+  // Submit a SETTINGS frame.
+  adapter->SubmitSettings({});
+
+  // Submit a WINDOW_UPDATE frame.
+  adapter->SubmitWindowUpdate(kConnectionStreamId, 42);
+
+  // Submit another SETTINGS frame.
+  adapter->SubmitSettings({});
+
+  // Submit some metadata.
+  auto source = absl::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
+      {{"query-cost", "is too darn high"}, {"secret-sauce", "hollandaise"}})));
+  adapter->SubmitMetadata(1, 16384u, std::move(source));
+
+  EXPECT_TRUE(adapter->want_write());
+
+  // Trigger a connection error. Only the response headers will be written.
+  const std::string connection_error_frames =
+      TestFrameSequence().WindowUpdate(3, 42).Serialize();
+
+  EXPECT_CALL(visitor, OnFrameHeader(3, 4, WINDOW_UPDATE, 0));
+  EXPECT_CALL(visitor, OnConnectionError(ConnectionError::kWrongFrameSequence));
+
+  const int64_t result = adapter->ProcessBytes(connection_error_frames);
+  EXPECT_EQ(static_cast<size_t>(result), connection_error_frames.size());
+
+  EXPECT_TRUE(adapter->want_write());
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, 1, _, 0x4));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, 1, _, 0x4, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(WINDOW_UPDATE, 0, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(WINDOW_UPDATE, 0, _, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(GOAWAY, 0, _, 0x0));
+  EXPECT_CALL(visitor,
+              OnFrameSent(GOAWAY, 0, _, 0x0,
+                          static_cast<int>(Http2ErrorCode::PROTOCOL_ERROR)));
+
+  int send_result = adapter->Send();
+  // Some bytes should have been serialized.
+  EXPECT_EQ(0, send_result);
+  // TODO(diannahu): Drop other frames like nghttp2.
+  EXPECT_THAT(
+      visitor.data(),
+      EqualsFrames(
+          {spdy::SpdyFrameType::SETTINGS, spdy::SpdyFrameType::SETTINGS,
+           spdy::SpdyFrameType::HEADERS, spdy::SpdyFrameType::SETTINGS,
+           spdy::SpdyFrameType::WINDOW_UPDATE, spdy::SpdyFrameType::SETTINGS,
+           spdy::SpdyFrameType::GOAWAY}));
+  visitor.Clear();
+
+  // Try to submit more frames for writing. They should not be written.
+  adapter->SubmitPing(42);
+  // TODO(diannahu): Enable the below expectation.
+  // EXPECT_FALSE(adapter->want_write());
+  send_result = adapter->Send();
+  EXPECT_EQ(0, send_result);
+  EXPECT_THAT(visitor.data(), testing::IsEmpty());
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace adapter
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 028eb73..af843e8 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -479,9 +479,18 @@
 }
 
 void OgHttp2Session::EnqueueFrame(std::unique_ptr<spdy::SpdyFrameIR> frame) {
+  if (queued_immediate_goaway_) {
+    // Do not allow additional frames to be enqueued after the GOAWAY.
+    return;
+  }
+
   RunOnExit r;
   if (frame->frame_type() == spdy::SpdyFrameType::GOAWAY) {
     queued_goaway_ = true;
+    if (latched_error_) {
+      // TODO(diannahu): Clear the frames queue.
+      queued_immediate_goaway_ = true;
+    }
   } else if (frame->fin() ||
              frame->frame_type() == spdy::SpdyFrameType::RST_STREAM) {
     auto iter = stream_map_.find(frame->stream_id());
@@ -520,6 +529,12 @@
   MaybeSetupPreface();
 
   SendResult continue_writing = SendQueuedFrames();
+  if (queued_immediate_goaway_) {
+    // If an immediate GOAWAY was queued, then the above flush either sent the
+    // GOAWAY or buffered it to be sent on the next successful flush. In either
+    // case, return early here to avoid sending other frames.
+    return InterpretSendResult(continue_writing);
+  }
   while (continue_writing == SendResult::SEND_OK &&
          !connection_metadata_.empty()) {
     continue_writing = SendMetadata(0, connection_metadata_);
@@ -535,7 +550,11 @@
   if (continue_writing == SendResult::SEND_OK) {
     continue_writing = SendQueuedFrames();
   }
-  if (continue_writing == SendResult::SEND_ERROR) {
+  return InterpretSendResult(continue_writing);
+}
+
+int OgHttp2Session::InterpretSendResult(SendResult result) {
+  if (result == SendResult::SEND_ERROR) {
     fatal_send_error_ = true;
     return kSendError;
   } else {
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index 7bd2385..d9f00df 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -294,6 +294,9 @@
     SEND_ERROR,
   };
 
+  // Returns the int corresponding to the `result`, updating state as needed.
+  int InterpretSendResult(SendResult result);
+
   enum class ProcessBytesError {
     // A general, unspecified error.
     kUnspecified,
@@ -474,6 +477,7 @@
 
   // Replace this with a stream ID, for multiple GOAWAY support.
   bool queued_goaway_ = false;
+  bool queued_immediate_goaway_ = false;
   bool latched_error_ = false;
 
   // True if a fatal sending error has occurred.
diff --git a/http2/adapter/test_utils.h b/http2/adapter/test_utils.h
index 886d0bf..be53ba8 100644
--- a/http2/adapter/test_utils.h
+++ b/http2/adapter/test_utils.h
@@ -110,6 +110,7 @@
     return (encoded_entries_.size() + max_frame_size - 1) / max_frame_size;
   }
   std::pair<int64_t, bool> Pack(uint8_t* dest, size_t dest_len) override;
+  void OnFailure() override {}
 
  private:
   const std::string encoded_entries_;