Adds a MetadataSource abstraction and metadata write implementation for the oghttp2 stack.

Plumbing for the nghttp2 stack will come in a future change.

PiperOrigin-RevId: 388225769
diff --git a/http2/adapter/data_source.h b/http2/adapter/data_source.h
index 6b1a8ce..d867141 100644
--- a/http2/adapter/data_source.h
+++ b/http2/adapter/data_source.h
@@ -32,6 +32,18 @@
   virtual bool send_fin() const = 0;
 };
 
+// Represents a source of metadata frames for transmission to the peer.
+class QUICHE_EXPORT_PRIVATE MetadataSource {
+ public:
+  virtual ~MetadataSource() {}
+
+  // This method is called with a destination buffer and length. It should
+  // return the number of payload bytes copied to |dest|, or a negative integer
+  // to indicate an error, as well as a boolean indicating whether the metadata
+  // has been completely copied.
+  virtual std::pair<ssize_t, bool> Pack(uint8_t* dest, size_t dest_len) = 0;
+};
+
 }  // namespace adapter
 }  // namespace http2
 
diff --git a/http2/adapter/http2_adapter.h b/http2/adapter/http2_adapter.h
index 75eb138..789be98 100644
--- a/http2/adapter/http2_adapter.h
+++ b/http2/adapter/http2_adapter.h
@@ -62,10 +62,10 @@
   virtual void SubmitRst(Http2StreamId stream_id,
                          Http2ErrorCode error_code) = 0;
 
-  // Submits a METADATA frame for the given stream (a |stream_id| of 0 indicates
-  // connection-level METADATA). If |fin|, the frame will also have the
-  // END_METADATA flag set.
-  virtual void SubmitMetadata(Http2StreamId stream_id, bool fin) = 0;
+  // Submits a sequence of METADATA frames for the given stream. A |stream_id|
+  // of 0 indicates connection-level METADATA.
+  virtual void SubmitMetadata(Http2StreamId stream_id,
+                              std::unique_ptr<MetadataSource> source) = 0;
 
   // Invokes the visitor's OnReadyToSend() method for serialized frame data.
   // Returns 0 on success.
diff --git a/http2/adapter/nghttp2_adapter.cc b/http2/adapter/nghttp2_adapter.cc
index b786daa..9b444a3 100644
--- a/http2/adapter/nghttp2_adapter.cc
+++ b/http2/adapter/nghttp2_adapter.cc
@@ -90,8 +90,8 @@
                                stream_id, window_increment);
 }
 
-void NgHttp2Adapter::SubmitMetadata(Http2StreamId /*stream_id*/,
-                                    bool /*end_metadata*/) {
+void NgHttp2Adapter::SubmitMetadata(
+    Http2StreamId /*stream_id*/, std::unique_ptr<MetadataSource> /*source*/) {
   QUICHE_LOG(DFATAL) << "Not implemented";
 }
 
diff --git a/http2/adapter/nghttp2_adapter.h b/http2/adapter/nghttp2_adapter.h
index 2f2cbad..151f9a7 100644
--- a/http2/adapter/nghttp2_adapter.h
+++ b/http2/adapter/nghttp2_adapter.h
@@ -49,7 +49,8 @@
 
   void SubmitRst(Http2StreamId stream_id, Http2ErrorCode error_code) override;
 
-  void SubmitMetadata(Http2StreamId stream_id, bool end_metadata) override;
+  void SubmitMetadata(Http2StreamId stream_id,
+                      std::unique_ptr<MetadataSource> source) override;
 
   int Send() override;
 
diff --git a/http2/adapter/oghttp2_adapter.cc b/http2/adapter/oghttp2_adapter.cc
index f695696..27f011e 100644
--- a/http2/adapter/oghttp2_adapter.cc
+++ b/http2/adapter/oghttp2_adapter.cc
@@ -75,8 +75,9 @@
       absl::make_unique<SpdyWindowUpdateIR>(stream_id, window_increment));
 }
 
