Modifies the Http2Adapter API to accept DataFrameSources as unique_ptrs.

This simplifies the ownership model for users of the library.

PiperOrigin-RevId: 380223157
diff --git a/http2/adapter/http2_adapter.h b/http2/adapter/http2_adapter.h
index 2c66081..32a9ba1 100644
--- a/http2/adapter/http2_adapter.h
+++ b/http2/adapter/http2_adapter.h
@@ -107,19 +107,16 @@
 
   // Returns the assigned stream ID if the operation succeeds. Otherwise,
   // returns a negative integer indicating an error code. |data_source| may be
-  // nullptr if the request does not have a body. Does not take ownership of
-  // |data_source|, which should be deleted by the caller when the stream is
-  // closed.
+  // nullptr if the request does not have a body.
   virtual int32_t SubmitRequest(absl::Span<const Header> headers,
-                                DataFrameSource* data_source,
+                                std::unique_ptr<DataFrameSource> data_source,
                                 void* user_data) = 0;
 
   // Returns 0 on success. |data_source| may be nullptr if the response does not
-  // have a body. Does not take ownership of |data_source|, which should be
-  // deleted by the caller when the stream is closed.
-  virtual int32_t SubmitResponse(Http2StreamId stream_id,
-                                 absl::Span<const Header> headers,
-                                 DataFrameSource* data_source) = 0;
+  // have a body.
+  virtual int SubmitResponse(Http2StreamId stream_id,
+                             absl::Span<const Header> headers,
+                             std::unique_ptr<DataFrameSource> data_source) = 0;
 
   // Queues trailers to be sent after any outstanding data on the stream with ID
   // |stream_id|. Returns 0 on success.
diff --git a/http2/adapter/nghttp2_adapter.cc b/http2/adapter/nghttp2_adapter.cc
index 8eac860..c6fe12a 100644
--- a/http2/adapter/nghttp2_adapter.cc
+++ b/http2/adapter/nghttp2_adapter.cc
@@ -158,37 +158,55 @@
   }
 }
 
