Plumbs metadata events through CallbackVisitor and nghttp2_callbacks.

Metadata handling is not implemented properly yet in the OG stack.

PiperOrigin-RevId: 381480503
diff --git a/http2/adapter/callback_visitor.cc b/http2/adapter/callback_visitor.cc
index a86e795..f1ff90a 100644
--- a/http2/adapter/callback_visitor.cc
+++ b/http2/adapter/callback_visitor.cc
@@ -299,16 +299,36 @@
 
 void CallbackVisitor::OnBeginMetadataForStream(Http2StreamId stream_id,
                                                size_t payload_length) {
-  QUICHE_LOG(FATAL) << "Not implemented";
+  QUICHE_VLOG(1) << "OnBeginMetadataForStream(stream_id=" << stream_id
+                 << ", payload_length=" << payload_length << ")";
 }
 
 void CallbackVisitor::OnMetadataForStream(Http2StreamId stream_id,
                                           absl::string_view metadata) {
-  QUICHE_LOG(FATAL) << "Not implemented";
+  QUICHE_VLOG(1) << "OnMetadataForStream(stream_id=" << stream_id
+                 << ", len=" << metadata.size() << ")";
+  if (callbacks_->on_extension_chunk_recv_callback) {
+    int result = callbacks_->on_extension_chunk_recv_callback(
+        nullptr, &current_frame_.hd, ToUint8Ptr(metadata.data()),
+        metadata.size(), user_data_);
+    QUICHE_DCHECK_EQ(0, result);
+  }
 }
 
-void CallbackVisitor::OnMetadataEndForStream(Http2StreamId stream_id) {
-  QUICHE_LOG(FATAL) << "Not implemented";
+bool CallbackVisitor::OnMetadataEndForStream(Http2StreamId stream_id) {
+  QUICHE_LOG_IF(DFATAL, current_frame_.hd.flags != kMetadataEndFlag);
+  QUICHE_VLOG(1) << "OnMetadataEndForStream(stream_id=" << stream_id << ")";
+  if (callbacks_->unpack_extension_callback) {
+    void* payload;
+    int result = callbacks_->unpack_extension_callback(
+        nullptr, &payload, &current_frame_.hd, user_data_);
+    if (callbacks_->on_frame_recv_callback) {
+      current_frame_.ext.payload = payload;
+      callbacks_->on_frame_recv_callback(nullptr, &current_frame_, user_data_);
+    }
+    return (result == 0);
+  }
+  return true;
 }
 
 void CallbackVisitor::OnErrorDebug(absl::string_view message) {
diff --git a/http2/adapter/callback_visitor.h b/http2/adapter/callback_visitor.h
index f00307d..49e3f33 100644
--- a/http2/adapter/callback_visitor.h
+++ b/http2/adapter/callback_visitor.h
@@ -71,7 +71,7 @@
                                 size_t payload_length) override;
   void OnMetadataForStream(Http2StreamId stream_id,
                            absl::string_view metadata) override;
-  void OnMetadataEndForStream(Http2StreamId stream_id) override;
+  bool OnMetadataEndForStream(Http2StreamId stream_id) override;
   void OnErrorDebug(absl::string_view message) override;
 
  private:
diff --git a/http2/adapter/http2_visitor_interface.h b/http2/adapter/http2_visitor_interface.h
index a707568..f71e21c 100644
--- a/http2/adapter/http2_visitor_interface.h
+++ b/http2/adapter/http2_visitor_interface.h
@@ -180,6 +180,7 @@
   // Called when the connection receives the beginning of a METADATA frame
   // (which may itself be the middle of a logical metadata block). The metadata
   // payload will be provided via subsequent calls to OnMetadataForStream().
+  // TODO(birenroy): Consider removing this unnecessary method.
   virtual void OnBeginMetadataForStream(Http2StreamId stream_id,
                                         size_t payload_length) = 0;
 
@@ -190,7 +191,8 @@
 
   // Called when the connection has finished receiving a logical metadata block
   // for a stream. Note that there may be multiple metadata blocks for a stream.
-  virtual void OnMetadataEndForStream(Http2StreamId stream_id) = 0;
+  // Returns false if there was an error unpacking the metadata payload.
+  virtual bool OnMetadataEndForStream(Http2StreamId stream_id) = 0;
 
   // Invoked with an error message from the application.
   virtual void OnErrorDebug(absl::string_view message) = 0;
diff --git a/http2/adapter/mock_http2_visitor.h b/http2/adapter/mock_http2_visitor.h
index b07f8e3..26c37b7 100644
--- a/http2/adapter/mock_http2_visitor.h
+++ b/http2/adapter/mock_http2_visitor.h
@@ -14,6 +14,7 @@
   MockHttp2Visitor() {
     ON_CALL(*this, OnHeaderForStream).WillByDefault(testing::Return(true));
     ON_CALL(*this, OnInvalidFrame).WillByDefault(testing::Return(true));
+    ON_CALL(*this, OnMetadataEndForStream).WillByDefault(testing::Return(true));
   }
 
   MOCK_METHOD(ssize_t,
@@ -132,9 +133,7 @@
               (Http2StreamId stream_id, absl::string_view metadata),
               (override));
 
-  MOCK_METHOD(void,
-              OnMetadataEndForStream,
-              (Http2StreamId stream_id),
+  MOCK_METHOD(bool, OnMetadataEndForStream, (Http2StreamId stream_id),
               (override));
 
   MOCK_METHOD(void, OnErrorDebug, (absl::string_view message), (override));
diff --git a/http2/adapter/nghttp2_adapter.cc b/http2/adapter/nghttp2_adapter.cc
index ad2d142..580009e 100644
--- a/http2/adapter/nghttp2_adapter.cc
+++ b/http2/adapter/nghttp2_adapter.cc
@@ -236,6 +236,8 @@
     nghttp2_option_set_no_auto_window_update(owned_options, 1);
     nghttp2_option_set_max_send_header_block_length(owned_options, 0x2000000);
     nghttp2_option_set_max_outbound_ack(owned_options, 10000);
+    nghttp2_option_set_user_recv_extension_type(owned_options,
+                                                kMetadataFrameType);
     options_ = owned_options;
   }
 
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index 9e5308b..22057c7 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -334,6 +334,84 @@
   EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS}));
 }
 