-void OgHttp2Adapter::SubmitMetadata(Http2StreamId /*stream_id*/, bool /*fin*/) {
-  QUICHE_BUG(oghttp2_submit_metadata) << "Not implemented";
+void OgHttp2Adapter::SubmitMetadata(Http2StreamId stream_id,
+                                    std::unique_ptr<MetadataSource> source) {
+  session_->SubmitMetadata(stream_id, std::move(source));
 }
 
 int OgHttp2Adapter::Send() { return session_->Send(); }
diff --git a/http2/adapter/oghttp2_adapter.h b/http2/adapter/oghttp2_adapter.h
index ace8ea9..9c7bad3 100644
--- a/http2/adapter/oghttp2_adapter.h
+++ b/http2/adapter/oghttp2_adapter.h
@@ -34,7 +34,8 @@
                     absl::string_view opaque_data) override;
   void SubmitWindowUpdate(Http2StreamId stream_id,
                           int window_increment) override;
-  void SubmitMetadata(Http2StreamId stream_id, bool fin) override;
+  void SubmitMetadata(Http2StreamId stream_id,
+                      std::unique_ptr<MetadataSource> source) override;
   int Send() override;
   int GetSendWindowSize() const override;
   int GetStreamSendWindowSize(Http2StreamId stream_id) const override;
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index 401fda4..9cc960f 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -1,6 +1,7 @@
 #include "http2/adapter/oghttp2_adapter.h"
 
 #include "http2/adapter/mock_http2_visitor.h"
+#include "http2/adapter/oghttp2_util.h"
 #include "http2/adapter/test_frame_sequence.h"
 #include "http2/adapter/test_utils.h"
 #include "common/platform/api/quiche_test.h"