-int32_t NgHttp2Adapter::SubmitRequest(absl::Span<const Header> headers,
-                                      DataFrameSource* data_source,
-                                      void* user_data) {
+int32_t NgHttp2Adapter::SubmitRequest(
+    absl::Span<const Header> headers,
+    std::unique_ptr<DataFrameSource> data_source, void* stream_user_data) {
   auto nvs = GetNghttp2Nvs(headers);
   std::unique_ptr<nghttp2_data_provider> provider =
-      MakeDataProvider(data_source);
-  return nghttp2_submit_request(session_->raw_ptr(), nullptr, nvs.data(),
-                                nvs.size(), provider.get(), user_data);
+      MakeDataProvider(data_source.get());
+
+  int32_t stream_id =
+      nghttp2_submit_request(session_->raw_ptr(), nullptr, nvs.data(),
+                             nvs.size(), provider.get(), stream_user_data);
+  // TODO(birenroy): clean up data source on stream close
+  sources_.emplace(stream_id, std::move(data_source));
+  QUICHE_VLOG(1) << "Submitted request with " << nvs.size()
+                 << " request headers and user data " << stream_user_data
+                 << "; resulted in stream " << stream_id;
+  return stream_id;
 }
 
-int32_t NgHttp2Adapter::SubmitResponse(Http2StreamId stream_id,
-                                       absl::Span<const Header> headers,
-                                       DataFrameSource* data_source) {
+int NgHttp2Adapter::SubmitResponse(
+    Http2StreamId stream_id, absl::Span<const Header> headers,
+    std::unique_ptr<DataFrameSource> data_source) {
   auto nvs = GetNghttp2Nvs(headers);
   std::unique_ptr<nghttp2_data_provider> provider =
-      MakeDataProvider(data_source);
-  return nghttp2_submit_response(session_->raw_ptr(), stream_id, nvs.data(),
-                                 nvs.size(), provider.get());
+      MakeDataProvider(data_source.get());
+
+  // TODO(birenroy): clean up data source on stream close
+  sources_.emplace(stream_id, std::move(data_source));
+
+  int result = nghttp2_submit_response(session_->raw_ptr(), stream_id,
+                                       nvs.data(), nvs.size(), provider.get());
+  QUICHE_VLOG(1) << "Submitted response with " << nvs.size()
+                 << " response headers; result = " << result;
+  return result;
 }
 
 int NgHttp2Adapter::SubmitTrailer(Http2StreamId stream_id,
                                   absl::Span<const Header> trailers) {
   auto nvs = GetNghttp2Nvs(trailers);
-  return nghttp2_submit_trailer(session_->raw_ptr(), stream_id, nvs.data(),
-                                nvs.size());
+  int result = nghttp2_submit_trailer(session_->raw_ptr(), stream_id,
+                                      nvs.data(), nvs.size());
+  QUICHE_VLOG(1) << "Submitted trailers with " << nvs.size()
+                 << " response trailers; result = " << result;
+  return result;
 }
 
 void NgHttp2Adapter::SetStreamUserData(Http2StreamId stream_id,
-                                       void* user_data) {
+                                       void* stream_user_data) {
   nghttp2_session_set_stream_user_data(session_->raw_ptr(), stream_id,
-                                       user_data);
+                                       stream_user_data);
 }
 
 void* NgHttp2Adapter::GetStreamUserData(Http2StreamId stream_id) {
diff --git a/http2/adapter/nghttp2_adapter.h b/http2/adapter/nghttp2_adapter.h
index c49d017..e9fa7d2 100644
--- a/http2/adapter/nghttp2_adapter.h
+++ b/http2/adapter/nghttp2_adapter.h
@@ -1,6 +1,7 @@
 #ifndef QUICHE_HTTP2_ADAPTER_NGHTTP2_ADAPTER_H_
 #define QUICHE_HTTP2_ADAPTER_NGHTTP2_ADAPTER_H_
 
+#include "absl/container/flat_hash_map.h"
 #include "http2/adapter/http2_adapter.h"
 #include "http2/adapter/http2_protocol.h"
 #include "http2/adapter/nghttp2_session.h"
@@ -65,12 +66,11 @@
                                  size_t num_bytes) override;
 
   int32_t SubmitRequest(absl::Span<const Header> headers,
-                        DataFrameSource* data_source,
+                        std::unique_ptr<DataFrameSource> data_source,
                         void* user_data) override;
 
-  int32_t SubmitResponse(Http2StreamId stream_id,
-                         absl::Span<const Header> headers,
-                         DataFrameSource* data_source) override;
+  int SubmitResponse(Http2StreamId stream_id, absl::Span<const Header> headers,
+                     std::unique_ptr<DataFrameSource> data_source) override;
 
   int SubmitTrailer(Http2StreamId stream_id,
                     absl::Span<const Header> trailers) override;
@@ -94,6 +94,8 @@
   std::unique_ptr<NgHttp2Session> session_;
   Http2VisitorInterface& visitor_;
   Perspective perspective_;
+
+  absl::flat_hash_map<int32_t, std::unique_ptr<DataFrameSource>> sources_;
 };
 
 }  // namespace adapter
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index 21a1ded..b0e7127 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -284,13 +284,13 @@
   EXPECT_FALSE(adapter->session().want_write());
   const char* kSentinel = "";
   const absl::string_view kBody = "This is an example request body.";
-  TestDataFrameSource body1(visitor, kBody);
+  auto body1 = absl::make_unique<TestDataFrameSource>(visitor, kBody);
   int stream_id =
       adapter->SubmitRequest(ToHeaders({{":method", "POST"},
                                         {":scheme", "http"},
                                         {":authority", "example.com"},
                                         {":path", "/this/is/request/one"}}),
-                             &body1, const_cast<char*>(kSentinel));
+                             std::move(body1), const_cast<char*>(kSentinel));
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(adapter->session().want_write());
   result = adapter->Send();
@@ -389,7 +389,7 @@
                                         {":scheme", "http"},
                                         {":authority", "example.com"},
                                         {":path", "/this/is/request/one"}}),
-                             frame_source.get(), nullptr);
+                             std::move(frame_source), nullptr);
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(adapter->session().want_write());
   result = adapter->Send();
@@ -423,7 +423,7 @@
                                         {":scheme", "http"},
                                         {":authority", "example.com"},
                                         {":path", "/this/is/request/one"}}),
-                             frame_source.get(), nullptr);
+                             std::move(frame_source), nullptr);
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(adapter->session().want_write());
 
