Adds metadata sending support to NgHttp2Adapter.

This is a follow-on to cl/388225769, which added the same functionality for the oghttp2 stack.

PiperOrigin-RevId: 391085003
diff --git a/http2/adapter/data_source.h b/http2/adapter/data_source.h
index 465aa2f..63448b3 100644
--- a/http2/adapter/data_source.h
+++ b/http2/adapter/data_source.h
@@ -38,6 +38,11 @@
  public:
   virtual ~MetadataSource() {}
 
+  // Returns the number of frames of at most |max_frame_size| required to
+  // serialize the metadata for this source. Only required by the nghttp2
+  // implementation.
+  virtual size_t NumFrames(size_t max_frame_size) const = 0;
+
   // This method is called with a destination buffer and length. It should
   // return the number of payload bytes copied to |dest|, or a negative integer
   // to indicate an error, as well as a boolean indicating whether the metadata
diff --git a/http2/adapter/http2_adapter.h b/http2/adapter/http2_adapter.h
index b044cb6..5d20f20 100644
--- a/http2/adapter/http2_adapter.h
+++ b/http2/adapter/http2_adapter.h
@@ -66,7 +66,7 @@
 
   // Submits a sequence of METADATA frames for the given stream. A |stream_id|
   // of 0 indicates connection-level METADATA.
