Introduces new `Http2VisitorInterface` methods for DATA frame generation.

An outline of changes:
* Adds `Http2VisitorInterface::OnReadyToSendDataForStream()`, which the codec calls when it is ready to send a `DATA` frame; the visitor implementation indicates how many bytes can be sent in this frame.
* Adds `Http2VisitorInterface::SendDataFrame()`, which the codec calls with a frame header; the visitor implementation should send the frame header and then the indicated number of payload bytes.
* Replaces the existing "data provider" implementation with one that calls back through `NgHttp2Adapter`. This allows the adapter to forward the read callback to a `DataFrameSource`, if present for the stream, or to the visitor implementation otherwise.

This change brings the QUICHE `oghttp2` API more in line with its predecessor. Library users can continue to use the old behavior for now by passing a `DataFrameSource` to `SubmitRequest()`/`SubmitResponse()`.

At some point in the future, the `DataFrameSource` parameter will be removed, and all library users will have to use the new behavior.

PiperOrigin-RevId: 634047735
diff --git a/quiche/http2/adapter/callback_visitor.cc b/quiche/http2/adapter/callback_visitor.cc
index 7f2f4a4..f03ee40 100644
--- a/quiche/http2/adapter/callback_visitor.cc
+++ b/quiche/http2/adapter/callback_visitor.cc
@@ -107,6 +107,22 @@
   }
 }
 
+Http2VisitorInterface::DataFrameHeaderInfo
+CallbackVisitor::OnReadyToSendDataForStream(Http2StreamId /*stream_id*/,
+                                            size_t /*max_length*/) {
+  QUICHE_LOG(FATAL)
+      << "Not implemented; should not be used with nghttp2 callbacks.";
+  return {};
+}
+
+bool CallbackVisitor::SendDataFrame(Http2StreamId /*stream_id*/,
+                                    absl::string_view /*frame_header*/,
+                                    size_t /*payload_bytes*/) {
+  QUICHE_LOG(FATAL)
+      << "Not implemented; should not be used with nghttp2 callbacks.";
+  return false;
+}
+
 void CallbackVisitor::OnConnectionError(ConnectionError /*error*/) {
   QUICHE_VLOG(1) << "OnConnectionError not implemented";
 }
diff --git a/quiche/http2/adapter/callback_visitor.h b/quiche/http2/adapter/callback_visitor.h
index 2f2336b..f1d7a3a 100644
--- a/quiche/http2/adapter/callback_visitor.h
+++ b/quiche/http2/adapter/callback_visitor.h
@@ -29,6 +29,10 @@
                            void* user_data);
 
   int64_t OnReadyToSend(absl::string_view serialized) override;
+  DataFrameHeaderInfo OnReadyToSendDataForStream(Http2StreamId stream_id,
+                                                 size_t max_length) override;
+  bool SendDataFrame(Http2StreamId stream_id, absl::string_view frame_header,
+                     size_t payload_bytes) override;
   void OnConnectionError(ConnectionError error) override;
   bool OnFrameHeader(Http2StreamId stream_id, size_t length, uint8_t type,
                      uint8_t flags) override;
diff --git a/quiche/http2/adapter/http2_visitor_interface.h b/quiche/http2/adapter/http2_visitor_interface.h
index fa8491d..531c0af 100644
--- a/quiche/http2/adapter/http2_visitor_interface.h
+++ b/quiche/http2/adapter/http2_visitor_interface.h
@@ -7,6 +7,7 @@
 #include "absl/strings/string_view.h"
 #include "quiche/http2/adapter/http2_protocol.h"
 #include "quiche/common/platform/api/quiche_export.h"
+#include "quiche/common/platform/api/quiche_logging.h"
 
 namespace http2 {
 namespace adapter {
@@ -60,6 +61,34 @@
   // bytes were actually sent. May return kSendBlocked or kSendError.
   virtual int64_t OnReadyToSend(absl::string_view serialized) = 0;
 
+  struct DataFrameHeaderInfo {
+    int64_t payload_length;
+    bool end_data;
+    bool end_stream;  // If true, also implies end_data.
+  };
+  // Called when the codec is ready to construct a DATA frame header. The
+  // implementation should return the number of bytes ready to send, and whether
+  // it's the end of the message body data. If the implementation returns 0
+  // bytes and also `end_data` is false, then the stream is deferred until
+  // resumed by the application. `payload_length` must be at most `max_length`.
+  // A `payload_length` of -1 indicates that this stream has encountered an
+  // unrecoverable error.
+  virtual DataFrameHeaderInfo OnReadyToSendDataForStream(
+      Http2StreamId /*stream_id*/, size_t /*max_length*/) {
+    QUICHE_LOG(FATAL) << "Not implemented";
+    return DataFrameHeaderInfo{};
+  }
+
+  // Called when the codec is ready to send a DATA frame. The implementation
+  // should send the `frame_header` and specified number of payload bytes.
+  // Returning false indicates an unrecoverable error.
+  virtual bool SendDataFrame(Http2StreamId /*stream_id*/,
+                             absl::string_view /*frame_header*/,
+                             size_t /*payload_bytes*/) {
+    QUICHE_LOG(FATAL) << "Not implemented.";
+    return false;
+  }
+
   // Called when a connection-level error has occurred.
   enum class ConnectionError {
     // The peer sent an invalid connection preface.
diff --git a/quiche/http2/adapter/mock_http2_visitor.h b/quiche/http2/adapter/mock_http2_visitor.h
index 86345d3..40d3c80 100644
--- a/quiche/http2/adapter/mock_http2_visitor.h
+++ b/quiche/http2/adapter/mock_http2_visitor.h
@@ -33,6 +33,12 @@
 
   MOCK_METHOD(int64_t, OnReadyToSend, (absl::string_view serialized),
               (override));
+  MOCK_METHOD(DataFrameHeaderInfo, OnReadyToSendDataForStream,
+              (Http2StreamId stream_id, size_t max_length), (override));
+  MOCK_METHOD(bool, SendDataFrame,
+              (Http2StreamId stream_id, absl::string_view frame_header,
+               size_t payload_bytes),
+              (override));
   MOCK_METHOD(void, OnConnectionError, (ConnectionError error), (override));
   MOCK_METHOD(bool, OnFrameHeader,
               (Http2StreamId stream_id, size_t length, uint8_t type,
diff --git a/quiche/http2/adapter/nghttp2_adapter.cc b/quiche/http2/adapter/nghttp2_adapter.cc
index cdea62b..23593ce 100644
--- a/quiche/http2/adapter/nghttp2_adapter.cc
+++ b/quiche/http2/adapter/nghttp2_adapter.cc
@@ -19,6 +19,25 @@
 
 using ConnectionError = Http2VisitorInterface::ConnectionError;
 
+const size_t kFrameHeaderSize = 9;
+
+// A nghttp2-style `nghttp2_data_source_read_callback`.
+ssize_t DataFrameReadCallback(nghttp2_session* /* session */, int32_t stream_id,
+                              uint8_t* /* buf */, size_t length,
+                              uint32_t* data_flags, nghttp2_data_source* source,
+                              void* /* user_data */) {
+  NgHttp2Adapter* adapter = reinterpret_cast<NgHttp2Adapter*>(source->ptr);
+  return adapter->DelegateReadCallback(stream_id, length, data_flags);
+}
+
+// A nghttp2-style `nghttp2_send_data_callback`.
+int DataFrameSendCallback(nghttp2_session* /* session */, nghttp2_frame* frame,
+                          const uint8_t* framehd, size_t length,
+                          nghttp2_data_source* source, void* /* user_data */) {
+  NgHttp2Adapter* adapter = reinterpret_cast<NgHttp2Adapter*>(source->ptr);
+  return adapter->DelegateSendCallback(frame->hd.stream_id, framehd, length);
+}
+
 }  // anonymous namespace
 
 // A metadata source that notifies the owning NgHttp2Adapter upon completion or
@@ -225,10 +244,14 @@
     absl::Span<const Header> headers,
     std::unique_ptr<DataFrameSource> data_source, bool end_stream,
     void* stream_user_data) {
-  QUICHE_DCHECK_EQ(end_stream, data_source == nullptr);
   auto nvs = GetNghttp2Nvs(headers);
-  std::unique_ptr<nghttp2_data_provider> provider =
-      MakeDataProvider(data_source.get());
+  std::unique_ptr<nghttp2_data_provider> provider;
+
+  if (data_source != nullptr || !end_stream) {
+    provider = std::make_unique<nghttp2_data_provider>();
+    provider->source.ptr = this;
+    provider->read_callback = &DataFrameReadCallback;
+  }
 
   int32_t stream_id =
       nghttp2_submit_request(session_->raw_ptr(), nullptr, nvs.data(),
@@ -246,11 +269,13 @@
                                    absl::Span<const Header> headers,
                                    std::unique_ptr<DataFrameSource> data_source,
                                    bool end_stream) {
-  QUICHE_DCHECK_EQ(end_stream, data_source == nullptr);
   auto nvs = GetNghttp2Nvs(headers);
-  std::unique_ptr<nghttp2_data_provider> provider =
-      MakeDataProvider(data_source.get());
-
+  std::unique_ptr<nghttp2_data_provider> provider;
+  if (data_source != nullptr || !end_stream) {
+    provider = std::make_unique<nghttp2_data_provider>();
+    provider->source.ptr = this;
+    provider->read_callback = &DataFrameReadCallback;
+  }
   if (data_source != nullptr) {
     sources_.emplace(stream_id, std::move(data_source));
   }
@@ -296,6 +321,38 @@
   sources_.erase(stream_id);
 }
 
+ssize_t NgHttp2Adapter::DelegateReadCallback(int32_t stream_id,
+                                             size_t max_length,
+                                             uint32_t* data_flags) {
+  auto it = sources_.find(stream_id);
+  if (it == sources_.end()) {
+    // A DataFrameSource is not available for this stream; forward to the
+    // visitor.
+    return callbacks::VisitorReadCallback(visitor_, stream_id, max_length,
+                                          data_flags);
+  } else {
+    // A DataFrameSource is available for this stream.
+    return callbacks::DataFrameSourceReadCallback(*it->second, max_length,
+                                                  data_flags);
+  }
+}
+
+int NgHttp2Adapter::DelegateSendCallback(int32_t stream_id,
+                                         const uint8_t* framehd,
+                                         size_t length) {
+  auto it = sources_.find(stream_id);
+  if (it == sources_.end()) {
+    // A DataFrameSource is not available for this stream; forward to the
+    // visitor.
+    visitor_.SendDataFrame(stream_id, ToStringView(framehd, kFrameHeaderSize),
+                           length);
+  } else {
+    // A DataFrameSource is available for this stream.
+    it->second->Send(ToStringView(framehd, kFrameHeaderSize), length);
+  }
+  return 0;
+}
+
 NgHttp2Adapter::NgHttp2Adapter(Http2VisitorInterface& visitor,
                                Perspective perspective,
                                const nghttp2_option* options)
@@ -320,9 +377,9 @@
     options_ = owned_options;
   }
 
-  session_ =
-      std::make_unique<NgHttp2Session>(perspective_, callbacks::Create(),
-                                       options_, static_cast<void*>(&visitor_));
+  session_ = std::make_unique<NgHttp2Session>(
+      perspective_, callbacks::Create(&DataFrameSendCallback), options_,
+      static_cast<void*>(&visitor_));
   if (owned_options != nullptr) {
     nghttp2_option_del(owned_options);
   }
diff --git a/quiche/http2/adapter/nghttp2_adapter.h b/quiche/http2/adapter/nghttp2_adapter.h
index 6f4b405..8fa54e4 100644
--- a/quiche/http2/adapter/nghttp2_adapter.h
+++ b/quiche/http2/adapter/nghttp2_adapter.h
@@ -105,6 +105,16 @@
     return 0;
   }
 
+  // Delegates a DATA frame read callback to either the visitor or a registered
+  // DataFrameSource.
+  ssize_t DelegateReadCallback(int32_t stream_id, size_t max_length,
+                               uint32_t* data_flags);
+
+  // Delegates a DATA frame send callback to either the visitor or a registered
+  // DataFrameSource.
+  int DelegateSendCallback(int32_t stream_id, const uint8_t* framehd,
+                           size_t length);
+
  private:
   class NotifyingMetadataSource;
 
diff --git a/quiche/http2/adapter/nghttp2_adapter_test.cc b/quiche/http2/adapter/nghttp2_adapter_test.cc
index c621176..c03fc1b 100644
--- a/quiche/http2/adapter/nghttp2_adapter_test.cc
+++ b/quiche/http2/adapter/nghttp2_adapter_test.cc
@@ -876,9 +876,12 @@
   EXPECT_THAT(visitor.data(), EqualsFrames({SpdyFrameType::SETTINGS}));
 }
 
-using NgHttp2AdapterDataTest = quiche::test::QuicheTest;
+class NgHttp2AdapterDataTest : public quiche::test::QuicheTestWithParam<bool> {
+};
 
-TEST_F(NgHttp2AdapterDataTest, ClientSendsTrailers) {
+INSTANTIATE_TEST_SUITE_P(BothValues, NgHttp2AdapterDataTest, testing::Bool());
+
+TEST_P(NgHttp2AdapterDataTest, ClientSendsTrailers) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
 
@@ -898,11 +901,11 @@
   // nghttp2 does not require that the data source indicate the end of data
   // before trailers are enqueued.
 
-  const int32_t stream_id1 =
-      adapter->SubmitRequest(headers1, std::move(body1), false, nullptr);
+  const int32_t stream_id1 = adapter->SubmitRequest(
+      headers1, GetParam() ? nullptr : std::move(body1), false, nullptr);
   ASSERT_GT(stream_id1, 0);
   EXPECT_EQ(stream_id1, kStreamId);
-  EXPECT_EQ(adapter->sources_size(), 1);
+  EXPECT_EQ(adapter->sources_size(), GetParam() ? 0 : 1);
 
   EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id1, _, 0x4));
   EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id1, _, 0x4, 0));