@@ -531,7 +532,43 @@
 }
 
 TEST_F(OgHttp2AdapterTest, SubmitMetadata) {
-  EXPECT_QUICHE_BUG(adapter_->SubmitMetadata(3, true), "Not implemented");
+  auto source = absl::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
+      {{"query-cost", "is too darn high"}, {"secret-sauce", "hollandaise"}})));
+  adapter_->SubmitMetadata(1, std::move(source));
+  EXPECT_TRUE(adapter_->session().want_write());
+
+  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+  EXPECT_CALL(http2_visitor_, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(kMetadataFrameType, 1, _, 0x4));
+  EXPECT_CALL(http2_visitor_, OnFrameSent(kMetadataFrameType, 1, _, 0x4, 0));
+
+  int result = adapter_->Send();
+  EXPECT_EQ(0, result);
+  EXPECT_THAT(
+      http2_visitor_.data(),
+      EqualsFrames({spdy::SpdyFrameType::SETTINGS,
+                    static_cast<spdy::SpdyFrameType>(kMetadataFrameType)}));
+  EXPECT_FALSE(adapter_->session().want_write());
+}
+
+TEST_F(OgHttp2AdapterTest, SubmitConnectionMetadata) {
+  auto source = absl::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
+      {{"query-cost", "is too darn high"}, {"secret-sauce", "hollandaise"}})));
+  adapter_->SubmitMetadata(0, std::move(source));
+  EXPECT_TRUE(adapter_->session().want_write());
+
+  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+  EXPECT_CALL(http2_visitor_, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+  EXPECT_CALL(http2_visitor_, OnBeforeFrameSent(kMetadataFrameType, 0, _, 0x4));
+  EXPECT_CALL(http2_visitor_, OnFrameSent(kMetadataFrameType, 0, _, 0x4, 0));
+
+  int result = adapter_->Send();
+  EXPECT_EQ(0, result);
+  EXPECT_THAT(
+      http2_visitor_.data(),
+      EqualsFrames({spdy::SpdyFrameType::SETTINGS,
+                    static_cast<spdy::SpdyFrameType>(kMetadataFrameType)}));
+  EXPECT_FALSE(adapter_->session().want_write());
 }
 
 TEST_F(OgHttp2AdapterTest, GetSendWindowSize) {
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 6aa98b8..b43e4a8 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -10,6 +10,8 @@
 
 namespace {
 
+const size_t kMaxMetadataFrameSize = 16384;
+
 // TODO(birenroy): Consider incorporating spdy::FlagsSerializionVisitor here.
 class FrameAttributeCollector : public spdy::SpdyFrameVisitor {
  public:
@@ -80,6 +82,12 @@
     flags_ = continuation.end_headers() ? 0x4 : 0;
     length_ = continuation.size() - spdy::kFrameHeaderSize;
   }
+  void VisitUnknown(const spdy::SpdyUnknownIR& unknown) override {
+    frame_type_ = static_cast<uint8_t>(unknown.frame_type());
+    stream_id_ = unknown.stream_id();
+    flags_ = unknown.flags();
+    length_ = unknown.size() - spdy::kFrameHeaderSize;
+  }
   void VisitAltSvc(const spdy::SpdyAltSvcIR& /*altsvc*/) override {}
   void VisitPriorityUpdate(
       const spdy::SpdyPriorityUpdateIR& /*priority_update*/) override {}
@@ -287,6 +295,9 @@
     return result < 0 ? result : 0;
   }
   bool continue_writing = SendQueuedFrames();
+  while (continue_writing && !connection_metadata_.empty()) {
+    continue_writing = SendMetadata(0, connection_metadata_);
+  }
   // Wake streams for writes.
   while (continue_writing && write_scheduler_.HasReadyStreams() &&
          connection_send_window_ > 0) {
@@ -346,6 +357,11 @@
     return true;
   }
   StreamState& state = it->second;
+  bool connection_can_write = true;
+  if (!state.outbound_metadata.empty()) {
+    connection_can_write = SendMetadata(stream_id, state.outbound_metadata);
+  }
+
   if (state.outbound_body == nullptr) {
     // No data to send, but there might be trailers.
     if (state.trailers != nullptr) {
@@ -360,10 +376,10 @@
     return true;
   }
   bool source_can_produce = true;
-  bool connection_can_write = true;
   int32_t available_window = std::min(
       std::min(connection_send_window_, state.send_window), max_frame_payload_);
-  while (available_window > 0 && state.outbound_body != nullptr) {
+  while (connection_can_write && available_window > 0 &&
+         state.outbound_body != nullptr) {
     ssize_t length;
     bool end_data;
     std::tie(length, end_data) =
@@ -382,6 +398,7 @@
     data.SetDataShallow(length);
     spdy::SpdySerializedFrame header =
         spdy::SpdyFramer::SerializeDataFrameHeaderWithPaddingLengthField(data);
+    QUICHE_DCHECK(serialized_prefix_.empty() && frames_.empty());
     const bool success =
         state.outbound_body->Send(absl::string_view(header), length);
     if (!success) {
@@ -422,6 +439,33 @@
   return connection_can_write && available_window > 0;
 }
 
+bool OgHttp2Session::SendMetadata(Http2StreamId stream_id,
+                                  OgHttp2Session::MetadataSequence& sequence) {
+  auto payload_buffer = absl::make_unique<uint8_t[]>(kMaxMetadataFrameSize);
+  while (!sequence.empty()) {
+    MetadataSource& source = *sequence.front();
+
+    ssize_t written;
+    bool end_metadata;
+    std::tie(written, end_metadata) =
+        source.Pack(payload_buffer.get(), kMaxMetadataFrameSize);
+    if (written < 0) {
+      // Did not touch the connection, so perhaps writes are still possible.
+      return true;
+    }
+    QUICHE_DCHECK_LE(static_cast<size_t>(written), kMaxMetadataFrameSize);
+    auto payload = absl::string_view(
+        reinterpret_cast<const char*>(payload_buffer.get()), written);
+    EnqueueFrame(absl::make_unique<spdy::SpdyUnknownIR>(
+        stream_id, kMetadataFrameType, end_metadata ? kMetadataEndFlag : 0u,
+        std::string(payload)));
+    if (end_metadata) {
+      sequence.erase(sequence.begin());
+    }
+  }
+  return SendQueuedFrames();
+}
+
 int32_t OgHttp2Session::SubmitRequest(
     absl::Span<const Header> headers,
     std::unique_ptr<DataFrameSource> data_source, void* user_data) {
@@ -507,6 +551,17 @@
   return 0;
 }
 
+void OgHttp2Session::SubmitMetadata(Http2StreamId stream_id,
+                                    std::unique_ptr<MetadataSource> source) {
+  if (stream_id == 0) {
+    connection_metadata_.push_back(std::move(source));
+  } else {
+    auto iter = CreateStream(stream_id);
+    iter->second.outbound_metadata.push_back(std::move(source));
+    write_scheduler_.MarkStreamReady(stream_id, false);
+  }
+}
+
 void OgHttp2Session::OnError(http2::Http2DecoderAdapter::SpdyFramerError error,
                              std::string detailed_error) {
   QUICHE_VLOG(1) << "Error: "
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index 6bec0ca..5188828 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -45,6 +45,8 @@
   int SubmitResponse(Http2StreamId stream_id, absl::Span<const Header> headers,
                      std::unique_ptr<DataFrameSource> data_source);
   int SubmitTrailer(Http2StreamId stream_id, absl::Span<const Header> trailers);
+  void SubmitMetadata(Http2StreamId stream_id,
+                      std::unique_ptr<MetadataSource> source);
 
   bool IsServerSession() const {
     return options_.perspective == Perspective::kServer;
@@ -86,7 +88,7 @@
   bool want_read() const override { return !received_goaway_; }
   bool want_write() const override {
     return !frames_.empty() || !serialized_prefix_.empty() ||
-           write_scheduler_.HasReadyStreams();
+           write_scheduler_.HasReadyStreams() || !connection_metadata_.empty();
   }
   int GetRemoteWindowSize() const override { return connection_send_window_; }
 
@@ -151,6 +153,7 @@
                       Http2VisitorInterface::OnHeaderResult result);
 
  private:
+  using MetadataSequence = std::vector<std::unique_ptr<MetadataSource>>;
   struct QUICHE_EXPORT_PRIVATE StreamState {
     StreamState(int32_t stream_receive_window,
                 WindowManager::WindowUpdateListener listener)
@@ -158,6 +161,7 @@
 
     WindowManager window_manager;
     std::unique_ptr<DataFrameSource> outbound_body;
+    MetadataSequence outbound_metadata;
     std::unique_ptr<spdy::SpdyHeaderBlock> trailers;
     void* user_data = nullptr;
     int32_t send_window = kInitialFlowControlWindowSize;
@@ -202,6 +206,8 @@
   // some other reason).
   bool WriteForStream(Http2StreamId stream_id);
 
+  bool SendMetadata(Http2StreamId stream_id, MetadataSequence& sequence);
+
   void SendTrailers(Http2StreamId stream_id, spdy::SpdyHeaderBlock trailers);
 
   // Encapsulates the RST_STREAM NO_ERROR behavior described in RFC 7540
@@ -246,6 +252,8 @@
 
   absl::flat_hash_set<Http2StreamId> streams_reset_;
 
+  MetadataSequence connection_metadata_;
+
   Http2StreamId next_stream_id_ = 1;
   Http2StreamId highest_received_stream_id_ = 0;
   int connection_send_window_ = kInitialFlowControlWindowSize;
diff --git a/http2/adapter/test_utils.cc b/http2/adapter/test_utils.cc
index 8b3f4d6..9281215 100644
--- a/http2/adapter/test_utils.cc
+++ b/http2/adapter/test_utils.cc
@@ -1,6 +1,7 @@
 #include "http2/adapter/test_utils.h"
 
 #include "common/quiche_endian.h"
+#include "spdy/core/hpack/hpack_encoder.h"
 #include "spdy/core/spdy_frame_reader.h"
 
 namespace http2 {
@@ -85,6 +86,27 @@
   return true;
 }
 
+std::string EncodeHeaders(const spdy::SpdyHeaderBlock& entries) {
+  spdy::HpackEncoder encoder;
+  encoder.DisableCompression();
+  std::string result;
+  QUICHE_CHECK(encoder.EncodeHeaderSet(entries, &result));
+  return result;
+}
+
+TestMetadataSource::TestMetadataSource(const spdy::SpdyHeaderBlock& entries)
+    : encoded_entries_(EncodeHeaders(entries)) {
+  remaining_ = encoded_entries_;
+}
+
+std::pair<ssize_t, bool> TestMetadataSource::Pack(uint8_t* dest,
+                                                  size_t dest_len) {
+  const size_t copied = std::min(dest_len, remaining_.size());
+  std::memcpy(dest, remaining_.data(), copied);
+  remaining_.remove_prefix(copied);
+  return std::make_pair(copied, remaining_.empty());
+}
+
 namespace {
 
 using TypeAndOptionalLength =
@@ -103,6 +125,14 @@
   return out;
 }
 
+std::string FrameTypeToString(uint8_t frame_type) {
+  if (spdy::IsDefinedFrameType(frame_type)) {
+    return spdy::FrameTypeToString(spdy::ParseFrameType(frame_type));
+  } else {
+    return absl::StrFormat("0x%x", static_cast<int>(frame_type));
+  }
+}
+
 // Custom gMock matcher, used to implement EqualsFrames().
 class SpdyControlFrameMatcher
     : public testing::MatcherInterface<absl::string_view> {
@@ -153,16 +183,8 @@
       return false;
     }
 
-    if (!spdy::IsDefinedFrameType(raw_type)) {
-      *listener << "; expected type " << FrameTypeToString(expected_type)
-                << " but raw type " << static_cast<int>(raw_type)
-                << " is not a defined frame type!";
-      return false;
-    }
-
-    spdy::SpdyFrameType actual_type = spdy::ParseFrameType(raw_type);
-    if (actual_type != expected_type) {
-      *listener << "; actual type: " << FrameTypeToString(actual_type)
+    if (raw_type != static_cast<uint8_t>(expected_type)) {
+      *listener << "; actual type: " << FrameTypeToString(raw_type)
                 << " but expected type: " << FrameTypeToString(expected_type);
       return false;
     }
diff --git a/http2/adapter/test_utils.h b/http2/adapter/test_utils.h
index bec3fc3..ff52eee 100644
--- a/http2/adapter/test_utils.h
+++ b/http2/adapter/test_utils.h
@@ -10,6 +10,7 @@
 #include "http2/adapter/mock_http2_visitor.h"
 #include "common/platform/api/quiche_export.h"
 #include "common/platform/api/quiche_test.h"
+#include "spdy/core/spdy_header_block.h"
 #include "spdy/core/spdy_protocol.h"
 
 namespace http2 {
@@ -71,6 +72,17 @@
   bool is_data_available_ = true;
 };
 
+class QUICHE_NO_EXPORT TestMetadataSource : public MetadataSource {
+ public:
+  explicit TestMetadataSource(const spdy::SpdyHeaderBlock& entries);
+
+  std::pair<ssize_t, bool> Pack(uint8_t* dest, size_t dest_len) override;
+
+ private:
+  const std::string encoded_entries_;
+  absl::string_view remaining_;
+};
+
 // These matchers check whether a string consists entirely of HTTP/2 frames of
 // the specified ordered sequence. This is useful in tests where we want to show
 // that one or more particular frame types are serialized for sending to the