-  virtual void SubmitMetadata(Http2StreamId stream_id,
+  virtual void SubmitMetadata(Http2StreamId stream_id, size_t max_frame_size,
                               std::unique_ptr<MetadataSource> source) = 0;
 
   // Invokes the visitor's OnReadyToSend() method for serialized frame data.
diff --git a/http2/adapter/nghttp2_adapter.cc b/http2/adapter/nghttp2_adapter.cc
index 5696df7..5da518f 100644
--- a/http2/adapter/nghttp2_adapter.cc
+++ b/http2/adapter/nghttp2_adapter.cc
@@ -12,6 +12,32 @@
 namespace http2 {
 namespace adapter {
 
+namespace {
+
+// A metadata source that deletes itself upon completion.
+class SelfDeletingMetadataSource : public MetadataSource {
+ public:
+  explicit SelfDeletingMetadataSource(std::unique_ptr<MetadataSource> source)
+      : source_(std::move(source)) {}
+
+  size_t NumFrames(size_t max_frame_size) const override {
+    return source_->NumFrames(max_frame_size);
+  }
+
+  std::pair<int64_t, bool> Pack(uint8_t* dest, size_t dest_len) override {
+    const auto result = source_->Pack(dest, dest_len);
+    if (result.first < 0 || result.second) {
+      delete this;
+    }
+    return result;
+  }
+
+ private:
+  std::unique_ptr<MetadataSource> source_;
+};
+
+}  // anonymous namespace
+
 /* static */
 std::unique_ptr<NgHttp2Adapter> NgHttp2Adapter::CreateClientAdapter(
     Http2VisitorInterface& visitor, const nghttp2_option* options) {
@@ -90,9 +116,26 @@
                                stream_id, window_increment);
 }
 
-void NgHttp2Adapter::SubmitMetadata(
-    Http2StreamId /*stream_id*/, std::unique_ptr<MetadataSource> /*source*/) {
-  QUICHE_LOG(DFATAL) << "Not implemented";
+void NgHttp2Adapter::SubmitMetadata(Http2StreamId stream_id,
+                                    size_t max_frame_size,
+                                    std::unique_ptr<MetadataSource> source) {
+  auto* wrapped_source = new SelfDeletingMetadataSource(std::move(source));
+  const size_t num_frames = wrapped_source->NumFrames(max_frame_size);
+  size_t num_successes = 0;
+  for (size_t i = 1; i <= num_frames; ++i) {
+    const int result = nghttp2_submit_extension(
+        session_->raw_ptr(), kMetadataFrameType,
+        i == num_frames ? kMetadataEndFlag : 0, stream_id, wrapped_source);
+    if (result != 0) {
+      QUICHE_LOG(DFATAL) << "Failed to submit extension frame " << i << " of "
+                         << num_frames;
+      break;
+    }
+    ++num_successes;
+  }
+  if (num_successes == 0) {
+    delete wrapped_source;
+  }
 }
 
 int NgHttp2Adapter::Send() {
diff --git a/http2/adapter/nghttp2_adapter.h b/http2/adapter/nghttp2_adapter.h
index 2da18cf..4617806 100644
--- a/http2/adapter/nghttp2_adapter.h
+++ b/http2/adapter/nghttp2_adapter.h
@@ -51,7 +51,7 @@
 
   void SubmitRst(Http2StreamId stream_id, Http2ErrorCode error_code) override;
 
-  void SubmitMetadata(Http2StreamId stream_id,
+  void SubmitMetadata(Http2StreamId stream_id, size_t max_frame_size,
                       std::unique_ptr<MetadataSource> source) override;
 
   int Send() override;
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index 5861e66..4435810 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -2,6 +2,7 @@
 
 #include "http2/adapter/mock_http2_visitor.h"
 #include "http2/adapter/nghttp2_test_utils.h"
+#include "http2/adapter/oghttp2_util.h"
 #include "http2/adapter/test_frame_sequence.h"
 #include "http2/adapter/test_utils.h"
 #include "common/platform/api/quiche_test.h"
@@ -1114,6 +1115,89 @@
   EXPECT_FALSE(adapter->session().want_write());
 }
 
+TEST(NgHttp2AdapterTest, SubmitMetadata) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
+
+  auto source = absl::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
+      {{"query-cost", "is too darn high"}, {"secret-sauce", "hollandaise"}})));
+  adapter->SubmitMetadata(1, 16384u, std::move(source));
+  EXPECT_TRUE(adapter->session().want_write());
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(kMetadataFrameType, 1, _, 0x4));
+  EXPECT_CALL(visitor, OnFrameSent(kMetadataFrameType, 1, _, 0x4, 0));
+
+  int result = adapter->Send();
+  EXPECT_EQ(0, result);
+  absl::string_view serialized = visitor.data();
+  EXPECT_THAT(serialized,
+              testing::StartsWith(spdy::kHttp2ConnectionHeaderPrefix));
+  serialized.remove_prefix(strlen(spdy::kHttp2ConnectionHeaderPrefix));
+  EXPECT_THAT(
+      serialized,
+      EqualsFrames({static_cast<spdy::SpdyFrameType>(kMetadataFrameType)}));
+  EXPECT_FALSE(adapter->session().want_write());
+}
+
+TEST(NgHttp2AdapterTest, SubmitMetadataMultipleFrames) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
+
+  const auto kLargeValue = std::string(63 * 1024, 'a');
+  auto source = absl::make_unique<TestMetadataSource>(
+      ToHeaderBlock(ToHeaders({{"large-value", kLargeValue}})));
+  adapter->SubmitMetadata(1, 16384u, std::move(source));
+  EXPECT_TRUE(adapter->session().want_write());
+
+  testing::InSequence seq;
+  EXPECT_CALL(visitor, OnBeforeFrameSent(kMetadataFrameType, 1, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(kMetadataFrameType, 1, _, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(kMetadataFrameType, 1, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(kMetadataFrameType, 1, _, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(kMetadataFrameType, 1, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(kMetadataFrameType, 1, _, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(kMetadataFrameType, 1, _, 0x4));
+  EXPECT_CALL(visitor, OnFrameSent(kMetadataFrameType, 1, _, 0x4, 0));
+
+  int result = adapter->Send();
+  EXPECT_EQ(0, result);
+  absl::string_view serialized = visitor.data();
+  EXPECT_THAT(serialized,
+              testing::StartsWith(spdy::kHttp2ConnectionHeaderPrefix));
+  serialized.remove_prefix(strlen(spdy::kHttp2ConnectionHeaderPrefix));
+  EXPECT_THAT(
+      serialized,
+      EqualsFrames({static_cast<spdy::SpdyFrameType>(kMetadataFrameType),
+                    static_cast<spdy::SpdyFrameType>(kMetadataFrameType),
+                    static_cast<spdy::SpdyFrameType>(kMetadataFrameType),
+                    static_cast<spdy::SpdyFrameType>(kMetadataFrameType)}));
+  EXPECT_FALSE(adapter->session().want_write());
+}
+
+TEST(NgHttp2AdapterTest, SubmitConnectionMetadata) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
+
+  auto source = absl::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
+      {{"query-cost", "is too darn high"}, {"secret-sauce", "hollandaise"}})));
+  adapter->SubmitMetadata(0, 16384u, std::move(source));
+  EXPECT_TRUE(adapter->session().want_write());
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(kMetadataFrameType, 0, _, 0x4));
+  EXPECT_CALL(visitor, OnFrameSent(kMetadataFrameType, 0, _, 0x4, 0));
+
+  int result = adapter->Send();
+  EXPECT_EQ(0, result);
+  absl::string_view serialized = visitor.data();
+  EXPECT_THAT(serialized,
+              testing::StartsWith(spdy::kHttp2ConnectionHeaderPrefix));
+  serialized.remove_prefix(strlen(spdy::kHttp2ConnectionHeaderPrefix));
+  EXPECT_THAT(
+      serialized,
+      EqualsFrames({static_cast<spdy::SpdyFrameType>(kMetadataFrameType)}));
+  EXPECT_FALSE(adapter->session().want_write());
+}
+
 TEST(NgHttp2AdapterTest, ServerConstruction) {
   testing::StrictMock<MockHttp2Visitor> visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
diff --git a/http2/adapter/nghttp2_callbacks.cc b/http2/adapter/nghttp2_callbacks.cc
index 6cfdbcb..8956b7a 100644
--- a/http2/adapter/nghttp2_callbacks.cc
+++ b/http2/adapter/nghttp2_callbacks.cc
@@ -4,6 +4,7 @@
 #include <cstring>
 
 #include "absl/strings/string_view.h"
+#include "http2/adapter/data_source.h"
 #include "http2/adapter/http2_protocol.h"
 #include "http2/adapter/http2_visitor_interface.h"
 #include "http2/adapter/nghttp2_data_provider.h"
@@ -253,12 +254,20 @@
   return 0;
 }
 