@@ -2046,7 +2049,7 @@
                                             SpdyFrameType::RST_STREAM}));
 }
 
-TEST_F(NgHttp2AdapterDataTest, ClientSubmitRequest) {
+TEST_P(NgHttp2AdapterDataTest, ClientSubmitRequest) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
   int result = adapter->Send();
@@ -2085,12 +2088,13 @@
   visitor.AppendPayloadForStream(1, kBody);
   visitor.SetEndData(1, true);
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
-  int stream_id = adapter->SubmitRequest(
-      ToHeaders({{":method", "POST"},
-                 {":scheme", "http"},
-                 {":authority", "example.com"},
-                 {":path", "/this/is/request/one"}}),
-      std::move(body1), false, const_cast<char*>(kSentinel));
+  int stream_id =
+      adapter->SubmitRequest(ToHeaders({{":method", "POST"},
+                                        {":scheme", "http"},
+                                        {":authority", "example.com"},
+                                        {":path", "/this/is/request/one"}}),
+                             GetParam() ? nullptr : std::move(body1), false,
+                             const_cast<char*>(kSentinel));
   ASSERT_EQ(1, stream_id);
   EXPECT_TRUE(adapter->want_write());
 
@@ -2896,7 +2900,7 @@
                           }));
 }
 
-TEST_F(NgHttp2AdapterDataTest, ClientObeysMaxConcurrentStreams) {
+TEST_P(NgHttp2AdapterDataTest, ClientObeysMaxConcurrentStreams) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
   int result = adapter->Send();
@@ -2936,12 +2940,12 @@
   visitor.AppendPayloadForStream(1, kBody);
   visitor.SetEndData(1, true);
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
-  const int stream_id =
-      adapter->SubmitRequest(ToHeaders({{":method", "POST"},
-                                        {":scheme", "http"},
-                                        {":authority", "example.com"},
-                                        {":path", "/this/is/request/one"}}),
-                             std::move(body1), false, nullptr);
+  const int stream_id = adapter->SubmitRequest(
+      ToHeaders({{":method", "POST"},
+                 {":scheme", "http"},
+                 {":authority", "example.com"},
+                 {":path", "/this/is/request/one"}}),
+      GetParam() ? nullptr : std::move(body1), false, nullptr);
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(adapter->want_write());
 
@@ -3013,7 +3017,7 @@
   EXPECT_FALSE(adapter->want_write());
 }
 
-TEST_F(NgHttp2AdapterDataTest, ClientReceivesInitialWindowSetting) {
+TEST_P(NgHttp2AdapterDataTest, ClientReceivesInitialWindowSetting) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
 
@@ -3052,12 +3056,12 @@
   visitor.AppendPayloadForStream(1, kLongBody);
   visitor.SetEndData(1, true);
   auto body1 = std::make_unique<VisitorDataSource>(visitor, true);
-  const int stream_id =
-      adapter->SubmitRequest(ToHeaders({{":method", "POST"},
-                                        {":scheme", "http"},
-                                        {":authority", "example.com"},
-                                        {":path", "/this/is/request/one"}}),
-                             std::move(body1), false, nullptr);
+  const int stream_id = adapter->SubmitRequest(
+      ToHeaders({{":method", "POST"},
+                 {":scheme", "http"},
+                 {":authority", "example.com"},
+                 {":path", "/this/is/request/one"}}),
+      GetParam() ? nullptr : std::move(body1), false, nullptr);
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(adapter->want_write());
 
@@ -3075,7 +3079,7 @@
                             SpdyFrameType::DATA, SpdyFrameType::DATA}));
 }
 
-TEST_F(NgHttp2AdapterDataTest,
+TEST_P(NgHttp2AdapterDataTest,
        ClientReceivesInitialWindowSettingAfterStreamStart) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
@@ -3106,12 +3110,12 @@
   visitor.AppendPayloadForStream(1, kLongBody);
   visitor.SetEndData(1, true);
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
-  const int stream_id =
-      adapter->SubmitRequest(ToHeaders({{":method", "POST"},
-                                        {":scheme", "http"},
-                                        {":authority", "example.com"},
-                                        {":path", "/this/is/request/one"}}),
-                             std::move(body1), false, nullptr);
+  const int stream_id = adapter->SubmitRequest(
+      ToHeaders({{":method", "POST"},
+                 {":scheme", "http"},
+                 {":authority", "example.com"},
+                 {":path", "/this/is/request/one"}}),
+      GetParam() ? nullptr : std::move(body1), false, nullptr);
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(adapter->want_write());
 
@@ -3681,7 +3685,7 @@
   EXPECT_FALSE(adapter->want_write());
 }
 
-TEST_F(NgHttp2AdapterDataTest, ConnectionErrorOnDataFrameSent) {
+TEST_P(NgHttp2AdapterDataTest, ConnectionErrorOnDataFrameSent) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
 
@@ -3713,8 +3717,9 @@
   auto body = std::make_unique<VisitorDataSource>(visitor, 1);
   visitor.AppendPayloadForStream(
       1, "Here is some data, which will lead to a fatal error");
-  int submit_result = adapter->SubmitResponse(
-      1, ToHeaders({{":status", "200"}}), std::move(body), false);
+  int submit_result =
+      adapter->SubmitResponse(1, ToHeaders({{":status", "200"}}),
+                              GetParam() ? nullptr : std::move(body), false);
   ASSERT_EQ(0, submit_result);
 
   EXPECT_TRUE(adapter->want_write());
@@ -4083,7 +4088,7 @@
 
 // Tests the case where the response body is in the progress of being sent while
 // trailers are queued.
-TEST_F(NgHttp2AdapterDataTest, ServerSubmitsTrailersWhileDataDeferred) {
+TEST_P(NgHttp2AdapterDataTest, ServerSubmitsTrailersWhileDataDeferred) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
 
@@ -4138,7 +4143,7 @@
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
   int submit_result = adapter->SubmitResponse(
       1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
-      std::move(body1), false);
+      GetParam() ? nullptr : std::move(body1), false);
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(adapter->want_write());
 
@@ -4182,7 +4187,7 @@
   EXPECT_FALSE(adapter->want_write());
 }
 