+TEST(NgHttp2AdapterTest, ClientHandlesMetadata) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
+
+  testing::InSequence s;
+
+  const std::vector<const Header> headers1 =
+      ToHeaders({{":method", "GET"},
+                 {":scheme", "http"},
+                 {":authority", "example.com"},
+                 {":path", "/this/is/request/one"}});
+
+  const char* kSentinel1 = "arbitrary pointer 1";
+  const int32_t stream_id1 =
+      adapter->SubmitRequest(headers1, nullptr, const_cast<char*>(kSentinel1));
+  ASSERT_GT(stream_id1, 0);
+  QUICHE_LOG(INFO) << "Created stream: " << stream_id1;
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id1, _, 0x5));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id1, _, 0x5, 0));
+
+  int result = adapter->Send();
+  EXPECT_EQ(0, result);
+  absl::string_view data = visitor.data();
+  EXPECT_THAT(data, testing::StartsWith(spdy::kHttp2ConnectionHeaderPrefix));
+  data.remove_prefix(strlen(spdy::kHttp2ConnectionHeaderPrefix));
+  EXPECT_THAT(data, EqualsFrames({spdy::SpdyFrameType::HEADERS}));
+  visitor.Clear();
+
+  const std::string stream_frames =
+      TestFrameSequence()
+          .ServerPreface()
+          .Metadata(0, "Example connection metadata")
+          .Headers(1,
+                   {{":status", "200"},
+                    {"server", "my-fake-server"},
+                    {"date", "Tue, 6 Apr 2021 12:54:01 GMT"}},
+                   /*fin=*/false)
+          .Metadata(1, "Example stream metadata")
+          .Data(1, "This is the response body.", true)
+          .Serialize();
+
+  // Server preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  EXPECT_CALL(visitor, OnFrameHeader(0, _, kMetadataFrameType, 4));
+  EXPECT_CALL(visitor, OnMetadataForStream(0, _));
+  EXPECT_CALL(visitor, OnMetadataEndForStream(0));
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 4));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":status", "200"));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, "server", "my-fake-server"));
+  EXPECT_CALL(visitor,
+              OnHeaderForStream(1, "date", "Tue, 6 Apr 2021 12:54:01 GMT"));
+  EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, kMetadataFrameType, 4));
+  EXPECT_CALL(visitor, OnMetadataForStream(1, _));
+  EXPECT_CALL(visitor, OnMetadataEndForStream(1));
+  EXPECT_CALL(visitor, OnFrameHeader(1, 26, DATA, 1));
+  EXPECT_CALL(visitor, OnBeginDataForStream(1, 26));
+  EXPECT_CALL(visitor, OnDataForStream(1, "This is the response body."));
+  EXPECT_CALL(visitor, OnEndStream(1));
+  EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::NO_ERROR));
+
+  const ssize_t stream_result = adapter->ProcessBytes(stream_frames);
+  EXPECT_EQ(stream_frames.size(), stream_result);
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x1, 0));
+
+  EXPECT_TRUE(adapter->session().want_write());
+  result = adapter->Send();
+  EXPECT_EQ(0, result);
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS}));
+}
+
 TEST(NgHttp2AdapterTest, ClientHandlesInvalidTrailers) {
   DataSavingVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
diff --git a/http2/adapter/nghttp2_callbacks.cc b/http2/adapter/nghttp2_callbacks.cc
index fd7b786..2ca9b39 100644
--- a/http2/adapter/nghttp2_callbacks.cc
+++ b/http2/adapter/nghttp2_callbacks.cc
@@ -221,6 +221,44 @@
   return 0;
 }
 