-ssize_t OnPackExtensionCallback(nghttp2_session* /*session*/, uint8_t* /*buf*/,
-                                size_t /*len*/, const nghttp2_frame* /*frame*/,
+ssize_t OnPackExtensionCallback(nghttp2_session* /*session*/, uint8_t* buf,
+                                size_t len, const nghttp2_frame* frame,
                                 void* user_data) {
   QUICHE_CHECK_NE(user_data, nullptr);
-  QUICHE_LOG(DFATAL) << "Not implemented";
-  return NGHTTP2_ERR_CALLBACK_FAILURE;
+  auto* source = static_cast<MetadataSource*>(frame->ext.payload);
+  const std::pair<int64_t, bool> result = source->Pack(buf, len);
+  if (result.first < 0) {
+    return NGHTTP2_ERR_CALLBACK_FAILURE;
+  }
+  const bool end_metadata_flag = (frame->hd.flags & kMetadataEndFlag);
+  QUICHE_LOG_IF(DFATAL, result.second != end_metadata_flag)
+      << "Metadata ends: " << result.second
+      << " has kMetadataEndFlag: " << end_metadata_flag;
+  return result.first;
 }
 
 int OnError(nghttp2_session* /*session*/, int /*lib_error_code*/,
diff --git a/http2/adapter/oghttp2_adapter.cc b/http2/adapter/oghttp2_adapter.cc
index e027851..252e73e 100644
--- a/http2/adapter/oghttp2_adapter.cc
+++ b/http2/adapter/oghttp2_adapter.cc
@@ -76,7 +76,10 @@
 }
 
 void OgHttp2Adapter::SubmitMetadata(Http2StreamId stream_id,
+                                    size_t /* max_frame_size */,
                                     std::unique_ptr<MetadataSource> source) {
+  // Not necessary to pass max_frame_size along, since OgHttp2Session tracks the
+  // peer's advertised max frame size.
   session_->SubmitMetadata(stream_id, std::move(source));
 }
 
diff --git a/http2/adapter/oghttp2_adapter.h b/http2/adapter/oghttp2_adapter.h
index bcd373a..cc02a6e 100644
--- a/http2/adapter/oghttp2_adapter.h
+++ b/http2/adapter/oghttp2_adapter.h
@@ -35,7 +35,7 @@
                     absl::string_view opaque_data) override;
   void SubmitWindowUpdate(Http2StreamId stream_id,
                           int window_increment) override;