-TEST_F(NgHttp2AdapterDataTest, ServerSubmitsTrailersWithDataEndStream) {
+TEST_P(NgHttp2AdapterDataTest, ServerSubmitsTrailersWithDataEndStream) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
 
@@ -4223,8 +4228,9 @@
   visitor.SetEndData(1, true);
   auto body = std::make_unique<VisitorDataSource>(visitor, 1);
 
-  int submit_result = adapter->SubmitResponse(
-      1, ToHeaders({{":status", "200"}}), std::move(body), false);
+  int submit_result =
+      adapter->SubmitResponse(1, ToHeaders({{":status", "200"}}),
+                              GetParam() ? nullptr : std::move(body), false);
   ASSERT_EQ(submit_result, 0);
 
   const std::vector<Header> trailers =
@@ -4250,7 +4256,7 @@
                             SpdyFrameType::HEADERS}));
 }
 
-TEST_F(NgHttp2AdapterDataTest,
+TEST_P(NgHttp2AdapterDataTest,
        ServerSubmitsTrailersWithDataEndStreamAndDeferral) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
@@ -4292,8 +4298,9 @@
   visitor.AppendPayloadForStream(1, kBody);
   auto body = std::make_unique<VisitorDataSource>(visitor, 1);
 
-  int submit_result = adapter->SubmitResponse(
-      1, ToHeaders({{":status", "200"}}), std::move(body), false);
+  int submit_result =
+      adapter->SubmitResponse(1, ToHeaders({{":status", "200"}}),
+                              GetParam() ? nullptr : std::move(body), false);
   ASSERT_EQ(submit_result, 0);
 
   EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, ACK_FLAG));
@@ -5221,7 +5228,7 @@
               EqualsFrames({SpdyFrameType::SETTINGS, SpdyFrameType::HEADERS}));
 }
 
-TEST_F(NgHttp2AdapterDataTest, ServerSubmitResponse) {
+TEST_P(NgHttp2AdapterDataTest, ServerSubmitResponse) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
   EXPECT_FALSE(adapter->want_write());
@@ -5285,7 +5292,7 @@
       1,
       ToHeaders({{":status", "404"},
                  {"x-comment", "I have no idea what you're talking about."}}),
-      std::move(body1), false);
+      GetParam() ? nullptr : std::move(body1), false);
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(adapter->want_write());
 
@@ -5316,7 +5323,7 @@
   EXPECT_GT(adapter->GetHpackEncoderDynamicTableSize(), 0);
 }
 
-TEST_F(NgHttp2AdapterDataTest, ServerSubmitResponseWithResetFromClient) {
+TEST_P(NgHttp2AdapterDataTest, ServerSubmitResponseWithResetFromClient) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
   EXPECT_FALSE(adapter->want_write());
@@ -5367,10 +5374,10 @@
       1,
       ToHeaders({{":status", "404"},
                  {"x-comment", "I have no idea what you're talking about."}}),
-      std::move(body1), false);
+      GetParam() ? nullptr : std::move(body1), false);
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(adapter->want_write());
-  EXPECT_EQ(adapter->sources_size(), 1);
+  EXPECT_EQ(adapter->sources_size(), GetParam() ? 0 : 1);
 
   // Client resets the stream before the server can send the response.
   const std::string reset =
@@ -5448,7 +5455,7 @@
               EqualsFrames({SpdyFrameType::SETTINGS, SpdyFrameType::GOAWAY}));
 }
 
-TEST_F(NgHttp2AdapterDataTest, ServerSendsTrailers) {
+TEST_P(NgHttp2AdapterDataTest, ServerSendsTrailers) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
   EXPECT_FALSE(adapter->want_write());
@@ -5502,7 +5509,7 @@
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
   int submit_result = adapter->SubmitResponse(
       1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
-      std::move(body1), false);
+      GetParam() ? nullptr : std::move(body1), false);
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(adapter->want_write());
 
@@ -5631,7 +5638,7 @@
             absl::StrJoin(visitor.GetMetadata(1), ""));
 }
 
-TEST_F(NgHttp2AdapterDataTest, RepeatedHeaderNames) {
+TEST_P(NgHttp2AdapterDataTest, RepeatedHeaderNames) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
   EXPECT_FALSE(adapter->want_write());
@@ -5675,8 +5682,8 @@
   visitor.SetEndData(1, true);
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
 
-  int submit_result =
-      adapter->SubmitResponse(1, headers1, std::move(body1), false);
+  int submit_result = adapter->SubmitResponse(
+      1, headers1, GetParam() ? nullptr : std::move(body1), false);
   ASSERT_EQ(0, submit_result);
 
   EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x1));
@@ -5693,7 +5700,7 @@
                             SpdyFrameType::DATA}));
 }
 
-TEST_F(NgHttp2AdapterDataTest, ServerRespondsToRequestWithTrailers) {
+TEST_P(NgHttp2AdapterDataTest, ServerRespondsToRequestWithTrailers) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
   EXPECT_FALSE(adapter->want_write());
@@ -5732,8 +5739,8 @@
   const std::vector<Header> headers1 = ToHeaders({{":status", "200"}});
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
 
-  int submit_result =
-      adapter->SubmitResponse(1, headers1, std::move(body1), false);
+  int submit_result = adapter->SubmitResponse(
+      1, headers1, GetParam() ? nullptr : std::move(body1), false);
   ASSERT_EQ(0, submit_result);
 
   EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x1));
@@ -5774,7 +5781,7 @@
   EXPECT_THAT(visitor.data(), EqualsFrames({SpdyFrameType::DATA}));
 }
 
-TEST_F(NgHttp2AdapterDataTest, ServerSubmitsResponseWithDataSourceError) {
+TEST_P(NgHttp2AdapterDataTest, ServerSubmitsResponseWithDataSourceError) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
   EXPECT_FALSE(adapter->want_write());
@@ -5812,7 +5819,7 @@
 
   int submit_result = adapter->SubmitResponse(
       1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
-      std::move(body1), false);
+      GetParam() ? nullptr : std::move(body1), false);
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(adapter->want_write());
 
@@ -6005,7 +6012,7 @@
   EXPECT_EQ(frames.size(), static_cast<size_t>(result));
 }
 
-TEST_F(NgHttp2AdapterDataTest, ServerSendsInvalidTrailers) {
+TEST_P(NgHttp2AdapterDataTest, ServerSendsInvalidTrailers) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
   EXPECT_FALSE(adapter->want_write());
@@ -6047,7 +6054,7 @@
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
   int submit_result = adapter->SubmitResponse(
       1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
-      std::move(body1), false);
+      GetParam() ? nullptr : std::move(body1), false);
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(adapter->want_write());
 
@@ -6860,7 +6867,7 @@
   EXPECT_FALSE(adapter->want_write());
 }
 
-TEST_F(NgHttp2AdapterDataTest, SkipsSendingFramesForRejectedStream) {
+TEST_P(NgHttp2AdapterDataTest, SkipsSendingFramesForRejectedStream) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
 
@@ -6894,8 +6901,9 @@
       1, "Here is some data, which will be completely ignored!");
   auto body = std::make_unique<VisitorDataSource>(visitor, 1);
 
-  int submit_result = adapter->SubmitResponse(
-      1, ToHeaders({{":status", "200"}}), std::move(body), false);
+  int submit_result =
+      adapter->SubmitResponse(1, ToHeaders({{":status", "200"}}),
+                              GetParam() ? nullptr : std::move(body), false);
   ASSERT_EQ(0, submit_result);
 
   auto source = std::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
@@ -6929,7 +6937,7 @@
                             SpdyFrameType::RST_STREAM}));
 }
 
-TEST_F(NgHttp2AdapterDataTest, ServerQueuesMetadataWithStreamReset) {
+TEST_P(NgHttp2AdapterDataTest, ServerQueuesMetadataWithStreamReset) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
 
@@ -6962,8 +6970,9 @@
       1, "Here is some data, which will be completely ignored!");
   auto body = std::make_unique<VisitorDataSource>(visitor, 1);
 
-  int submit_result = adapter->SubmitResponse(
-      1, ToHeaders({{":status", "200"}}), std::move(body), false);
+  int submit_result =
+      adapter->SubmitResponse(1, ToHeaders({{":status", "200"}}),
+                              GetParam() ? nullptr : std::move(body), false);
   ASSERT_EQ(0, submit_result);
 
   auto source = std::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
@@ -7076,7 +7085,7 @@
   EXPECT_EQ(static_cast<size_t>(next_result), next_frame.size());
 }
 
-TEST_F(NgHttp2AdapterDataTest, ServerDoesNotSendFramesAfterImmediateGoAway) {
+TEST_P(NgHttp2AdapterDataTest, ServerDoesNotSendFramesAfterImmediateGoAway) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
 
@@ -7111,8 +7120,9 @@
   // Submit a response for the stream.
   visitor.AppendPayloadForStream(1, "This data is doomed to never be written.");
   auto body = std::make_unique<VisitorDataSource>(visitor, 1);
-  int submit_result = adapter->SubmitResponse(
-      1, ToHeaders({{":status", "200"}}), std::move(body), false);
+  int submit_result =
+      adapter->SubmitResponse(1, ToHeaders({{":status", "200"}}),
+                              GetParam() ? nullptr : std::move(body), false);
   ASSERT_EQ(0, submit_result);
 
   // Submit a WINDOW_UPDATE frame.
@@ -7758,7 +7768,7 @@
                                             SpdyFrameType::WINDOW_UPDATE}));
 }
 
-TEST_F(NgHttp2AdapterDataTest, NegativeFlowControlStreamResumption) {
+TEST_P(NgHttp2AdapterDataTest, NegativeFlowControlStreamResumption) {
   TestVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
 
@@ -7799,8 +7809,9 @@
   // Submit a response for the stream.
   visitor.AppendPayloadForStream(1, std::string(70000, 'a'));
   auto body = std::make_unique<VisitorDataSource>(visitor, 1);
-  int submit_result = adapter->SubmitResponse(
-      1, ToHeaders({{":status", "200"}}), std::move(body), false);
+  int submit_result =
+      adapter->SubmitResponse(1, ToHeaders({{":status", "200"}}),
+                              GetParam() ? nullptr : std::move(body), false);
   ASSERT_EQ(0, submit_result);
 
   EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x1));
diff --git a/quiche/http2/adapter/nghttp2_callbacks.cc b/quiche/http2/adapter/nghttp2_callbacks.cc
index 200a709..6a93e43 100644
--- a/quiche/http2/adapter/nghttp2_callbacks.cc
+++ b/quiche/http2/adapter/nghttp2_callbacks.cc
@@ -348,7 +348,8 @@
   return 0;
 }
 
-nghttp2_session_callbacks_unique_ptr Create() {
+nghttp2_session_callbacks_unique_ptr Create(
+    nghttp2_send_data_callback send_data_callback) {
   nghttp2_session_callbacks* callbacks;
   nghttp2_session_callbacks_new(&callbacks);
 
@@ -372,8 +373,8 @@
   nghttp2_session_callbacks_set_on_invalid_frame_recv_callback(
       callbacks, &OnInvalidFrameReceived);
   nghttp2_session_callbacks_set_error_callback2(callbacks, &OnError);
-  nghttp2_session_callbacks_set_send_data_callback(
-      callbacks, &DataFrameSourceSendCallback);
+  nghttp2_session_callbacks_set_send_data_callback(callbacks,
+                                                   send_data_callback);
   nghttp2_session_callbacks_set_pack_extension_callback(
       callbacks, &OnPackExtensionCallback);
   nghttp2_session_callbacks_set_unpack_extension_callback(
diff --git a/quiche/http2/adapter/nghttp2_callbacks.h b/quiche/http2/adapter/nghttp2_callbacks.h
index dfdff06..367db37 100644
--- a/quiche/http2/adapter/nghttp2_callbacks.h
+++ b/quiche/http2/adapter/nghttp2_callbacks.h
@@ -81,7 +81,8 @@
 int OnError(nghttp2_session* session, int lib_error_code, const char* msg,
             size_t len, void* user_data);
 
-nghttp2_session_callbacks_unique_ptr Create();
+nghttp2_session_callbacks_unique_ptr Create(
+    nghttp2_send_data_callback send_data_callback);
 
 }  // namespace callbacks
 }  // namespace adapter
diff --git a/quiche/http2/adapter/nghttp2_data_provider.cc b/quiche/http2/adapter/nghttp2_data_provider.cc
index c0d76d1..3475210 100644
--- a/quiche/http2/adapter/nghttp2_data_provider.cc
+++ b/quiche/http2/adapter/nghttp2_data_provider.cc
@@ -9,18 +9,29 @@
 namespace adapter {
 namespace callbacks {
 
-namespace {
-const size_t kFrameHeaderSize = 9;
+ssize_t VisitorReadCallback(Http2VisitorInterface& visitor, int32_t stream_id,
+                            size_t max_length, uint32_t* data_flags) {
+  *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
+  auto [payload_length, end_data, end_stream] =
+      visitor.OnReadyToSendDataForStream(stream_id, max_length);
+  if (payload_length == 0 && !end_data) {
+    return NGHTTP2_ERR_DEFERRED;
+  } else if (payload_length == DataFrameSource::kError) {
+    return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
+  }
+  if (end_data) {
+    *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+  }
+  if (!end_stream) {
+    *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
+  }
+  return payload_length;
 }
 
-ssize_t DataFrameSourceReadCallback(nghttp2_session* /* session */,
-                                    int32_t /* stream_id */, uint8_t* /* buf */,
-                                    size_t length, uint32_t* data_flags,
-                                    nghttp2_data_source* source,
-                                    void* /* user_data */) {
+ssize_t DataFrameSourceReadCallback(DataFrameSource& source, size_t length,
+                                    uint32_t* data_flags) {
   *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
-  auto* frame_source = static_cast<DataFrameSource*>(source->ptr);
-  auto [result_length, done] = frame_source->SelectPayloadLength(length);
+  auto [result_length, done] = source.SelectPayloadLength(length);
   if (result_length == 0 && !done) {
     return NGHTTP2_ERR_DEFERRED;
   } else if (result_length == DataFrameSource::kError) {
@@ -29,34 +40,12 @@
   if (done) {
     *data_flags |= NGHTTP2_DATA_FLAG_EOF;
   }
-  if (!frame_source->send_fin()) {
+  if (!source.send_fin()) {
     *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
   }
   return result_length;
 }
 
-int DataFrameSourceSendCallback(nghttp2_session* /* session */,
-                                nghttp2_frame* /* frame */,
-                                const uint8_t* framehd, size_t length,
-                                nghttp2_data_source* source,
-                                void* /* user_data */) {
-  auto* frame_source = static_cast<DataFrameSource*>(source->ptr);
-  frame_source->Send(ToStringView(framehd, kFrameHeaderSize), length);
-  return 0;
-}
-
 }  // namespace callbacks
-
-std::unique_ptr<nghttp2_data_provider> MakeDataProvider(
-    DataFrameSource* source) {
-  if (source == nullptr) {
-    return nullptr;
-  }
-  auto provider = std::make_unique<nghttp2_data_provider>();
-  provider->source.ptr = source;
-  provider->read_callback = &callbacks::DataFrameSourceReadCallback;
-  return provider;
-}
-
 }  // namespace adapter
 }  // namespace http2
diff --git a/quiche/http2/adapter/nghttp2_data_provider.h b/quiche/http2/adapter/nghttp2_data_provider.h
index a3f0957..a0c9cd9 100644
--- a/quiche/http2/adapter/nghttp2_data_provider.h
+++ b/quiche/http2/adapter/nghttp2_data_provider.h
@@ -5,32 +5,24 @@
 #include <memory>
 
 #include "quiche/http2/adapter/data_source.h"
+#include "quiche/http2/adapter/http2_visitor_interface.h"
 #include "quiche/http2/adapter/nghttp2.h"
 
 namespace http2 {
 namespace adapter {
 namespace callbacks {
 
-// Assumes |source| is a DataFrameSource.
-ssize_t DataFrameSourceReadCallback(nghttp2_session* /*session */,
-                                    int32_t /* stream_id */, uint8_t* /* buf */,
-                                    size_t length, uint32_t* data_flags,
-                                    nghttp2_data_source* source,
-                                    void* /* user_data */);
+// A callback that returns DATA frame payload size and associated flags, given a
+// Http2VisitorInterface.
+ssize_t VisitorReadCallback(Http2VisitorInterface& visitor, int32_t stream_id,
+                            size_t max_length, uint32_t* data_flags);
 
-int DataFrameSourceSendCallback(nghttp2_session* /* session */,
-                                nghttp2_frame* /* frame */,
-                                const uint8_t* framehd, size_t length,
-                                nghttp2_data_source* source,
-                                void* /* user_data */);
+// A callback that returns DATA frame payload size and associated flags, given a
+// DataFrameSource.
+ssize_t DataFrameSourceReadCallback(DataFrameSource& source, size_t length,
+                                    uint32_t* data_flags);
 
 }  // namespace callbacks
-
-// Transforms a DataFrameSource into a nghttp2_data_provider. Does not take
-// ownership of |source|. Returns nullptr if |source| is nullptr.
-std::unique_ptr<nghttp2_data_provider> MakeDataProvider(
-    DataFrameSource* source);
-
 }  // namespace adapter
 }  // namespace http2
 
diff --git a/quiche/http2/adapter/nghttp2_data_provider_test.cc b/quiche/http2/adapter/nghttp2_data_provider_test.cc
index caaaf51..6ebc8b9 100644
--- a/quiche/http2/adapter/nghttp2_data_provider_test.cc
+++ b/quiche/http2/adapter/nghttp2_data_provider_test.cc
@@ -1,5 +1,6 @@
 #include "quiche/http2/adapter/nghttp2_data_provider.h"
 
+#include "quiche/http2/adapter/nghttp2_util.h"
 #include "quiche/http2/adapter/test_utils.h"
 #include "quiche/common/platform/api/quiche_test.h"
 
@@ -9,105 +10,175 @@
 
 const size_t kFrameHeaderSize = 9;
 
-// Verifies that a nghttp2_data_provider derived from a DataFrameSource works
-// correctly with nghttp2-style callbacks when the amount of data read is less
-// than what the source provides.
-TEST(DataProviderTest, ReadLessThanSourceProvides) {
+// Verifies that the DataFrameSource read callback works correctly when the
+// amount of data read is less than what the source provides.
+TEST(DataFrameSourceTest, ReadLessThanSourceProvides) {
   const int32_t kStreamId = 1;
   TestVisitor visitor;
   visitor.AppendPayloadForStream(kStreamId, "Example payload");
   visitor.SetEndData(kStreamId, true);
   VisitorDataSource source(visitor, kStreamId);
-  auto provider = MakeDataProvider(&source);
   uint32_t data_flags = 0;
   const size_t kReadLength = 10;
   // Read callback selects a payload length given an upper bound.
   ssize_t result =
-      provider->read_callback(nullptr, kStreamId, nullptr, kReadLength,
-                              &data_flags, &provider->source, nullptr);
+      callbacks::DataFrameSourceReadCallback(source, kReadLength, &data_flags);
   ASSERT_EQ(kReadLength, result);
   EXPECT_EQ(NGHTTP2_DATA_FLAG_NO_COPY | NGHTTP2_DATA_FLAG_NO_END_STREAM,
             data_flags);
 
   const uint8_t framehd[kFrameHeaderSize] = {1, 2, 3, 4, 5, 6, 7, 8, 9};
   // Sends the frame header and some payload bytes.
-  int send_result = callbacks::DataFrameSourceSendCallback(
-      nullptr, nullptr, framehd, result, &provider->source, nullptr);
-  EXPECT_EQ(0, send_result);
+  source.Send(ToStringView(framehd, kFrameHeaderSize), result);
   // Data accepted by the visitor includes a frame header and kReadLength bytes
   // of payload.
   EXPECT_EQ(visitor.data().size(), kFrameHeaderSize + kReadLength);
 }
 
-// Verifies that a nghttp2_data_provider derived from a DataFrameSource works
-// correctly with nghttp2-style callbacks when the amount of data read is more
-// than what the source provides.
-TEST(DataProviderTest, ReadMoreThanSourceProvides) {
+// Verifies that the Visitor read callback works correctly when the amount of
+// data read is less than what the source provides.
+TEST(VisitorTest, ReadLessThanSourceProvides) {
+  const int32_t kStreamId = 1;
+  TestVisitor visitor;
+  visitor.AppendPayloadForStream(kStreamId, "Example payload");
+  visitor.SetEndData(kStreamId, true);
+  uint32_t data_flags = 0;
+  const size_t kReadLength = 10;
+  // Read callback selects a payload length given an upper bound.
+  ssize_t result = callbacks::VisitorReadCallback(visitor, kStreamId,
+                                                  kReadLength, &data_flags);
+  ASSERT_EQ(kReadLength, result);
+  EXPECT_EQ(NGHTTP2_DATA_FLAG_NO_COPY | NGHTTP2_DATA_FLAG_NO_END_STREAM,
+            data_flags);
+
+  const uint8_t framehd[kFrameHeaderSize] = {1, 2, 3, 4, 5, 6, 7, 8, 9};
+  // Sends the frame header and some payload bytes.
+  visitor.SendDataFrame(kStreamId, ToStringView(framehd, kFrameHeaderSize),
+                        result);
+  // Data accepted by the visitor includes a frame header and kReadLength bytes
+  // of payload.
+  EXPECT_EQ(visitor.data().size(), kFrameHeaderSize + kReadLength);
+}
+
+// Verifies that the DataFrameSource read callback works correctly when the
+// amount of data read is more than what the source provides.
+TEST(DataFrameSourceTest, ReadMoreThanSourceProvides) {
   const int32_t kStreamId = 1;
   const absl::string_view kPayload = "Example payload";
   TestVisitor visitor;
   visitor.AppendPayloadForStream(kStreamId, kPayload);
   visitor.SetEndData(kStreamId, true);
   VisitorDataSource source(visitor, kStreamId);
-  auto provider = MakeDataProvider(&source);
   uint32_t data_flags = 0;
   const size_t kReadLength = 30;
   // Read callback selects a payload length given an upper bound.
   ssize_t result =
-      provider->read_callback(nullptr, kStreamId, nullptr, kReadLength,
-                              &data_flags, &provider->source, nullptr);
+      callbacks::DataFrameSourceReadCallback(source, kReadLength, &data_flags);
   ASSERT_EQ(kPayload.size(), result);
   EXPECT_EQ(NGHTTP2_DATA_FLAG_NO_COPY | NGHTTP2_DATA_FLAG_EOF, data_flags);
 
   const uint8_t framehd[kFrameHeaderSize] = {1, 2, 3, 4, 5, 6, 7, 8, 9};
   // Sends the frame header and some payload bytes.
-  int send_result = callbacks::DataFrameSourceSendCallback(
-      nullptr, nullptr, framehd, result, &provider->source, nullptr);
-  EXPECT_EQ(0, send_result);
+  source.Send(ToStringView(framehd, kFrameHeaderSize), result);
   // Data accepted by the visitor includes a frame header and the entire
   // payload.
   EXPECT_EQ(visitor.data().size(), kFrameHeaderSize + kPayload.size());
 }
 
-// Verifies that a nghttp2_data_provider derived from a DataFrameSource works
-// correctly with nghttp2-style callbacks when the source is blocked.
-TEST(DataProviderTest, ReadFromBlockedSource) {
+// Verifies that the Visitor read callback works correctly when the amount of
+// data read is more than what the source provides.
+TEST(VisitorTest, ReadMoreThanSourceProvides) {
+  const int32_t kStreamId = 1;
+  const absl::string_view kPayload = "Example payload";
+  TestVisitor visitor;
+  visitor.AppendPayloadForStream(kStreamId, kPayload);
+  visitor.SetEndData(kStreamId, true);
+  VisitorDataSource source(visitor, kStreamId);
+  uint32_t data_flags = 0;
+  const size_t kReadLength = 30;
+  // Read callback selects a payload length given an upper bound.
+  ssize_t result = callbacks::VisitorReadCallback(visitor, kStreamId,
+                                                  kReadLength, &data_flags);
+  ASSERT_EQ(kPayload.size(), result);
+  EXPECT_EQ(NGHTTP2_DATA_FLAG_NO_COPY | NGHTTP2_DATA_FLAG_EOF, data_flags);
+
+  const uint8_t framehd[kFrameHeaderSize] = {1, 2, 3, 4, 5, 6, 7, 8, 9};
+  // Sends the frame header and some payload bytes.
+  visitor.SendDataFrame(kStreamId, ToStringView(framehd, kFrameHeaderSize),
+                        result);
+  // Data accepted by the visitor includes a frame header and the entire
+  // payload.
+  EXPECT_EQ(visitor.data().size(), kFrameHeaderSize + kPayload.size());
+}
+
+// Verifies that the DataFrameSource read callback works correctly when the
+// source is blocked.
+TEST(DataFrameSourceTest, ReadFromBlockedSource) {
   const int32_t kStreamId = 1;
   TestVisitor visitor;
   // Source has no payload, but also no fin, so it's blocked.
   VisitorDataSource source(visitor, kStreamId);
-  auto provider = MakeDataProvider(&source);
   uint32_t data_flags = 0;
   const size_t kReadLength = 10;
   ssize_t result =
-      provider->read_callback(nullptr, kStreamId, nullptr, kReadLength,
-                              &data_flags, &provider->source, nullptr);
+      callbacks::DataFrameSourceReadCallback(source, kReadLength, &data_flags);
   // Read operation is deferred, since the source is blocked.
   EXPECT_EQ(NGHTTP2_ERR_DEFERRED, result);
 }
 
-// Verifies that a nghttp2_data_provider derived from a DataFrameSource works
-// correctly with nghttp2-style callbacks when the source provides only fin and
-// no data.
-TEST(DataProviderTest, ReadFromZeroLengthSource) {
+// Verifies that the Visitor read callback works correctly when the source is
+// blocked.
+TEST(VisitorTest, ReadFromBlockedSource) {
+  const int32_t kStreamId = 1;
+  TestVisitor visitor;
+  // Stream has no payload, but also no fin, so it's blocked.
+  uint32_t data_flags = 0;
+  const size_t kReadLength = 10;
+  ssize_t result = callbacks::VisitorReadCallback(visitor, kStreamId,
+                                                  kReadLength, &data_flags);
+  // Read operation is deferred, since the source is blocked.
+  EXPECT_EQ(NGHTTP2_ERR_DEFERRED, result);
+}
+
+// Verifies that the DataFrameSource read callback works correctly when the
+// source provides only fin and no data.
+TEST(DataFrameSourceTest, ReadFromZeroLengthSource) {
   const int32_t kStreamId = 1;
   TestVisitor visitor;
   visitor.SetEndData(kStreamId, true);
   // Empty payload and fin=true indicates the source is done.
   VisitorDataSource source(visitor, kStreamId);
-  auto provider = MakeDataProvider(&source);
   uint32_t data_flags = 0;
   const size_t kReadLength = 10;
   ssize_t result =
-      provider->read_callback(nullptr, kStreamId, nullptr, kReadLength,
-                              &data_flags, &provider->source, nullptr);
+      callbacks::DataFrameSourceReadCallback(source, kReadLength, &data_flags);
   ASSERT_EQ(0, result);
   EXPECT_EQ(NGHTTP2_DATA_FLAG_NO_COPY | NGHTTP2_DATA_FLAG_EOF, data_flags);
 
   const uint8_t framehd[kFrameHeaderSize] = {1, 2, 3, 4, 5, 6, 7, 8, 9};
-  int send_result = callbacks::DataFrameSourceSendCallback(
-      nullptr, nullptr, framehd, result, &provider->source, nullptr);
-  EXPECT_EQ(0, send_result);
+  source.Send(ToStringView(framehd, kFrameHeaderSize), result);
+  // Data accepted by the visitor includes a frame header with fin and zero
+  // bytes of payload.
+  EXPECT_EQ(visitor.data().size(), kFrameHeaderSize);
+}
+
+// Verifies that the Visitor read callback works correctly when the source
+// provides only fin and no data.
+TEST(VisitorTest, ReadFromZeroLengthSource) {
+  const int32_t kStreamId = 1;
+  TestVisitor visitor;
+  // Empty payload and fin=true indicates the source is done.
+  visitor.SetEndData(kStreamId, true);
+  uint32_t data_flags = 0;
+  const size_t kReadLength = 10;
+  ssize_t result = callbacks::VisitorReadCallback(visitor, kStreamId,
+                                                  kReadLength, &data_flags);
+  ASSERT_EQ(0, result);
+  EXPECT_EQ(NGHTTP2_DATA_FLAG_NO_COPY | NGHTTP2_DATA_FLAG_EOF, data_flags);
+
+  const uint8_t framehd[kFrameHeaderSize] = {1, 2, 3, 4, 5, 6, 7, 8, 9};
+  visitor.SendDataFrame(kStreamId, ToStringView(framehd, kFrameHeaderSize),
+                        result);
   // Data accepted by the visitor includes a frame header with fin and zero
   // bytes of payload.
   EXPECT_EQ(visitor.data().size(), kFrameHeaderSize);
diff --git a/quiche/http2/adapter/nghttp2_session_test.cc b/quiche/http2/adapter/nghttp2_session_test.cc
index ab3f337..164971b 100644
--- a/quiche/http2/adapter/nghttp2_session_test.cc
+++ b/quiche/http2/adapter/nghttp2_session_test.cc
@@ -37,7 +37,7 @@
   void TearDown() override { nghttp2_option_del(options_); }
 
   nghttp2_session_callbacks_unique_ptr CreateCallbacks() {
-    nghttp2_session_callbacks_unique_ptr callbacks = callbacks::Create();
+    nghttp2_session_callbacks_unique_ptr callbacks = callbacks::Create(nullptr);
     return callbacks;
   }
 
diff --git a/quiche/http2/adapter/oghttp2_adapter.cc b/quiche/http2/adapter/oghttp2_adapter.cc
index e232307..0a827bd 100644
--- a/quiche/http2/adapter/oghttp2_adapter.cc
+++ b/quiche/http2/adapter/oghttp2_adapter.cc
@@ -134,7 +134,6 @@
     absl::Span<const Header> headers,
     std::unique_ptr<DataFrameSource> data_source, bool end_stream,
     void* user_data) {
-  QUICHE_DCHECK_EQ(end_stream, data_source == nullptr);
   return session_->SubmitRequest(headers, std::move(data_source), end_stream,
                                  user_data);
 }
@@ -143,7 +142,6 @@
                                    absl::Span<const Header> headers,
                                    std::unique_ptr<DataFrameSource> data_source,
                                    bool end_stream) {
-  QUICHE_DCHECK_EQ(end_stream, data_source == nullptr);
   return session_->SubmitResponse(stream_id, headers, std::move(data_source),
                                   end_stream);
 }
diff --git a/quiche/http2/adapter/oghttp2_adapter_test.cc b/quiche/http2/adapter/oghttp2_adapter_test.cc
index 41010cb..14fba31 100644
--- a/quiche/http2/adapter/oghttp2_adapter_test.cc
+++ b/quiche/http2/adapter/oghttp2_adapter_test.cc
@@ -1402,9 +1402,12 @@
   EXPECT_THAT(visitor.data(), EqualsFrames({SpdyFrameType::SETTINGS}));
 }
 
-using OgHttp2AdapterDataTest = quiche::test::QuicheTest;
+class OgHttp2AdapterDataTest : public quiche::test::QuicheTestWithParam<bool> {
+};
 
-TEST_F(OgHttp2AdapterDataTest, ClientSendsTrailers) {
+INSTANTIATE_TEST_SUITE_P(BothValues, OgHttp2AdapterDataTest, testing::Bool());
+
+TEST_P(OgHttp2AdapterDataTest, ClientSendsTrailers) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kClient;
@@ -1423,8 +1426,8 @@
   visitor.SetEndData(1, false);
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
 
-  const int32_t stream_id1 =
-      adapter->SubmitRequest(headers1, std::move(body1), false, nullptr);
+  const int32_t stream_id1 = adapter->SubmitRequest(
+      headers1, GetParam() ? nullptr : std::move(body1), false, nullptr);
   ASSERT_EQ(stream_id1, 1);
 
   EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
@@ -2628,7 +2631,7 @@
                                             SpdyFrameType::RST_STREAM}));
 }
 
-TEST_F(OgHttp2AdapterDataTest, ClientObeysMaxConcurrentStreams) {
+TEST_P(OgHttp2AdapterDataTest, ClientObeysMaxConcurrentStreams) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kClient;
@@ -2681,12 +2684,12 @@
   visitor.AppendPayloadForStream(1, kBody);
   visitor.SetEndData(1, true);
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
-  const int stream_id =
-      adapter->SubmitRequest(ToHeaders({{":method", "POST"},
-                                        {":scheme", "http"},
-                                        {":authority", "example.com"},
-                                        {":path", "/this/is/request/one"}}),
-                             std::move(body1), false, nullptr);
+  const int stream_id = adapter->SubmitRequest(
+      ToHeaders({{":method", "POST"},
+                 {":scheme", "http"},
+                 {":authority", "example.com"},
+                 {":path", "/this/is/request/one"}}),
+      GetParam() ? nullptr : std::move(body1), false, nullptr);
   ASSERT_EQ(stream_id, 1);
   EXPECT_TRUE(adapter->want_write());
 
@@ -2760,7 +2763,7 @@
   EXPECT_FALSE(adapter->want_write());
 }
 
-TEST_F(OgHttp2AdapterDataTest, ClientReceivesInitialWindowSetting) {
+TEST_P(OgHttp2AdapterDataTest, ClientReceivesInitialWindowSetting) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kClient;
@@ -2804,12 +2807,12 @@
   visitor.AppendPayloadForStream(1, kLongBody);
   visitor.SetEndData(1, true);
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
-  const int stream_id =
-      adapter->SubmitRequest(ToHeaders({{":method", "POST"},
-                                        {":scheme", "http"},
-                                        {":authority", "example.com"},
-                                        {":path", "/this/is/request/one"}}),
-                             std::move(body1), false, nullptr);
+  const int stream_id = adapter->SubmitRequest(
+      ToHeaders({{":method", "POST"},
+                 {":scheme", "http"},
+                 {":authority", "example.com"},
+                 {":path", "/this/is/request/one"}}),
+      GetParam() ? nullptr : std::move(body1), false, nullptr);
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(adapter->want_write());
 
@@ -2827,7 +2830,7 @@
                             SpdyFrameType::DATA, SpdyFrameType::DATA}));
 }
 
-TEST_F(OgHttp2AdapterDataTest,
+TEST_P(OgHttp2AdapterDataTest,
        ClientReceivesInitialWindowSettingAfterStreamStart) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
@@ -2862,12 +2865,12 @@
   visitor.AppendPayloadForStream(1, kLongBody);
   visitor.SetEndData(1, true);
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
-  const int stream_id =
-      adapter->SubmitRequest(ToHeaders({{":method", "POST"},
-                                        {":scheme", "http"},
-                                        {":authority", "example.com"},
-                                        {":path", "/this/is/request/one"}}),
-                             std::move(body1), false, nullptr);
+  const int stream_id = adapter->SubmitRequest(
+      ToHeaders({{":method", "POST"},
+                 {":scheme", "http"},
+                 {":authority", "example.com"},
+                 {":path", "/this/is/request/one"}}),
+      GetParam() ? nullptr : std::move(body1), false, nullptr);
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(adapter->want_write());
 
@@ -3451,7 +3454,7 @@
   EXPECT_FALSE(adapter->want_write());
 }
 
-TEST_F(OgHttp2AdapterDataTest, ClientEncountersFlowControlBlock) {
+TEST_P(OgHttp2AdapterDataTest, ClientEncountersFlowControlBlock) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kClient;
@@ -3470,8 +3473,8 @@
   visitor.SetEndData(1, false);
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
 
-  const int32_t stream_id1 =
-      adapter->SubmitRequest(headers1, std::move(body1), false, nullptr);
+  const int32_t stream_id1 = adapter->SubmitRequest(
+      headers1, GetParam() ? nullptr : std::move(body1), false, nullptr);
   ASSERT_GT(stream_id1, 0);
 
   const std::vector<Header> headers2 =
@@ -3484,8 +3487,8 @@
   visitor.SetEndData(3, false);
   auto body2 = std::make_unique<VisitorDataSource>(visitor, 3);
 
-  const int32_t stream_id2 =
-      adapter->SubmitRequest(headers2, std::move(body2), false, nullptr);
+  const int32_t stream_id2 = adapter->SubmitRequest(
+      headers2, GetParam() ? nullptr : std::move(body2), false, nullptr);
   ASSERT_EQ(stream_id2, 3);
 
   EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
@@ -3534,7 +3537,7 @@
   EXPECT_EQ(0, result);
 }
 
-TEST_F(OgHttp2AdapterDataTest, ClientSendsTrailersAfterFlowControlBlock) {
+TEST_P(OgHttp2AdapterDataTest, ClientSendsTrailersAfterFlowControlBlock) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kClient;
@@ -3552,8 +3555,8 @@
   visitor.SetEndData(1, false);
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
 
-  const int32_t stream_id1 =
-      adapter->SubmitRequest(headers1, std::move(body1), false, nullptr);
+  const int32_t stream_id1 = adapter->SubmitRequest(
+      headers1, GetParam() ? nullptr : std::move(body1), false, nullptr);
   ASSERT_GT(stream_id1, 0);
 
   const std::vector<Header> headers2 =
@@ -3567,8 +3570,8 @@
   visitor.SetEndData(3, false);
   auto body2 = std::make_unique<VisitorDataSource>(visitor, 3);
 
-  const int32_t stream_id2 =
-      adapter->SubmitRequest(headers2, std::move(body2), false, nullptr);
+  const int32_t stream_id2 = adapter->SubmitRequest(
+      headers2, GetParam() ? nullptr : std::move(body2), false, nullptr);
   ASSERT_GT(stream_id2, 0);
 
   EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
@@ -4195,7 +4198,7 @@
   EXPECT_LT(send_result, 0);
 }
 
-TEST_F(OgHttp2AdapterDataTest, ConnectionErrorOnDataFrameSent) {
+TEST_P(OgHttp2AdapterDataTest, ConnectionErrorOnDataFrameSent) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kServer;
@@ -4230,8 +4233,9 @@
   visitor.AppendPayloadForStream(
       1, "Here is some data, which will lead to a fatal error");
   auto body = std::make_unique<VisitorDataSource>(visitor, 1);
-  int submit_result = adapter->SubmitResponse(
-      1, ToHeaders({{":status", "200"}}), std::move(body), false);
+  int submit_result =
+      adapter->SubmitResponse(1, ToHeaders({{":status", "200"}}),
+                              GetParam() ? nullptr : std::move(body), false);
   ASSERT_EQ(0, submit_result);
 
   EXPECT_TRUE(adapter->want_write());
@@ -4299,7 +4303,7 @@
   EXPECT_EQ(frames.size(), static_cast<size_t>(result));
 }
 
-TEST_F(OgHttp2AdapterDataTest, RepeatedHeaderNames) {
+TEST_P(OgHttp2AdapterDataTest, RepeatedHeaderNames) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kServer;
@@ -4345,8 +4349,8 @@
   visitor.SetEndData(1, true);
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
 
-  int submit_result =
-      adapter->SubmitResponse(1, headers1, std::move(body1), false);
+  int submit_result = adapter->SubmitResponse(
+      1, headers1, GetParam() ? nullptr : std::move(body1), false);
   ASSERT_EQ(0, submit_result);
 
   EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
@@ -4365,7 +4369,7 @@
                             SpdyFrameType::HEADERS, SpdyFrameType::DATA}));
 }
 
-TEST_F(OgHttp2AdapterDataTest, ServerRespondsToRequestWithTrailers) {
+TEST_P(OgHttp2AdapterDataTest, ServerRespondsToRequestWithTrailers) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kServer;
@@ -4406,8 +4410,8 @@
   const std::vector<Header> headers1 = ToHeaders({{":status", "200"}});
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
 
-  int submit_result =
-      adapter->SubmitResponse(1, headers1, std::move(body1), false);
+  int submit_result = adapter->SubmitResponse(
+      1, headers1, GetParam() ? nullptr : std::move(body1), false);
   ASSERT_EQ(0, submit_result);
 
   EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
@@ -4568,7 +4572,7 @@
                             SpdyFrameType::RST_STREAM}));
 }
 
-TEST_F(OgHttp2AdapterDataTest, ServerSubmitsResponseWithDataSourceError) {
+TEST_P(OgHttp2AdapterDataTest, ServerSubmitsResponseWithDataSourceError) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kServer;
@@ -4607,7 +4611,7 @@
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
   int submit_result = adapter->SubmitResponse(
       1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
-      std::move(body1), false);
+      GetParam() ? nullptr : std::move(body1), false);
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(adapter->want_write());
 
@@ -4865,7 +4869,7 @@
   EXPECT_EQ(frames.size(), static_cast<size_t>(result));
 }
 
-TEST_F(OgHttp2AdapterDataTest, ServerSendsInvalidTrailers) {
+TEST_P(OgHttp2AdapterDataTest, ServerSendsInvalidTrailers) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kServer;
@@ -4909,7 +4913,7 @@
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
   int submit_result = adapter->SubmitResponse(
       1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
-      std::move(body1), false);
+      GetParam() ? nullptr : std::move(body1), false);
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(adapter->want_write());
 
@@ -5160,7 +5164,7 @@
 
 // Tests the case where the response body is in the progress of being sent while
 // trailers are queued.
-TEST_F(OgHttp2AdapterDataTest, ServerSubmitsTrailersWhileDataDeferred) {
+TEST_P(OgHttp2AdapterDataTest, ServerSubmitsTrailersWhileDataDeferred) {
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kServer;
   for (const bool add_more_body_data : {true, false}) {
@@ -5220,7 +5224,7 @@
     auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
     int submit_result = adapter->SubmitResponse(
         1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
-        std::move(body1), false);
+        GetParam() ? nullptr : std::move(body1), false);
     EXPECT_EQ(submit_result, 0);
     EXPECT_TRUE(adapter->want_write());
 
@@ -5258,7 +5262,7 @@
 // Tests the case where the response body and trailers become blocked by flow
 // control while the stream is writing. Regression test for
 // https://github.com/envoyproxy/envoy/issues/31710
-TEST_F(OgHttp2AdapterDataTest, ServerSubmitsTrailersWithFlowControlBlockage) {
+TEST_P(OgHttp2AdapterDataTest, ServerSubmitsTrailersWithFlowControlBlockage) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kServer;
@@ -5312,7 +5316,7 @@
   auto body1 = std::make_unique<VisitorDataSource>(visitor, 1);
   int submit_result = adapter->SubmitResponse(
       1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
-      std::move(body1), false);
+      GetParam() ? nullptr : std::move(body1), false);
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(adapter->want_write());
 
@@ -5374,7 +5378,7 @@
   EXPECT_FALSE(adapter->want_write());
 }
 
-TEST_F(OgHttp2AdapterDataTest, ServerSubmitsTrailersWithDataEndStream) {
+TEST_P(OgHttp2AdapterDataTest, ServerSubmitsTrailersWithDataEndStream) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kServer;
@@ -5417,8 +5421,9 @@
   visitor.SetEndData(1, true);
   auto body = std::make_unique<VisitorDataSource>(visitor, 1);
 
-  int submit_result = adapter->SubmitResponse(
-      1, ToHeaders({{":status", "200"}}), std::move(body), false);
+  int submit_result =
+      adapter->SubmitResponse(1, ToHeaders({{":status", "200"}}),
+                              GetParam() ? nullptr : std::move(body), false);
   ASSERT_EQ(submit_result, 0);
 
   const std::vector<Header> trailers =
@@ -5444,7 +5449,7 @@
                             SpdyFrameType::HEADERS, SpdyFrameType::DATA}));
 }
 
-TEST_F(OgHttp2AdapterDataTest,
+TEST_P(OgHttp2AdapterDataTest,
        ServerSubmitsTrailersWithDataEndStreamAndDeferral) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
@@ -5488,8 +5493,9 @@
   visitor.AppendPayloadForStream(1, kBody);
   auto body = std::make_unique<VisitorDataSource>(visitor, 1);
 
-  int submit_result = adapter->SubmitResponse(
-      1, ToHeaders({{":status", "200"}}), std::move(body), false);
+  int submit_result =
+      adapter->SubmitResponse(1, ToHeaders({{":status", "200"}}),
+                              GetParam() ? nullptr : std::move(body), false);
   ASSERT_EQ(submit_result, 0);
 
   EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
@@ -6407,7 +6413,7 @@
                             SpdyFrameType::HEADERS}));
 }
 
-TEST_F(OgHttp2AdapterDataTest, ServerSubmitResponse) {
+TEST_P(OgHttp2AdapterDataTest, ServerSubmitResponse) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kServer;
@@ -6476,7 +6482,7 @@
       1,
       ToHeaders({{":status", "404"},
                  {"x-comment", "I have no idea what you're talking about."}}),
-      std::move(body1), false);
+      GetParam() ? nullptr : std::move(body1), false);
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(adapter->want_write());
 
@@ -6507,7 +6513,7 @@
   EXPECT_GT(adapter->GetHpackEncoderDynamicTableSize(), 0);
 }
 
-TEST_F(OgHttp2AdapterDataTest, ServerSubmitResponseWithResetFromClient) {
+TEST_P(OgHttp2AdapterDataTest, ServerSubmitResponseWithResetFromClient) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kServer;
@@ -6563,7 +6569,7 @@
       1,
       ToHeaders({{":status", "404"},
                  {"x-comment", "I have no idea what you're talking about."}}),
-      std::move(body1), false);
+      GetParam() ? nullptr : std::move(body1), false);
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(adapter->want_write());
 
@@ -6649,9 +6655,12 @@
 }
 
 using OgHttp2AdapterInteractionDataTest = OgHttp2AdapterDataTest;
+
+INSTANTIATE_TEST_SUITE_P(BothValues, OgHttp2AdapterInteractionDataTest,
+                         testing::Bool());
 // Exercises a naive mutually recursive test client and server. This test fails
 // without recursion guards in OgHttp2Session.
-TEST_F(OgHttp2AdapterInteractionDataTest, ClientServerInteractionTest) {
+TEST_P(OgHttp2AdapterInteractionDataTest, ClientServerInteractionTest) {
   TestVisitor client_visitor;
   OgHttp2Adapter::Options client_options;
   client_options.perspective = Perspective::kClient;
@@ -6699,7 +6708,7 @@
                          {":authority", "example.com"},
                          {":path",
                           absl::StrCat("/this/is/request/", new_stream_id)}}),
-              std::move(body), false, nullptr);
+              GetParam() ? nullptr : std::move(body), false, nullptr);
           EXPECT_EQ(new_stream_id, created_stream_id);
           client_adapter->Send();
         }
@@ -7359,7 +7368,7 @@
   EXPECT_FALSE(adapter->want_write());
 }
 
-TEST_F(OgHttp2AdapterDataTest, SkipsSendingFramesForRejectedStream) {
+TEST_P(OgHttp2AdapterDataTest, SkipsSendingFramesForRejectedStream) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kServer;
@@ -7396,8 +7405,9 @@
       1, "Here is some data, which will be completely ignored!");
   auto body = std::make_unique<VisitorDataSource>(visitor, 1);
 
-  int submit_result = adapter->SubmitResponse(
-      1, ToHeaders({{":status", "200"}}), std::move(body), false);
+  int submit_result =
+      adapter->SubmitResponse(1, ToHeaders({{":status", "200"}}),
+                              GetParam() ? nullptr : std::move(body), false);
   ASSERT_EQ(0, submit_result);
 
   auto source = std::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
@@ -7542,7 +7552,7 @@
   EXPECT_LT(next_result, 0);
 }
 
-TEST_F(OgHttp2AdapterDataTest, ServerDoesNotSendFramesAfterImmediateGoAway) {
+TEST_P(OgHttp2AdapterDataTest, ServerDoesNotSendFramesAfterImmediateGoAway) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kServer;
@@ -7580,8 +7590,9 @@
   // Submit a response for the stream.
   visitor.AppendPayloadForStream(1, "This data is doomed to never be written.");
   auto body = std::make_unique<VisitorDataSource>(visitor, 1);
-  int submit_result = adapter->SubmitResponse(
-      1, ToHeaders({{":status", "200"}}), std::move(body), false);
+  int submit_result =
+      adapter->SubmitResponse(1, ToHeaders({{":status", "200"}}),
+                              GetParam() ? nullptr : std::move(body), false);
   ASSERT_EQ(0, submit_result);
 
   // Submit a WINDOW_UPDATE frame.
@@ -8413,7 +8424,7 @@
   EXPECT_EQ(frames.size(), static_cast<size_t>(result));
 }
 
-TEST_F(OgHttp2AdapterDataTest, NegativeFlowControlStreamResumption) {
+TEST_P(OgHttp2AdapterDataTest, NegativeFlowControlStreamResumption) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kServer;
@@ -8457,8 +8468,9 @@
   // Submit a response for the stream.
   visitor.AppendPayloadForStream(1, std::string(70000, 'a'));
   auto body = std::make_unique<VisitorDataSource>(visitor, 1);
-  int submit_result = adapter->SubmitResponse(
-      1, ToHeaders({{":status", "200"}}), std::move(body), false);
+  int submit_result =
+      adapter->SubmitResponse(1, ToHeaders({{":status", "200"}}),
+                              GetParam() ? nullptr : std::move(body), false);
   ASSERT_EQ(0, submit_result);
 
   EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
diff --git a/quiche/http2/adapter/oghttp2_session.cc b/quiche/http2/adapter/oghttp2_session.cc
index f47385c..cbc13d3 100644
--- a/quiche/http2/adapter/oghttp2_session.cc
+++ b/quiche/http2/adapter/oghttp2_session.cc
@@ -24,6 +24,7 @@
 namespace {
 
 using ConnectionError = Http2VisitorInterface::ConnectionError;
+using DataFrameHeaderInfo = Http2VisitorInterface::DataFrameHeaderInfo;
 using SpdyFramerError = Http2DecoderAdapter::SpdyFramerError;
 
 using ::spdy::SpdySettingsIR;
@@ -683,11 +684,13 @@
     QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id;
     return -501;  // NGHTTP2_ERR_INVALID_ARGUMENT
   }
-  QUICHE_DCHECK_EQ(end_stream, data_source == nullptr);
-  if (!end_stream) {
+  if (data_source != nullptr) {
     // Add data source to stream state
     iter->second.outbound_body = std::move(data_source);
     write_scheduler_.MarkStreamReady(stream_id, false);
+  } else if (!end_stream) {
+    iter->second.check_visitor_for_body = true;
+    write_scheduler_.MarkStreamReady(stream_id, false);
   }
   SendHeaders(stream_id, ToHeaderBlock(headers), end_stream);
   return 0;
@@ -1023,8 +1026,6 @@
     // Enqueue trailers immediately.
     SendTrailers(stream_id, ToHeaderBlock(trailers));
   } else {
-    QUICHE_LOG_IF(ERROR, state.outbound_body->send_fin())
-        << "DataFrameSource will send fin, preventing trailers!";
     // Save trailers so they can be written once data is done.
     state.trailers =
         std::make_unique<spdy::Http2HeaderBlock>(ToHeaderBlock(trailers));
@@ -1825,10 +1826,12 @@
   }
 
   auto iter = CreateStream(stream_id);
-  QUICHE_DCHECK_EQ(end_stream, data_source == nullptr);
-  if (!end_stream) {
+  if (data_source != nullptr) {
     iter->second.outbound_body = std::move(data_source);
     write_scheduler_.MarkStreamReady(stream_id, false);
+  } else if (!end_stream) {
+    iter->second.check_visitor_for_body = true;
+    write_scheduler_.MarkStreamReady(stream_id, false);
   }
   iter->second.user_data = user_data;
   for (const auto& [name, value] : headers) {
@@ -2060,33 +2063,51 @@
 }
 
 bool OgHttp2Session::HasMoreData(const StreamState& stream_state) const {
-  return stream_state.outbound_body != nullptr;
+  return stream_state.outbound_body != nullptr ||
+         stream_state.check_visitor_for_body;
 }
 
 bool OgHttp2Session::IsReadyToWriteData(const StreamState& stream_state) const {
-  return stream_state.outbound_body != nullptr && !stream_state.data_deferred;
+  return HasMoreData(stream_state) && !stream_state.data_deferred;
 }
 
 void OgHttp2Session::AbandonData(StreamState& stream_state) {
   stream_state.outbound_body = nullptr;
+  stream_state.check_visitor_for_body = false;
 }
 
 OgHttp2Session::DataFrameInfo OgHttp2Session::GetDataFrameInfo(
-    Http2StreamId /*stream_id*/, size_t flow_control_available,
+    Http2StreamId stream_id, size_t flow_control_available,
     StreamState& stream_state) {
-  DataFrameInfo info;
-  std::tie(info.payload_length, info.end_data) =
-      stream_state.outbound_body->SelectPayloadLength(flow_control_available);
-  info.send_fin =
-      info.end_data ? stream_state.outbound_body->send_fin() : false;
+  DataFrameInfo info{.payload_length = 0, .end_data = true, .send_fin = true};
+  if (stream_state.outbound_body != nullptr) {
+    std::tie(info.payload_length, info.end_data) =
+        stream_state.outbound_body->SelectPayloadLength(flow_control_available);
+    info.send_fin =
+        info.end_data ? stream_state.outbound_body->send_fin() : false;
+  } else if (stream_state.check_visitor_for_body) {
+    DataFrameHeaderInfo visitor_info =
+        visitor_.OnReadyToSendDataForStream(stream_id, flow_control_available);
+    info.payload_length = visitor_info.payload_length;
+    info.end_data = visitor_info.end_data || visitor_info.end_stream;
+    info.send_fin = visitor_info.end_stream;
+  } else {
+    QUICHE_LOG(DFATAL) << "GetDataFrameInfo for stream " << stream_id
+                       << " but no body available!";
+  }
   return info;
 }
 
-bool OgHttp2Session::SendDataFrame(Http2StreamId /*stream_id*/,
+bool OgHttp2Session::SendDataFrame(Http2StreamId stream_id,
                                    absl::string_view frame_header,
                                    size_t payload_length,
                                    StreamState& stream_state) {
-  return stream_state.outbound_body->Send(frame_header, payload_length);
+  if (stream_state.outbound_body != nullptr) {
+    return stream_state.outbound_body->Send(frame_header, payload_length);
+  } else {
+    QUICHE_DCHECK(stream_state.check_visitor_for_body);
+    return visitor_.SendDataFrame(stream_id, frame_header, payload_length);
+  }
 }
 
 }  // namespace adapter
diff --git a/quiche/http2/adapter/oghttp2_session.h b/quiche/http2/adapter/oghttp2_session.h
index 3f37019..8d6756f 100644
--- a/quiche/http2/adapter/oghttp2_session.h
+++ b/quiche/http2/adapter/oghttp2_session.h
@@ -249,6 +249,7 @@
     int32_t send_window;
     std::optional<HeaderType> received_header_type;
     std::optional<size_t> remaining_content_length;
+    bool check_visitor_for_body = false;
     bool half_closed_local = false;
     bool half_closed_remote = false;
     // Indicates that `outbound_body` temporarily cannot produce data.
diff --git a/quiche/http2/adapter/recording_http2_visitor.cc b/quiche/http2/adapter/recording_http2_visitor.cc
index d55045f..0b989f0 100644
--- a/quiche/http2/adapter/recording_http2_visitor.cc
+++ b/quiche/http2/adapter/recording_http2_visitor.cc
@@ -13,6 +13,22 @@
   return serialized.size();
 }
 
+Http2VisitorInterface::DataFrameHeaderInfo
+RecordingHttp2Visitor::OnReadyToSendDataForStream(Http2StreamId stream_id,
+                                                  size_t max_length) {
+  events_.push_back(absl::StrFormat("OnReadyToSendDataForStream %d %d",
+                                    stream_id, max_length));
+  return {70000, true, true};
+}
+
+bool RecordingHttp2Visitor::SendDataFrame(Http2StreamId stream_id,
+                                          absl::string_view /*frame_header*/,
+                                          size_t payload_bytes) {
+  events_.push_back(
+      absl::StrFormat("SendDataFrame %d %d", stream_id, payload_bytes));
+  return true;
+}
+
 void RecordingHttp2Visitor::OnConnectionError(ConnectionError error) {
   events_.push_back(
       absl::StrFormat("OnConnectionError %s", ConnectionErrorToString(error)));
diff --git a/quiche/http2/adapter/recording_http2_visitor.h b/quiche/http2/adapter/recording_http2_visitor.h
index d796fe7..b7835ee 100644
--- a/quiche/http2/adapter/recording_http2_visitor.h
+++ b/quiche/http2/adapter/recording_http2_visitor.h
@@ -21,6 +21,10 @@
 
   // From Http2VisitorInterface
   int64_t OnReadyToSend(absl::string_view serialized) override;
+  DataFrameHeaderInfo OnReadyToSendDataForStream(Http2StreamId stream_id,
+                                                 size_t max_length) override;
+  bool SendDataFrame(Http2StreamId stream_id, absl::string_view frame_header,
+                     size_t payload_bytes) override;
   void OnConnectionError(ConnectionError error) override;
   bool OnFrameHeader(Http2StreamId stream_id, size_t length, uint8_t type,
                      uint8_t flags) override;
diff --git a/quiche/http2/adapter/test_utils.cc b/quiche/http2/adapter/test_utils.cc
index a72bc1d..1e6765c 100644
--- a/quiche/http2/adapter/test_utils.cc
+++ b/quiche/http2/adapter/test_utils.cc
@@ -94,7 +94,7 @@
   payload.return_error = true;
 }
 
-VisitorDataSource::VisitorDataSource(TestVisitor& visitor,
+VisitorDataSource::VisitorDataSource(Http2VisitorInterface& visitor,
                                      Http2StreamId stream_id)
     : visitor_(visitor), stream_id_(stream_id) {}
 
diff --git a/quiche/http2/adapter/test_utils.h b/quiche/http2/adapter/test_utils.h
index 047da91..5c7a687 100644
--- a/quiche/http2/adapter/test_utils.h
+++ b/quiche/http2/adapter/test_utils.h
@@ -59,16 +59,10 @@
     }
   }
 
-  struct DataFrameHeaderInfo {
-    int64_t payload_length;
-    bool end_data;
-    bool end_stream;
-  };
-  // These methods will be moved to Http2VisitorInterface in a future CL.
   DataFrameHeaderInfo OnReadyToSendDataForStream(Http2StreamId stream_id,
-                                                 size_t max_length);
+                                                 size_t max_length) override;
   bool SendDataFrame(Http2StreamId stream_id, absl::string_view frame_header,
-                     size_t payload_bytes);
+                     size_t payload_bytes) override;
 
   // Test methods to manipulate the data frame payload to send for a stream.
   void AppendPayloadForStream(Http2StreamId stream_id,
@@ -104,15 +98,14 @@
 // A DataFrameSource that invokes visitor methods.
 class QUICHE_NO_EXPORT VisitorDataSource : public DataFrameSource {
  public:
-  // TODO(birenroy): revert visitor type to the interface type.
-  VisitorDataSource(TestVisitor& visitor, Http2StreamId stream_id);
+  VisitorDataSource(Http2VisitorInterface& visitor, Http2StreamId stream_id);
 
   std::pair<int64_t, bool> SelectPayloadLength(size_t max_length) override;
   bool Send(absl::string_view frame_header, size_t payload_length) override;
   bool send_fin() const override;
 
  private:
-  TestVisitor& visitor_;
+  Http2VisitorInterface& visitor_;
   const Http2StreamId stream_id_;
   // Whether the stream should end with the final frame of data.
   bool has_fin_ = false;