@@ -475,7 +475,7 @@
                                         {":scheme", "http"},
                                         {":authority", "example.com"},
                                         {":path", "/this/is/request/one"}}),
-                             frame_source.get(), nullptr);
+                             std::move(frame_source), nullptr);
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(adapter->session().want_write());
 
@@ -664,12 +664,13 @@
 
   EXPECT_FALSE(adapter->session().want_write());
   const absl::string_view kBody = "This is an example response body.";
-  TestDataFrameSource body1(visitor, kBody, /*has_fin=*/false);
+  auto body1 =
+      absl::make_unique<TestDataFrameSource>(visitor, kBody, /*has_fin=*/false);
   int submit_result = adapter->SubmitResponse(
       1,
       ToHeaders({{":status", "404"},
                  {"x-comment", "I have no idea what you're talking about."}}),
-      &body1);
+      std::move(body1));
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(adapter->session().want_write());
 
@@ -783,10 +784,11 @@
 
   // The body source must indicate that the end of the body is not the end of
   // the stream.
-  TestDataFrameSource body1(visitor, kBody, /*has_fin=*/false);
+  auto body1 =
+      absl::make_unique<TestDataFrameSource>(visitor, kBody, /*has_fin=*/false);
   int submit_result = adapter->SubmitResponse(
       1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
-      &body1);
+      std::move(body1));
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(adapter->session().want_write());
   EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::NO_ERROR));
diff --git a/http2/adapter/oghttp2_adapter.cc b/http2/adapter/oghttp2_adapter.cc
index 6f56d40..47270dd 100644
--- a/http2/adapter/oghttp2_adapter.cc
+++ b/http2/adapter/oghttp2_adapter.cc
@@ -124,16 +124,16 @@
       stream_id, TranslateErrorCode(error_code)));
 }
 