-  void SubmitMetadata(Http2StreamId stream_id,
+  void SubmitMetadata(Http2StreamId stream_id, size_t max_frame_size,
                       std::unique_ptr<MetadataSource> source) override;
   int Send() override;
   int GetSendWindowSize() const override;
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index 96c52be..053b2d5 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -616,7 +616,7 @@
 TEST_F(OgHttp2AdapterTest, SubmitMetadata) {
   auto source = absl::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
       {{"query-cost", "is too darn high"}, {"secret-sauce", "hollandaise"}})));
-  adapter_->SubmitMetadata(1, std::move(source));
+  adapter_->SubmitMetadata(1, 16384u, std::move(source));
   EXPECT_TRUE(adapter_->session().want_write());
 
   EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
@@ -633,10 +633,42 @@
   EXPECT_FALSE(adapter_->session().want_write());
 }
 
+TEST_F(OgHttp2AdapterTest, SubmitMetadataMultipleFrames) {
+  const auto kLargeValue = std::string(63 * 1024, 'a');
+  auto source = absl::make_unique<TestMetadataSource>(
+      ToHeaderBlock(ToHeaders({{"large-value", kLargeValue}})));
+  adapter_->SubmitMetadata(1, 16384u, std::move(source));
+  EXPECT_TRUE(adapter_->session().want_write());
+
+  testing::InSequence seq;
+  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+  EXPECT_CALL(http2_visitor_, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(kMetadataFrameType, 1, _, 0x0));
+  EXPECT_CALL(http2_visitor_, OnFrameSent(kMetadataFrameType, 1, _, 0x0, 0));
+  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(kMetadataFrameType, 1, _, 0x0));
+  EXPECT_CALL(http2_visitor_, OnFrameSent(kMetadataFrameType, 1, _, 0x0, 0));
+  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(kMetadataFrameType, 1, _, 0x0));
+  EXPECT_CALL(http2_visitor_, OnFrameSent(kMetadataFrameType, 1, _, 0x0, 0));
+  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(kMetadataFrameType, 1, _, 0x4));
+  EXPECT_CALL(http2_visitor_, OnFrameSent(kMetadataFrameType, 1, _, 0x4, 0));
+
+  int result = adapter_->Send();
+  EXPECT_EQ(0, result);
+  absl::string_view serialized = http2_visitor_.data();
+  EXPECT_THAT(
+      serialized,
+      EqualsFrames({spdy::SpdyFrameType::SETTINGS,
+                    static_cast<spdy::SpdyFrameType>(kMetadataFrameType),
+                    static_cast<spdy::SpdyFrameType>(kMetadataFrameType),
+                    static_cast<spdy::SpdyFrameType>(kMetadataFrameType),
+                    static_cast<spdy::SpdyFrameType>(kMetadataFrameType)}));
+  EXPECT_FALSE(adapter_->session().want_write());
+}
+
 TEST_F(OgHttp2AdapterTest, SubmitConnectionMetadata) {
   auto source = absl::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
       {{"query-cost", "is too darn high"}, {"secret-sauce", "hollandaise"}})));