+int OnExtensionChunkReceived(nghttp2_session* session,
+                             const nghttp2_frame_hd* hd, const uint8_t* data,
+                             size_t len, void* user_data) {
+  QUICHE_CHECK_NE(user_data, nullptr);
+  auto* visitor = static_cast<Http2VisitorInterface*>(user_data);
+  if (hd->type != kMetadataFrameType) {
+    QUICHE_LOG(ERROR) << "Unexpected frame type: "
+                      << static_cast<int>(hd->type);
+    return NGHTTP2_ERR_CANCEL;
+  }
+  visitor->OnMetadataForStream(hd->stream_id, ToStringView(data, len));
+  return 0;
+}
+
+int OnUnpackExtensionCallback(nghttp2_session* session, void** payload,
+                              const nghttp2_frame_hd* hd, void* user_data) {
+  QUICHE_CHECK_NE(user_data, nullptr);
+  auto* visitor = static_cast<Http2VisitorInterface*>(user_data);
+  if (hd->flags == kMetadataEndFlag) {
+    const bool result = visitor->OnMetadataEndForStream(hd->stream_id);
+    if (!result) {
+      return NGHTTP2_ERR_CALLBACK_FAILURE;
+    }
+  }
+  return 0;
+}
+
+ssize_t OnPackExtensionCallback(nghttp2_session* session, uint8_t* buf,
+                                size_t len, const nghttp2_frame* frame,
+                                void* user_data) {
+  QUICHE_CHECK_NE(user_data, nullptr);
+  auto* visitor = static_cast<Http2VisitorInterface*>(user_data);
+  ssize_t written = 0;
+  visitor->OnReadyToSendMetadataForStream(
+      frame->hd.stream_id, reinterpret_cast<char*>(buf), len, &written);
+  return written;
+}
+
 int OnError(nghttp2_session* session, int lib_error_code, const char* msg,
             size_t len, void* user_data) {
   QUICHE_CHECK_NE(user_data, nullptr);
@@ -251,9 +289,15 @@
   nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(
       callbacks, &OnInvalidFrameReceived);
   nghttp2_session_callbacks_set_error_callback2(callbacks, &OnError);
+  // on_frame_not_send_callback <- just ignored
   nghttp2_session_callbacks_set_send_data_callback(
       callbacks, &DataFrameSourceSendCallback);
-
+  nghttp2_session_callbacks_set_pack_extension_callback(
+      callbacks, &OnPackExtensionCallback);
+  nghttp2_session_callbacks_set_unpack_extension_callback(
+      callbacks, &OnUnpackExtensionCallback);
+  nghttp2_session_callbacks_set_on_extension_chunk_recv_callback(
+      callbacks, &OnExtensionChunkReceived);
   return MakeCallbacksPtr(callbacks);
 }
 
diff --git a/http2/adapter/nghttp2_callbacks.h b/http2/adapter/nghttp2_callbacks.h
index 37b4536..696b684 100644
--- a/http2/adapter/nghttp2_callbacks.h
+++ b/http2/adapter/nghttp2_callbacks.h
@@ -59,6 +59,22 @@
 int OnStreamClosed(nghttp2_session* session, Http2StreamId stream_id,
                    uint32_t error_code, void* user_data);
 
+// Invoked when nghttp2 has a chunk of extension frame data to pass to the
+// application.
+int OnExtensionChunkReceived(nghttp2_session* session,
+                             const nghttp2_frame_hd* hd, const uint8_t* data,
+                             size_t len, void* user_data);
+
+// Invoked when nghttp2 wants the application to unpack an extension payload.
+int OnUnpackExtensionCallback(nghttp2_session* session, void** payload,
+                              const nghttp2_frame_hd* hd, void* user_data);
+
+// Invoked when nghttp2 is ready to pack an extension payload. Returns the
+// number of bytes serialized to |buf|.
+ssize_t OnPackExtensionCallback(nghttp2_session* session, uint8_t* buf,
+                                size_t len, const nghttp2_frame* frame,
+                                void* user_data);
+
 // Invoked when the library has an error message to deliver.
 int OnError(nghttp2_session* session, int lib_error_code, const char* msg,
             size_t len, void* user_data);
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index 40387cc..943bae7 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -133,6 +133,88 @@
   EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS}));
 }
 