-int32_t OgHttp2Adapter::SubmitRequest(absl::Span<const Header> headers,
-                                      DataFrameSource* data_source,
-                                      void* user_data) {
-  return session_->SubmitRequest(headers, data_source, user_data);
+int32_t OgHttp2Adapter::SubmitRequest(
+    absl::Span<const Header> headers,
+    std::unique_ptr<DataFrameSource> data_source, void* user_data) {
+  return session_->SubmitRequest(headers, std::move(data_source), user_data);
 }
 
-int32_t OgHttp2Adapter::SubmitResponse(Http2StreamId stream_id,
-                                       absl::Span<const Header> headers,
-                                       DataFrameSource* data_source) {
-  return session_->SubmitResponse(stream_id, headers, data_source);
+int OgHttp2Adapter::SubmitResponse(
+    Http2StreamId stream_id, absl::Span<const Header> headers,
+    std::unique_ptr<DataFrameSource> data_source) {
+  return session_->SubmitResponse(stream_id, headers, std::move(data_source));
 }
 
 int OgHttp2Adapter::SubmitTrailer(Http2StreamId stream_id,
diff --git a/http2/adapter/oghttp2_adapter.h b/http2/adapter/oghttp2_adapter.h
index b796e1e..e2ea402 100644
--- a/http2/adapter/oghttp2_adapter.h
+++ b/http2/adapter/oghttp2_adapter.h
@@ -47,11 +47,10 @@
                                  size_t num_bytes) override;
   void SubmitRst(Http2StreamId stream_id, Http2ErrorCode error_code) override;
   int32_t SubmitRequest(absl::Span<const Header> headers,
-                        DataFrameSource* data_source,
+                        std::unique_ptr<DataFrameSource> data_source,
                         void* user_data) override;
-  int32_t SubmitResponse(Http2StreamId stream_id,
-                         absl::Span<const Header> headers,
-                         DataFrameSource* data_source) override;
+  int SubmitResponse(Http2StreamId stream_id, absl::Span<const Header> headers,
+                     std::unique_ptr<DataFrameSource> data_source) override;
 
   int SubmitTrailer(Http2StreamId stream_id,
                     absl::Span<const Header> trailers) override;
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 949c67b..6d252e2 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -295,9 +295,9 @@
   return connection_can_write && available_window > 0;
 }
 
-int32_t OgHttp2Session::SubmitRequest(absl::Span<const Header> headers,
-                                      DataFrameSource* data_source,
-                                      void* user_data) {
+int32_t OgHttp2Session::SubmitRequest(
+    absl::Span<const Header> headers,
+    std::unique_ptr<DataFrameSource> data_source, void* user_data) {
   // TODO(birenroy): return an error for the incorrect perspective
   const Http2StreamId stream_id = next_stream_id_;
   next_stream_id_ += 2;
@@ -316,7 +316,7 @@
     QUICHE_LOG(DFATAL) << "Stream " << stream_id << " already exists!";
     return -501;  // NGHTTP2_ERR_INVALID_ARGUMENT
   }
-  iter->second.outbound_body = data_source;
+  iter->second.outbound_body = std::move(data_source);
   iter->second.user_data = user_data;
   if (data_source == nullptr) {
     frame->set_fin(true);
@@ -331,9 +331,9 @@
   return stream_id;
 }
 
-int32_t OgHttp2Session::SubmitResponse(Http2StreamId stream_id,
-                                       absl::Span<const Header> headers,
-                                       DataFrameSource* data_source) {
+int OgHttp2Session::SubmitResponse(
+    Http2StreamId stream_id, absl::Span<const Header> headers,
+    std::unique_ptr<DataFrameSource> data_source) {
   // TODO(birenroy): return an error for the incorrect perspective
   auto iter = stream_map_.find(stream_id);
   if (iter == stream_map_.end()) {
@@ -352,7 +352,7 @@
     // when calling visitor_.OnCloseStream.
   } else {
     // Add data source to stream state
-    iter->second.outbound_body = data_source;
+    iter->second.outbound_body = std::move(data_source);
     write_scheduler_.MarkStreamReady(stream_id, false);
   }
   EnqueueFrame(std::move(frame));
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index a22811f..30306fe 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -38,11 +38,10 @@
   int Send();
 
   int32_t SubmitRequest(absl::Span<const Header> headers,
-                        DataFrameSource* data_source,
+                        std::unique_ptr<DataFrameSource> data_source,
                         void* user_data);
-  int32_t SubmitResponse(Http2StreamId stream_id,
-                         absl::Span<const Header> headers,
-                         DataFrameSource* data_source);
+  int SubmitResponse(Http2StreamId stream_id, absl::Span<const Header> headers,
+                     std::unique_ptr<DataFrameSource> data_source);
   int SubmitTrailer(Http2StreamId stream_id, absl::Span<const Header> trailers);
 
   bool IsServerSession() const {
@@ -152,7 +151,7 @@
         : window_manager(stream_receive_window, std::move(listener)) {}
 
     WindowManager window_manager;
-    DataFrameSource* outbound_body = nullptr;
+    std::unique_ptr<DataFrameSource> outbound_body;
     std::unique_ptr<spdy::SpdyHeaderBlock> trailers;
     void* user_data = nullptr;
     int32_t send_window = kInitialFlowControlWindowSize;
diff --git a/http2/adapter/oghttp2_session_test.cc b/http2/adapter/oghttp2_session_test.cc
index 73e59b0..97ec7df 100644
--- a/http2/adapter/oghttp2_session_test.cc
+++ b/http2/adapter/oghttp2_session_test.cc
@@ -76,13 +76,14 @@
 
   // Submit a request to ensure the first stream is created.
   const char* kSentinel1 = "arbitrary pointer 1";
-  TestDataFrameSource body1(visitor, "This is an example request body.");
+  auto body1 = absl::make_unique<TestDataFrameSource>(
+      visitor, "This is an example request body.");
   int stream_id =
       session.SubmitRequest(ToHeaders({{":method", "POST"},
                                        {":scheme", "http"},
                                        {":authority", "example.com"},
                                        {":path", "/this/is/request/one"}}),
-                            &body1, const_cast<char*>(kSentinel1));
+                            std::move(body1), const_cast<char*>(kSentinel1));
   EXPECT_EQ(stream_id, 1);
 
   const std::string stream_frames =
@@ -225,13 +226,14 @@
   EXPECT_EQ(0, session.GetHpackEncoderDynamicTableSize());
 
   const char* kSentinel1 = "arbitrary pointer 1";
-  TestDataFrameSource body1(visitor, "This is an example request body.");
+  auto body1 = absl::make_unique<TestDataFrameSource>(
+      visitor, "This is an example request body.");
   int stream_id =
       session.SubmitRequest(ToHeaders({{":method", "POST"},
                                        {":scheme", "http"},
                                        {":authority", "example.com"},
                                        {":path", "/this/is/request/one"}}),
-                            &body1, const_cast<char*>(kSentinel1));
+                            std::move(body1), const_cast<char*>(kSentinel1));
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(session.want_write());
   EXPECT_EQ(kSentinel1, session.GetStreamUserData(stream_id));
@@ -284,14 +286,16 @@
   EXPECT_FALSE(session.want_write());
 
   const char* kSentinel1 = "arbitrary pointer 1";
-  TestDataFrameSource body1(visitor, "This is an example request body.");
-  body1.set_is_data_available(false);
+  auto body1 = absl::make_unique<TestDataFrameSource>(
+      visitor, "This is an example request body.");
+  TestDataFrameSource* body_ref = body1.get();
+  body_ref->set_is_data_available(false);
   int stream_id =
       session.SubmitRequest(ToHeaders({{":method", "POST"},
                                        {":scheme", "http"},
                                        {":authority", "example.com"},
                                        {":path", "/this/is/request/one"}}),
-                            &body1, const_cast<char*>(kSentinel1));
+                            std::move(body1), const_cast<char*>(kSentinel1));
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(session.want_write());
   EXPECT_EQ(kSentinel1, session.GetStreamUserData(stream_id));
@@ -307,7 +311,7 @@
   visitor.Clear();
   EXPECT_FALSE(session.want_write());
 
-  body1.set_is_data_available(true);
+  body_ref->set_is_data_available(true);
   EXPECT_TRUE(session.ResumeStream(stream_id));
   EXPECT_TRUE(session.want_write());
   result = session.Send();
@@ -329,13 +333,14 @@
   EXPECT_FALSE(session.want_write());
 
   const char* kSentinel1 = "arbitrary pointer 1";
-  TestDataFrameSource body1(visitor, "This is an example request body.");
+  auto body1 = absl::make_unique<TestDataFrameSource>(
+      visitor, "This is an example request body.");
   int stream_id =
       session.SubmitRequest(ToHeaders({{":method", "POST"},
                                        {":scheme", "http"},
                                        {":authority", "example.com"},
                                        {":path", "/this/is/request/one"}}),
-                            &body1, const_cast<char*>(kSentinel1));
+                            std::move(body1), const_cast<char*>(kSentinel1));
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(session.want_write());
   EXPECT_EQ(kSentinel1, session.GetStreamUserData(stream_id));
@@ -580,13 +585,14 @@
   visitor.Clear();
 
   EXPECT_FALSE(session.want_write());
-  TestDataFrameSource body1(visitor, "This is an example response body.",
-                            /*has_fin=*/false);
+  auto body1 = absl::make_unique<TestDataFrameSource>(
+      visitor, "This is an example response body.",
+      /*has_fin=*/false);
   int submit_result = session.SubmitResponse(
       1,
       ToHeaders({{":status", "404"},
                  {"x-comment", "I have no idea what you're talking about."}}),
-      &body1);
+      std::move(body1));
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(session.want_write());
 
@@ -697,11 +703,12 @@
 
   // The body source must indicate that the end of the body is not the end of
   // the stream.
-  TestDataFrameSource body1(visitor, "This is an example response body.",
-                            /*has_fin=*/false);
+  auto body1 = absl::make_unique<TestDataFrameSource>(
+      visitor, "This is an example response body.",
+      /*has_fin=*/false);
   int submit_result = session.SubmitResponse(
       1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
-      &body1);
+      std::move(body1));
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(session.want_write());
   send_result = session.Send();
@@ -773,11 +780,12 @@
 
   // The body source must indicate that the end of the body is not the end of
   // the stream.
-  TestDataFrameSource body1(visitor, "This is an example response body.",
-                            /*has_fin=*/false);
+  auto body1 = absl::make_unique<TestDataFrameSource>(
+      visitor, "This is an example response body.",
+      /*has_fin=*/false);
   int submit_result = session.SubmitResponse(
       1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
-      &body1);
+      std::move(body1));
   EXPECT_EQ(submit_result, 0);
   EXPECT_TRUE(session.want_write());
   // There has not been a call to Send() yet, so neither headers nor body have