-  adapter_->SubmitMetadata(0, std::move(source));
+  adapter_->SubmitMetadata(0, 16384u, std::move(source));
   EXPECT_TRUE(adapter_->session().want_write());
 
   EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 7c4593f..bed7f4e 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -10,7 +10,7 @@
 
 namespace {
 
-const size_t kMaxMetadataFrameSize = 16384;
+const uint32_t kMaxAllowedMetadataFrameSize = 65536u;
 
 // TODO(birenroy): Consider incorporating spdy::FlagsSerializionVisitor here.
 class FrameAttributeCollector : public spdy::SpdyFrameVisitor {
@@ -377,8 +377,9 @@
     return true;
   }
   bool source_can_produce = true;
-  int32_t available_window = std::min(
-      std::min(connection_send_window_, state.send_window), max_frame_payload_);
+  int32_t available_window =
+      std::min({connection_send_window_, state.send_window,
+                static_cast<int32_t>(max_frame_payload_)});
   while (connection_can_write && available_window > 0 &&
          state.outbound_body != nullptr) {
     int64_t length;
@@ -411,9 +412,8 @@
       visitor_.OnFrameSent(/* DATA */ 0, stream_id, length, fin ? 0x1 : 0x0, 0);
       connection_send_window_ -= length;
       state.send_window -= length;
-      available_window =
-          std::min(std::min(connection_send_window_, state.send_window),
-                   max_frame_payload_);
+      available_window = std::min({connection_send_window_, state.send_window,
+                                   static_cast<int32_t>(max_frame_payload_)});
     }
     if (end_data) {
       bool sent_trailers = false;
@@ -445,19 +445,21 @@
 
 bool OgHttp2Session::SendMetadata(Http2StreamId stream_id,
                                   OgHttp2Session::MetadataSequence& sequence) {
-  auto payload_buffer = absl::make_unique<uint8_t[]>(kMaxMetadataFrameSize);
+  const uint32_t max_payload_size =
+      std::min(kMaxAllowedMetadataFrameSize, max_frame_payload_);
+  auto payload_buffer = absl::make_unique<uint8_t[]>(max_payload_size);
   while (!sequence.empty()) {
     MetadataSource& source = *sequence.front();
 
     int64_t written;
     bool end_metadata;
     std::tie(written, end_metadata) =
-        source.Pack(payload_buffer.get(), kMaxMetadataFrameSize);
+        source.Pack(payload_buffer.get(), max_payload_size);
     if (written < 0) {
       // Did not touch the connection, so perhaps writes are still possible.
       return true;
     }
-    QUICHE_DCHECK_LE(static_cast<size_t>(written), kMaxMetadataFrameSize);
+    QUICHE_DCHECK_LE(static_cast<size_t>(written), max_payload_size);
     auto payload = absl::string_view(
         reinterpret_cast<const char*>(payload_buffer.get()), written);
     EnqueueFrame(absl::make_unique<spdy::SpdyUnknownIR>(
@@ -653,6 +655,8 @@
   visitor_.OnSetting({id, value});
   if (id == kMetadataExtensionId) {
     peer_supports_metadata_ = (value != 0);
+  } else if (id == MAX_FRAME_SIZE) {
+    max_frame_payload_ = value;
   }
 }
 
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index 0d8461b..ccb0b68 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -269,10 +269,10 @@
   Http2StreamId highest_received_stream_id_ = 0;
   Http2StreamId metadata_stream_id_ = 0;
   size_t metadata_length_ = 0;
-  int connection_send_window_ = kInitialFlowControlWindowSize;
+  int32_t connection_send_window_ = kInitialFlowControlWindowSize;
   // The initial flow control receive window size for any newly created streams.
-  int stream_receive_window_limit_ = kInitialFlowControlWindowSize;
-  int max_frame_payload_ = 16384;
+  int32_t stream_receive_window_limit_ = kInitialFlowControlWindowSize;
+  uint32_t max_frame_payload_ = 16384u;
   Options options_;
   bool received_goaway_ = false;
   bool queued_preface_ = false;
diff --git a/http2/adapter/test_utils.h b/http2/adapter/test_utils.h
index 1b18890..0493788 100644
--- a/http2/adapter/test_utils.h
+++ b/http2/adapter/test_utils.h
@@ -96,6 +96,10 @@
  public:
   explicit TestMetadataSource(const spdy::SpdyHeaderBlock& entries);
 
+  size_t NumFrames(size_t max_frame_size) const override {
+    // Round up to the next frame.
+    return (encoded_entries_.size() + max_frame_size - 1) / max_frame_size;
+  }
   std::pair<int64_t, bool> Pack(uint8_t* dest, size_t dest_len) override;
 
  private: