Removes DataFrameSource-based DATA frame manipulation from OgHttp2Session.

The `DataFrameSource` parts of the API are no longer used.

PiperOrigin-RevId: 699991798
diff --git a/quiche/http2/adapter/oghttp2_adapter.cc b/quiche/http2/adapter/oghttp2_adapter.cc
index 0813452..9fb7e8c 100644
--- a/quiche/http2/adapter/oghttp2_adapter.cc
+++ b/quiche/http2/adapter/oghttp2_adapter.cc
@@ -143,16 +143,16 @@
     absl::Span<const Header> headers,
     std::unique_ptr<DataFrameSource> data_source, bool end_stream,
     void* user_data) {
-  return session_->SubmitRequest(headers, std::move(data_source), end_stream,
-                                 user_data);
+  QUICHE_DCHECK(data_source == nullptr);
+  return session_->SubmitRequest(headers, end_stream, user_data);
 }
 
 int OgHttp2Adapter::SubmitResponse(Http2StreamId stream_id,
                                    absl::Span<const Header> headers,
                                    std::unique_ptr<DataFrameSource> data_source,
                                    bool end_stream) {
-  return session_->SubmitResponse(stream_id, headers, std::move(data_source),
-                                  end_stream);
+  QUICHE_DCHECK(data_source == nullptr);
+  return session_->SubmitResponse(stream_id, headers, end_stream);
 }
 
 int OgHttp2Adapter::SubmitTrailer(Http2StreamId stream_id,
diff --git a/quiche/http2/adapter/oghttp2_adapter_test.cc b/quiche/http2/adapter/oghttp2_adapter_test.cc
index 1e9b619..932e083 100644
--- a/quiche/http2/adapter/oghttp2_adapter_test.cc
+++ b/quiche/http2/adapter/oghttp2_adapter_test.cc
@@ -3809,7 +3809,7 @@
 
 // This test verifies how oghttp2 behaves when a connection becomes
 // write-blocked while sending HEADERS.
-TEST(OgHttp2AdapterTest, ClientSubmitRequestWithDataProviderAndWriteBlock) {
+TEST(OgHttp2AdapterTest, ClientSubmitRequestWithWriteBlock) {
   TestVisitor visitor;
   OgHttp2Adapter::Options options;
   options.perspective = Perspective::kClient;
@@ -3829,8 +3829,6 @@
 
   const absl::string_view kBody = "This is an example request body.";
 
-  std::unique_ptr<DataFrameSource> frame_source =
-      std::make_unique<VisitorDataSource>(visitor, 1);
   visitor.AppendPayloadForStream(1, kBody);
   visitor.SetEndData(1, true);
   int stream_id =
@@ -3838,7 +3836,7 @@
                                         {":scheme", "http"},
                                         {":authority", "example.com"},
                                         {":path", "/this/is/request/one"}}),
-                             std::move(frame_source), false, nullptr);
+                             nullptr, false, nullptr);
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(adapter->want_write());
 
diff --git a/quiche/http2/adapter/oghttp2_session.cc b/quiche/http2/adapter/oghttp2_session.cc
index fe59df7..296fa2c 100644
--- a/quiche/http2/adapter/oghttp2_session.cc
+++ b/quiche/http2/adapter/oghttp2_session.cc
@@ -658,49 +658,6 @@
   return write_scheduler_.PopNextReadyStream();
 }
 
-int32_t OgHttp2Session::SubmitRequestInternal(
-    absl::Span<const Header> headers,
-    std::unique_ptr<DataFrameSource> data_source, bool end_stream,
-    void* user_data) {
-  // TODO(birenroy): return an error for the incorrect perspective
-  const Http2StreamId stream_id = next_stream_id_;
-  next_stream_id_ += 2;
-  if (!pending_streams_.empty() || !CanCreateStream()) {
-    // TODO(diannahu): There should probably be a limit to the number of allowed
-    // pending streams.
-    pending_streams_.insert(
-        {stream_id,
-         PendingStreamState{ToHeaderBlock(headers), std::move(data_source),
-                            user_data, end_stream}});
-    StartPendingStreams();
-  } else {
-    StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source),
-                 user_data, end_stream);
-  }
-  return stream_id;
-}
-
-int OgHttp2Session::SubmitResponseInternal(
-    Http2StreamId stream_id, absl::Span<const Header> headers,
-    std::unique_ptr<DataFrameSource> data_source, bool end_stream) {
-  // TODO(birenroy): return an error for the incorrect perspective
-  auto iter = stream_map_.find(stream_id);
-  if (iter == stream_map_.end()) {
-    QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id;
-    return -501;  // NGHTTP2_ERR_INVALID_ARGUMENT
-  }
-  if (data_source != nullptr) {
-    // Add data source to stream state
-    iter->second.outbound_body = std::move(data_source);
-    write_scheduler_.MarkStreamReady(stream_id, false);
-  } else if (!end_stream) {
-    iter->second.check_visitor_for_body = true;
-    write_scheduler_.MarkStreamReady(stream_id, false);
-  }
-  SendHeaders(stream_id, ToHeaderBlock(headers), end_stream);
-  return 0;
-}
-
 OgHttp2Session::SendResult OgHttp2Session::MaybeSendBufferedData() {
   int64_t result = std::numeric_limits<int64_t>::max();
   while (result > 0 && !buffered_data_.Empty()) {
@@ -1018,20 +975,39 @@
   }
 }
 
-int32_t OgHttp2Session::SubmitRequest(
-    absl::Span<const Header> headers,
-    std::unique_ptr<DataFrameSource> data_source, bool end_stream,
-    void* user_data) {
-  return SubmitRequestInternal(headers, std::move(data_source), end_stream,
-                               user_data);
+int32_t OgHttp2Session::SubmitRequest(absl::Span<const Header> headers,
+                                      bool end_stream, void* user_data) {
+  // TODO(birenroy): return an error for the incorrect perspective
+  const Http2StreamId stream_id = next_stream_id_;
+  next_stream_id_ += 2;
+  if (!pending_streams_.empty() || !CanCreateStream()) {
+    // TODO(diannahu): There should probably be a limit to the number of allowed
+    // pending streams.
+    pending_streams_.insert(
+        {stream_id,
+         PendingStreamState{ToHeaderBlock(headers), user_data, end_stream}});
+    StartPendingStreams();
+  } else {
+    StartRequest(stream_id, ToHeaderBlock(headers), user_data, end_stream);
+  }
+  return stream_id;
 }
 
 int OgHttp2Session::SubmitResponse(Http2StreamId stream_id,
                                    absl::Span<const Header> headers,
-                                   std::unique_ptr<DataFrameSource> data_source,
                                    bool end_stream) {
-  return SubmitResponseInternal(stream_id, headers, std::move(data_source),
-                                end_stream);
+  // TODO(birenroy): return an error for the incorrect perspective
+  auto iter = stream_map_.find(stream_id);
+  if (iter == stream_map_.end()) {
+    QUICHE_LOG(ERROR) << "Unable to find stream " << stream_id;
+    return -501;  // NGHTTP2_ERR_INVALID_ARGUMENT
+  }
+  if (!end_stream) {
+    iter->second.check_visitor_for_body = true;
+    write_scheduler_.MarkStreamReady(stream_id, false);
+  }
+  SendHeaders(stream_id, ToHeaderBlock(headers), end_stream);
+  return 0;
 }
 
 int OgHttp2Session::SubmitTrailer(Http2StreamId stream_id,
@@ -1874,7 +1850,6 @@
 
 void OgHttp2Session::StartRequest(Http2StreamId stream_id,
                                   quiche::HttpHeaderBlock headers,
-                                  std::unique_ptr<DataFrameSource> data_source,
                                   void* user_data, bool end_stream) {
   if (received_goaway_) {
     // Do not start new streams after receiving a GOAWAY.
@@ -1883,10 +1858,7 @@
   }
 
   auto iter = CreateStream(stream_id);
-  if (data_source != nullptr) {
-    iter->second.outbound_body = std::move(data_source);
-    write_scheduler_.MarkStreamReady(stream_id, false);
-  } else if (!end_stream) {
+  if (!end_stream) {
     iter->second.check_visitor_for_body = true;
     write_scheduler_.MarkStreamReady(stream_id, false);
   }
@@ -1903,7 +1875,6 @@
   while (!pending_streams_.empty() && CanCreateStream()) {
     auto& [stream_id, pending_stream] = pending_streams_.front();
     StartRequest(stream_id, std::move(pending_stream.headers),
-                 std::move(pending_stream.data_source),
                  pending_stream.user_data, pending_stream.end_stream);
     pending_streams_.pop_front();
   }
@@ -2121,8 +2092,7 @@
 }
 
 bool OgHttp2Session::HasMoreData(const StreamState& stream_state) const {
-  return stream_state.outbound_body != nullptr ||
-         stream_state.check_visitor_for_body;
+  return stream_state.check_visitor_for_body;
 }
 
 bool OgHttp2Session::IsReadyToWriteData(const StreamState& stream_state) const {
@@ -2130,21 +2100,13 @@
 }
 
 void OgHttp2Session::AbandonData(StreamState& stream_state) {
-  stream_state.outbound_body = nullptr;
   stream_state.check_visitor_for_body = false;
 }
 
 OgHttp2Session::DataFrameHeaderInfo OgHttp2Session::GetDataFrameInfo(
     Http2StreamId stream_id, size_t flow_control_available,
     StreamState& stream_state) {
-  if (stream_state.outbound_body != nullptr) {
-    DataFrameHeaderInfo info;
-    std::tie(info.payload_length, info.end_data) =
-        stream_state.outbound_body->SelectPayloadLength(flow_control_available);
-    info.end_stream =
-        info.end_data ? stream_state.outbound_body->send_fin() : false;
-    return info;
-  } else if (stream_state.check_visitor_for_body) {
+  if (stream_state.check_visitor_for_body) {
     DataFrameHeaderInfo info =
         visitor_.OnReadyToSendDataForStream(stream_id, flow_control_available);
     info.end_data = info.end_data || info.end_stream;
@@ -2159,12 +2121,8 @@
                                    absl::string_view frame_header,
                                    size_t payload_length,
                                    StreamState& stream_state) {
-  if (stream_state.outbound_body != nullptr) {
-    return stream_state.outbound_body->Send(frame_header, payload_length);
-  } else {
-    QUICHE_DCHECK(stream_state.check_visitor_for_body);
-    return visitor_.SendDataFrame(stream_id, frame_header, payload_length);
-  }
+  QUICHE_DCHECK(stream_state.check_visitor_for_body);
+  return visitor_.SendDataFrame(stream_id, frame_header, payload_length);
 }
 
 }  // namespace adapter
diff --git a/quiche/http2/adapter/oghttp2_session.h b/quiche/http2/adapter/oghttp2_session.h
index 4a4221c..73d9e26 100644
--- a/quiche/http2/adapter/oghttp2_session.h
+++ b/quiche/http2/adapter/oghttp2_session.h
@@ -108,10 +108,8 @@
   int Send();
 
   int32_t SubmitRequest(absl::Span<const Header> headers,
-                        std::unique_ptr<DataFrameSource> data_source,
                         bool end_stream, void* user_data);
   int SubmitResponse(Http2StreamId stream_id, absl::Span<const Header> headers,
-                     std::unique_ptr<DataFrameSource> data_source,
                      bool end_stream);
   int SubmitTrailer(Http2StreamId stream_id, absl::Span<const Header> trailers);
   void SubmitMetadata(Http2StreamId stream_id,
@@ -248,7 +246,6 @@
           send_window(stream_send_window) {}
 
     WindowManager window_manager;
-    std::unique_ptr<DataFrameSource> outbound_body;
     std::unique_ptr<quiche::HttpHeaderBlock> trailers;
     void* user_data = nullptr;
     int32_t send_window;
@@ -266,7 +263,6 @@
 
   struct QUICHE_EXPORT PendingStreamState {
     quiche::HttpHeaderBlock headers;
-    std::unique_ptr<DataFrameSource> data_source;
     void* user_data = nullptr;
     bool end_stream;
   };
@@ -374,14 +370,6 @@
   // streams, returns zero.
   Http2StreamId GetNextReadyStream();
 
-  int32_t SubmitRequestInternal(absl::Span<const Header> headers,
-                                std::unique_ptr<DataFrameSource> data_source,
-                                bool end_stream, void* user_data);
-  int SubmitResponseInternal(Http2StreamId stream_id,
-                             absl::Span<const Header> headers,
-                             std::unique_ptr<DataFrameSource> data_source,
-                             bool end_stream);
-
   // Sends the buffered connection preface or serialized frame data, if any.
   SendResult MaybeSendBufferedData();
 
@@ -419,7 +407,6 @@
   // Creates a stream for `stream_id`, stores the `data_source` and `user_data`
   // in the stream state, and sends the `headers`.
   void StartRequest(Http2StreamId stream_id, quiche::HttpHeaderBlock headers,
-                    std::unique_ptr<DataFrameSource> data_source,
                     void* user_data, bool end_stream);
 
   // Sends headers for pending streams as long as the stream limit allows.
diff --git a/quiche/http2/adapter/oghttp2_session_test.cc b/quiche/http2/adapter/oghttp2_session_test.cc
index 1fa44fd..89e3001 100644
--- a/quiche/http2/adapter/oghttp2_session_test.cc
+++ b/quiche/http2/adapter/oghttp2_session_test.cc
@@ -97,7 +97,7 @@
                                        {":scheme", "http"},
                                        {":authority", "example.com"},
                                        {":path", "/this/is/request/one"}}),
-                            nullptr, false, const_cast<char*>(kSentinel1));
+                            false, const_cast<char*>(kSentinel1));
   ASSERT_EQ(stream_id, 1);
 
   // Submit another request to ensure the next stream is created.
@@ -106,7 +106,7 @@
                                        {":scheme", "http"},
                                        {":authority", "example.com"},
                                        {":path", "/this/is/request/two"}}),
-                            nullptr, true, nullptr);
+                            true, nullptr);
   EXPECT_EQ(stream_id2, 3);
 
   const std::string stream_frames =
@@ -282,7 +282,7 @@
                                        {":scheme", "http"},
                                        {":authority", "example.com"},
                                        {":path", "/this/is/request/one"}}),
-                            nullptr, false, const_cast<char*>(kSentinel1));
+                            false, const_cast<char*>(kSentinel1));
   ASSERT_EQ(stream_id, 1);
   EXPECT_TRUE(session.want_write());
   EXPECT_EQ(kSentinel1, session.GetStreamUserData(stream_id));
@@ -313,7 +313,7 @@
                                        {":scheme", "http"},
                                        {":authority", "example.com"},
                                        {":path", "/this/is/request/two"}}),
-                            nullptr, true, nullptr);
+                            true, nullptr);
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(session.want_write());
   const char* kSentinel2 = "arbitrary pointer 2";
@@ -392,7 +392,7 @@
                                        {":scheme", "http"},
                                        {":authority", "example.com"},
                                        {":path", "/this/is/request/one"}}),
-                            nullptr, false, nullptr);
+                            false, nullptr);
   ASSERT_EQ(stream_id, 1);
   EXPECT_TRUE(session.want_write());
 
@@ -424,7 +424,7 @@
                                        {":scheme", "http"},
                                        {":authority", "example.com"},
                                        {":path", "/this/is/request/one"}}),
-                            nullptr, false, const_cast<char*>(kSentinel1));
+                            false, const_cast<char*>(kSentinel1));
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(session.want_write());
   EXPECT_EQ(kSentinel1, session.GetStreamUserData(stream_id));
@@ -478,7 +478,7 @@
                                        {":scheme", "http"},
                                        {":authority", "example.com"},
                                        {":path", "/this/is/request/one"}}),
-                            nullptr, false, const_cast<char*>(kSentinel1));
+                            false, const_cast<char*>(kSentinel1));
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(session.want_write());
   EXPECT_EQ(kSentinel1, session.GetStreamUserData(stream_id));
@@ -533,7 +533,7 @@
                                        {":scheme", "http"},
                                        {":authority", "example.com"},
                                        {":path", "/this/is/request/one"}}),
-                            nullptr, false, const_cast<char*>(kSentinel1));
+                            false, const_cast<char*>(kSentinel1));
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(session.want_write());
   EXPECT_EQ(kSentinel1, session.GetStreamUserData(stream_id));
@@ -804,7 +804,7 @@
       1,
       ToHeaders({{":status", "404"},
                  {"x-comment", "I have no idea what you're talking about."}}),
-      nullptr, false);
+      false);
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(session.want_write());
 
@@ -893,7 +893,7 @@
   visitor.SetEndData(1, false);
   int submit_result = session.SubmitResponse(
       1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
-      nullptr, false);
+      false);
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(session.want_write());
 
@@ -984,7 +984,7 @@
   visitor.SetEndData(1, false);
   int submit_result = session.SubmitResponse(
       1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
-      nullptr, false);
+      false);
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(session.want_write());
   // There has not been a call to Send() yet, so neither headers nor body have
diff --git a/quiche/http2/adapter/test_utils.cc b/quiche/http2/adapter/test_utils.cc
index 1787ddf..9b96fde 100644
--- a/quiche/http2/adapter/test_utils.cc
+++ b/quiche/http2/adapter/test_utils.cc
@@ -124,25 +124,6 @@
   outbound_metadata_map_.insert({stream_id, EncodeHeaders(payload)});
 }
 
-VisitorDataSource::VisitorDataSource(Http2VisitorInterface& visitor,
-                                     Http2StreamId stream_id)
-    : visitor_(visitor), stream_id_(stream_id) {}
-
-bool VisitorDataSource::send_fin() const { return has_fin_; }
-
-std::pair<int64_t, bool> VisitorDataSource::SelectPayloadLength(
-    size_t max_length) {
-  auto [payload_length, end_data, end_stream] =
-      visitor_.OnReadyToSendDataForStream(stream_id_, max_length);
-  has_fin_ = end_stream;
-  return {payload_length, end_data};
-}
-
-bool VisitorDataSource::Send(absl::string_view frame_header,
-                             size_t payload_length) {
-  return visitor_.SendDataFrame(stream_id_, frame_header, payload_length);
-}
-
 TestMetadataSource::TestMetadataSource(const quiche::HttpHeaderBlock& entries)
     : encoded_entries_(EncodeHeaders(entries)) {
   remaining_ = encoded_entries_;
diff --git a/quiche/http2/adapter/test_utils.h b/quiche/http2/adapter/test_utils.h
index 05e1c53..3a54a37 100644
--- a/quiche/http2/adapter/test_utils.h
+++ b/quiche/http2/adapter/test_utils.h
@@ -104,22 +104,6 @@
   bool has_write_error_ = false;
 };
 
-// A DataFrameSource that invokes visitor methods.
-class QUICHE_NO_EXPORT VisitorDataSource : public DataFrameSource {
- public:
-  VisitorDataSource(Http2VisitorInterface& visitor, Http2StreamId stream_id);
-
-  std::pair<int64_t, bool> SelectPayloadLength(size_t max_length) override;
-  bool Send(absl::string_view frame_header, size_t payload_length) override;
-  bool send_fin() const override;
-
- private:
-  Http2VisitorInterface& visitor_;
-  const Http2StreamId stream_id_;
-  // Whether the stream should end with the final frame of data.
-  bool has_fin_ = false;
-};
-
 class QUICHE_NO_EXPORT TestMetadataSource : public MetadataSource {
  public:
   explicit TestMetadataSource(const quiche::HttpHeaderBlock& entries);