+TEST(OgHttp2AdapterClientTest, ClientHandlesMetadata) {
+  DataSavingVisitor visitor;
+  OgHttp2Adapter::Options options{.perspective = Perspective::kClient};
+  auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+  testing::InSequence s;
+
+  const std::vector<const Header> headers1 =
+      ToHeaders({{":method", "GET"},
+                 {":scheme", "http"},
+                 {":authority", "example.com"},
+                 {":path", "/this/is/request/one"}});
+
+  const char* kSentinel1 = "arbitrary pointer 1";
+  const int32_t stream_id1 =
+      adapter->SubmitRequest(headers1, nullptr, const_cast<char*>(kSentinel1));
+  ASSERT_GT(stream_id1, 0);
+  QUICHE_LOG(INFO) << "Created stream: " << stream_id1;
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id1, _, 0x5));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id1, _, 0x5, 0));
+
+  int result = adapter->Send();
+  EXPECT_EQ(0, result);
+  absl::string_view data = visitor.data();
+  EXPECT_THAT(data, testing::StartsWith(spdy::kHttp2ConnectionHeaderPrefix));
+  data.remove_prefix(strlen(spdy::kHttp2ConnectionHeaderPrefix));
+  EXPECT_THAT(data, EqualsFrames({spdy::SpdyFrameType::SETTINGS,
+                                  spdy::SpdyFrameType::HEADERS}));
+  visitor.Clear();
+
+  const std::string stream_frames =
+      TestFrameSequence()
+          .ServerPreface()
+          .Metadata(0, "Example connection metadata")
+          .Headers(1,
+                   {{":status", "200"},
+                    {"server", "my-fake-server"},
+                    {"date", "Tue, 6 Apr 2021 12:54:01 GMT"}},
+                   /*fin=*/false)
+          .Metadata(1, "Example stream metadata")
+          .Data(1, "This is the response body.", true)
+          .Serialize();
+
+  // Server preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  EXPECT_CALL(visitor, OnFrameHeader(0, _, kMetadataFrameType, 4));
+  // EXPECT_CALL(visitor, OnMetadataForStream(0, _));
+  // EXPECT_CALL(visitor, OnMetadataEndForStream(0));
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 4));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":status", "200"));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, "server", "my-fake-server"));
+  EXPECT_CALL(visitor,
+              OnHeaderForStream(1, "date", "Tue, 6 Apr 2021 12:54:01 GMT"));
+  EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, kMetadataFrameType, 4));
+  // EXPECT_CALL(visitor, OnMetadataForStream(1, _));
+  // EXPECT_CALL(visitor, OnMetadataEndForStream(1));
+  EXPECT_CALL(visitor, OnFrameHeader(1, 26, DATA, 1));
+  EXPECT_CALL(visitor, OnBeginDataForStream(1, 26));
+  EXPECT_CALL(visitor, OnDataForStream(1, "This is the response body."));
+  EXPECT_CALL(visitor, OnEndStream(1));
+  EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::NO_ERROR));
+
+  const ssize_t stream_result = adapter->ProcessBytes(stream_frames);
+  EXPECT_EQ(stream_frames.size(), stream_result);
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x1, 0));
+
+  EXPECT_TRUE(adapter->session().want_write());
+  result = adapter->Send();
+  EXPECT_EQ(0, result);
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS}));
+}
+
 // TODO(birenroy): Validate headers and re-enable this test. The library should
 // invoke OnErrorDebug() with an error message for the invalid header. The
 // library should also invoke OnInvalidFrame() for the invalid HEADERS frame.
diff --git a/http2/adapter/recording_http2_visitor.cc b/http2/adapter/recording_http2_visitor.cc
index 1ccc68e..48afbe4 100644
--- a/http2/adapter/recording_http2_visitor.cc
+++ b/http2/adapter/recording_http2_visitor.cc
@@ -175,8 +175,9 @@
       absl::StrFormat("OnMetadataForStream %d %s", stream_id, metadata));
 }
 
-void RecordingHttp2Visitor::OnMetadataEndForStream(Http2StreamId stream_id) {
+bool RecordingHttp2Visitor::OnMetadataEndForStream(Http2StreamId stream_id) {
   events_.push_back(absl::StrFormat("OnMetadataEndForStream %d", stream_id));
+  return true;
 }
 
 void RecordingHttp2Visitor::OnErrorDebug(absl::string_view message) {
diff --git a/http2/adapter/recording_http2_visitor.h b/http2/adapter/recording_http2_visitor.h
index e3e40e1..aea7f9d 100644
--- a/http2/adapter/recording_http2_visitor.h
+++ b/http2/adapter/recording_http2_visitor.h
@@ -69,7 +69,7 @@
                                 size_t payload_length) override;
   void OnMetadataForStream(Http2StreamId stream_id,
                            absl::string_view metadata) override;
-  void OnMetadataEndForStream(Http2StreamId stream_id) override;
+  bool OnMetadataEndForStream(Http2StreamId stream_id) override;
   void OnErrorDebug(absl::string_view message) override;
 
   const EventSequence& GetEventSequence() const { return events_; }