Modifies the Http2Adapter API to accept DataFrameSources as unique_ptrs.
This simplifies the ownership model for users of the library.
PiperOrigin-RevId: 380223157
diff --git a/http2/adapter/http2_adapter.h b/http2/adapter/http2_adapter.h
index 2c66081..32a9ba1 100644
--- a/http2/adapter/http2_adapter.h
+++ b/http2/adapter/http2_adapter.h
@@ -107,19 +107,16 @@
// Returns the assigned stream ID if the operation succeeds. Otherwise,
// returns a negative integer indicating an error code. |data_source| may be
- // nullptr if the request does not have a body. Does not take ownership of
- // |data_source|, which should be deleted by the caller when the stream is
- // closed.
+ // nullptr if the request does not have a body.
virtual int32_t SubmitRequest(absl::Span<const Header> headers,
- DataFrameSource* data_source,
+ std::unique_ptr<DataFrameSource> data_source,
void* user_data) = 0;
// Returns 0 on success. |data_source| may be nullptr if the response does not
- // have a body. Does not take ownership of |data_source|, which should be
- // deleted by the caller when the stream is closed.
- virtual int32_t SubmitResponse(Http2StreamId stream_id,
- absl::Span<const Header> headers,
- DataFrameSource* data_source) = 0;
+ // have a body.
+ virtual int SubmitResponse(Http2StreamId stream_id,
+ absl::Span<const Header> headers,
+ std::unique_ptr<DataFrameSource> data_source) = 0;
// Queues trailers to be sent after any outstanding data on the stream with ID
// |stream_id|. Returns 0 on success.
diff --git a/http2/adapter/nghttp2_adapter.cc b/http2/adapter/nghttp2_adapter.cc
index 8eac860..c6fe12a 100644
--- a/http2/adapter/nghttp2_adapter.cc
+++ b/http2/adapter/nghttp2_adapter.cc
@@ -158,37 +158,55 @@
}
}
-int32_t NgHttp2Adapter::SubmitRequest(absl::Span<const Header> headers,
- DataFrameSource* data_source,
- void* user_data) {
+int32_t NgHttp2Adapter::SubmitRequest(
+ absl::Span<const Header> headers,
+ std::unique_ptr<DataFrameSource> data_source, void* stream_user_data) {
auto nvs = GetNghttp2Nvs(headers);
std::unique_ptr<nghttp2_data_provider> provider =
- MakeDataProvider(data_source);
- return nghttp2_submit_request(session_->raw_ptr(), nullptr, nvs.data(),
- nvs.size(), provider.get(), user_data);
+ MakeDataProvider(data_source.get());
+
+ int32_t stream_id =
+ nghttp2_submit_request(session_->raw_ptr(), nullptr, nvs.data(),
+ nvs.size(), provider.get(), stream_user_data);
+ // TODO(birenroy): clean up data source on stream close
+ sources_.emplace(stream_id, std::move(data_source));
+ QUICHE_VLOG(1) << "Submitted request with " << nvs.size()
+ << " request headers and user data " << stream_user_data
+ << "; resulted in stream " << stream_id;
+ return stream_id;
}
-int32_t NgHttp2Adapter::SubmitResponse(Http2StreamId stream_id,
- absl::Span<const Header> headers,
- DataFrameSource* data_source) {
+int NgHttp2Adapter::SubmitResponse(
+ Http2StreamId stream_id, absl::Span<const Header> headers,
+ std::unique_ptr<DataFrameSource> data_source) {
auto nvs = GetNghttp2Nvs(headers);
std::unique_ptr<nghttp2_data_provider> provider =
- MakeDataProvider(data_source);
- return nghttp2_submit_response(session_->raw_ptr(), stream_id, nvs.data(),
- nvs.size(), provider.get());
+ MakeDataProvider(data_source.get());
+
+ // TODO(birenroy): clean up data source on stream close
+ sources_.emplace(stream_id, std::move(data_source));
+
+ int result = nghttp2_submit_response(session_->raw_ptr(), stream_id,
+ nvs.data(), nvs.size(), provider.get());
+ QUICHE_VLOG(1) << "Submitted response with " << nvs.size()
+ << " response headers; result = " << result;
+ return result;
}
int NgHttp2Adapter::SubmitTrailer(Http2StreamId stream_id,
absl::Span<const Header> trailers) {
auto nvs = GetNghttp2Nvs(trailers);
- return nghttp2_submit_trailer(session_->raw_ptr(), stream_id, nvs.data(),
- nvs.size());
+ int result = nghttp2_submit_trailer(session_->raw_ptr(), stream_id,
+ nvs.data(), nvs.size());
+ QUICHE_VLOG(1) << "Submitted trailers with " << nvs.size()
+ << " response trailers; result = " << result;
+ return result;
}
void NgHttp2Adapter::SetStreamUserData(Http2StreamId stream_id,
- void* user_data) {
+ void* stream_user_data) {
nghttp2_session_set_stream_user_data(session_->raw_ptr(), stream_id,
- user_data);
+ stream_user_data);
}
void* NgHttp2Adapter::GetStreamUserData(Http2StreamId stream_id) {
diff --git a/http2/adapter/nghttp2_adapter.h b/http2/adapter/nghttp2_adapter.h
index c49d017..e9fa7d2 100644
--- a/http2/adapter/nghttp2_adapter.h
+++ b/http2/adapter/nghttp2_adapter.h
@@ -1,6 +1,7 @@
#ifndef QUICHE_HTTP2_ADAPTER_NGHTTP2_ADAPTER_H_
#define QUICHE_HTTP2_ADAPTER_NGHTTP2_ADAPTER_H_
+#include "absl/container/flat_hash_map.h"
#include "http2/adapter/http2_adapter.h"
#include "http2/adapter/http2_protocol.h"
#include "http2/adapter/nghttp2_session.h"
@@ -65,12 +66,11 @@
size_t num_bytes) override;
int32_t SubmitRequest(absl::Span<const Header> headers,
- DataFrameSource* data_source,
+ std::unique_ptr<DataFrameSource> data_source,
void* user_data) override;
- int32_t SubmitResponse(Http2StreamId stream_id,
- absl::Span<const Header> headers,
- DataFrameSource* data_source) override;
+ int SubmitResponse(Http2StreamId stream_id, absl::Span<const Header> headers,
+ std::unique_ptr<DataFrameSource> data_source) override;
int SubmitTrailer(Http2StreamId stream_id,
absl::Span<const Header> trailers) override;
@@ -94,6 +94,8 @@
std::unique_ptr<NgHttp2Session> session_;
Http2VisitorInterface& visitor_;
Perspective perspective_;
+
+ absl::flat_hash_map<int32_t, std::unique_ptr<DataFrameSource>> sources_;
};
} // namespace adapter
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index 21a1ded..b0e7127 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -284,13 +284,13 @@
EXPECT_FALSE(adapter->session().want_write());
const char* kSentinel = "";
const absl::string_view kBody = "This is an example request body.";
- TestDataFrameSource body1(visitor, kBody);
+ auto body1 = absl::make_unique<TestDataFrameSource>(visitor, kBody);
int stream_id =
adapter->SubmitRequest(ToHeaders({{":method", "POST"},
{":scheme", "http"},
{":authority", "example.com"},
{":path", "/this/is/request/one"}}),
- &body1, const_cast<char*>(kSentinel));
+ std::move(body1), const_cast<char*>(kSentinel));
EXPECT_GT(stream_id, 0);
EXPECT_TRUE(adapter->session().want_write());
result = adapter->Send();
@@ -389,7 +389,7 @@
{":scheme", "http"},
{":authority", "example.com"},
{":path", "/this/is/request/one"}}),
- frame_source.get(), nullptr);
+ std::move(frame_source), nullptr);
EXPECT_GT(stream_id, 0);
EXPECT_TRUE(adapter->session().want_write());
result = adapter->Send();
@@ -423,7 +423,7 @@
{":scheme", "http"},
{":authority", "example.com"},
{":path", "/this/is/request/one"}}),
- frame_source.get(), nullptr);
+ std::move(frame_source), nullptr);
EXPECT_GT(stream_id, 0);
EXPECT_TRUE(adapter->session().want_write());
@@ -475,7 +475,7 @@
{":scheme", "http"},
{":authority", "example.com"},
{":path", "/this/is/request/one"}}),
- frame_source.get(), nullptr);
+ std::move(frame_source), nullptr);
EXPECT_GT(stream_id, 0);
EXPECT_TRUE(adapter->session().want_write());
@@ -664,12 +664,13 @@
EXPECT_FALSE(adapter->session().want_write());
const absl::string_view kBody = "This is an example response body.";
- TestDataFrameSource body1(visitor, kBody, /*has_fin=*/false);
+ auto body1 =
+ absl::make_unique<TestDataFrameSource>(visitor, kBody, /*has_fin=*/false);
int submit_result = adapter->SubmitResponse(
1,
ToHeaders({{":status", "404"},
{"x-comment", "I have no idea what you're talking about."}}),
- &body1);
+ std::move(body1));
EXPECT_EQ(submit_result, 0);
EXPECT_TRUE(adapter->session().want_write());
@@ -783,10 +784,11 @@
// The body source must indicate that the end of the body is not the end of
// the stream.
- TestDataFrameSource body1(visitor, kBody, /*has_fin=*/false);
+ auto body1 =
+ absl::make_unique<TestDataFrameSource>(visitor, kBody, /*has_fin=*/false);
int submit_result = adapter->SubmitResponse(
1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
- &body1);
+ std::move(body1));
EXPECT_EQ(submit_result, 0);
EXPECT_TRUE(adapter->session().want_write());
EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::NO_ERROR));
diff --git a/http2/adapter/oghttp2_adapter.cc b/http2/adapter/oghttp2_adapter.cc
index 6f56d40..47270dd 100644
--- a/http2/adapter/oghttp2_adapter.cc
+++ b/http2/adapter/oghttp2_adapter.cc
@@ -124,16 +124,16 @@
stream_id, TranslateErrorCode(error_code)));
}
-int32_t OgHttp2Adapter::SubmitRequest(absl::Span<const Header> headers,
- DataFrameSource* data_source,
- void* user_data) {
- return session_->SubmitRequest(headers, data_source, user_data);
+int32_t OgHttp2Adapter::SubmitRequest(
+ absl::Span<const Header> headers,
+ std::unique_ptr<DataFrameSource> data_source, void* user_data) {
+ return session_->SubmitRequest(headers, std::move(data_source), user_data);
}
-int32_t OgHttp2Adapter::SubmitResponse(Http2StreamId stream_id,
- absl::Span<const Header> headers,
- DataFrameSource* data_source) {
- return session_->SubmitResponse(stream_id, headers, data_source);
+int OgHttp2Adapter::SubmitResponse(
+ Http2StreamId stream_id, absl::Span<const Header> headers,
+ std::unique_ptr<DataFrameSource> data_source) {
+ return session_->SubmitResponse(stream_id, headers, std::move(data_source));
}
int OgHttp2Adapter::SubmitTrailer(Http2StreamId stream_id,
diff --git a/http2/adapter/oghttp2_adapter.h b/http2/adapter/oghttp2_adapter.h
index b796e1e..e2ea402 100644
--- a/http2/adapter/oghttp2_adapter.h
+++ b/http2/adapter/oghttp2_adapter.h
@@ -47,11 +47,10 @@
size_t num_bytes) override;
void SubmitRst(Http2StreamId stream_id, Http2ErrorCode error_code) override;
int32_t SubmitRequest(absl::Span<const Header> headers,
- DataFrameSource* data_source,
+ std::unique_ptr<DataFrameSource> data_source,
void* user_data) override;
- int32_t SubmitResponse(Http2StreamId stream_id,
- absl::Span<const Header> headers,
- DataFrameSource* data_source) override;
+ int SubmitResponse(Http2StreamId stream_id, absl::Span<const Header> headers,
+ std::unique_ptr<DataFrameSource> data_source) override;
int SubmitTrailer(Http2StreamId stream_id,
absl::Span<const Header> trailers) override;
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 949c67b..6d252e2 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -295,9 +295,9 @@
return connection_can_write && available_window > 0;
}
-int32_t OgHttp2Session::SubmitRequest(absl::Span<const Header> headers,
- DataFrameSource* data_source,
- void* user_data) {
+int32_t OgHttp2Session::SubmitRequest(
+ absl::Span<const Header> headers,
+ std::unique_ptr<DataFrameSource> data_source, void* user_data) {
// TODO(birenroy): return an error for the incorrect perspective
const Http2StreamId stream_id = next_stream_id_;
next_stream_id_ += 2;
@@ -316,7 +316,7 @@
QUICHE_LOG(DFATAL) << "Stream " << stream_id << " already exists!";
return -501; // NGHTTP2_ERR_INVALID_ARGUMENT
}
- iter->second.outbound_body = data_source;
+ iter->second.outbound_body = std::move(data_source);
iter->second.user_data = user_data;
if (data_source == nullptr) {
frame->set_fin(true);
@@ -331,9 +331,9 @@
return stream_id;
}
-int32_t OgHttp2Session::SubmitResponse(Http2StreamId stream_id,
- absl::Span<const Header> headers,
- DataFrameSource* data_source) {
+int OgHttp2Session::SubmitResponse(
+ Http2StreamId stream_id, absl::Span<const Header> headers,
+ std::unique_ptr<DataFrameSource> data_source) {
// TODO(birenroy): return an error for the incorrect perspective
auto iter = stream_map_.find(stream_id);
if (iter == stream_map_.end()) {
@@ -352,7 +352,7 @@
// when calling visitor_.OnCloseStream.
} else {
// Add data source to stream state
- iter->second.outbound_body = data_source;
+ iter->second.outbound_body = std::move(data_source);
write_scheduler_.MarkStreamReady(stream_id, false);
}
EnqueueFrame(std::move(frame));
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index a22811f..30306fe 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -38,11 +38,10 @@
int Send();
int32_t SubmitRequest(absl::Span<const Header> headers,
- DataFrameSource* data_source,
+ std::unique_ptr<DataFrameSource> data_source,
void* user_data);
- int32_t SubmitResponse(Http2StreamId stream_id,
- absl::Span<const Header> headers,
- DataFrameSource* data_source);
+ int SubmitResponse(Http2StreamId stream_id, absl::Span<const Header> headers,
+ std::unique_ptr<DataFrameSource> data_source);
int SubmitTrailer(Http2StreamId stream_id, absl::Span<const Header> trailers);
bool IsServerSession() const {
@@ -152,7 +151,7 @@
: window_manager(stream_receive_window, std::move(listener)) {}
WindowManager window_manager;
- DataFrameSource* outbound_body = nullptr;
+ std::unique_ptr<DataFrameSource> outbound_body;
std::unique_ptr<spdy::SpdyHeaderBlock> trailers;
void* user_data = nullptr;
int32_t send_window = kInitialFlowControlWindowSize;
diff --git a/http2/adapter/oghttp2_session_test.cc b/http2/adapter/oghttp2_session_test.cc
index 73e59b0..97ec7df 100644
--- a/http2/adapter/oghttp2_session_test.cc
+++ b/http2/adapter/oghttp2_session_test.cc
@@ -76,13 +76,14 @@
// Submit a request to ensure the first stream is created.
const char* kSentinel1 = "arbitrary pointer 1";
- TestDataFrameSource body1(visitor, "This is an example request body.");
+ auto body1 = absl::make_unique<TestDataFrameSource>(
+ visitor, "This is an example request body.");
int stream_id =
session.SubmitRequest(ToHeaders({{":method", "POST"},
{":scheme", "http"},
{":authority", "example.com"},
{":path", "/this/is/request/one"}}),
- &body1, const_cast<char*>(kSentinel1));
+ std::move(body1), const_cast<char*>(kSentinel1));
EXPECT_EQ(stream_id, 1);
const std::string stream_frames =
@@ -225,13 +226,14 @@
EXPECT_EQ(0, session.GetHpackEncoderDynamicTableSize());
const char* kSentinel1 = "arbitrary pointer 1";
- TestDataFrameSource body1(visitor, "This is an example request body.");
+ auto body1 = absl::make_unique<TestDataFrameSource>(
+ visitor, "This is an example request body.");
int stream_id =
session.SubmitRequest(ToHeaders({{":method", "POST"},
{":scheme", "http"},
{":authority", "example.com"},
{":path", "/this/is/request/one"}}),
- &body1, const_cast<char*>(kSentinel1));
+ std::move(body1), const_cast<char*>(kSentinel1));
EXPECT_GT(stream_id, 0);
EXPECT_TRUE(session.want_write());
EXPECT_EQ(kSentinel1, session.GetStreamUserData(stream_id));
@@ -284,14 +286,16 @@
EXPECT_FALSE(session.want_write());
const char* kSentinel1 = "arbitrary pointer 1";
- TestDataFrameSource body1(visitor, "This is an example request body.");
- body1.set_is_data_available(false);
+ auto body1 = absl::make_unique<TestDataFrameSource>(
+ visitor, "This is an example request body.");
+ TestDataFrameSource* body_ref = body1.get();
+ body_ref->set_is_data_available(false);
int stream_id =
session.SubmitRequest(ToHeaders({{":method", "POST"},
{":scheme", "http"},
{":authority", "example.com"},
{":path", "/this/is/request/one"}}),
- &body1, const_cast<char*>(kSentinel1));
+ std::move(body1), const_cast<char*>(kSentinel1));
EXPECT_GT(stream_id, 0);
EXPECT_TRUE(session.want_write());
EXPECT_EQ(kSentinel1, session.GetStreamUserData(stream_id));
@@ -307,7 +311,7 @@
visitor.Clear();
EXPECT_FALSE(session.want_write());
- body1.set_is_data_available(true);
+ body_ref->set_is_data_available(true);
EXPECT_TRUE(session.ResumeStream(stream_id));
EXPECT_TRUE(session.want_write());
result = session.Send();
@@ -329,13 +333,14 @@
EXPECT_FALSE(session.want_write());
const char* kSentinel1 = "arbitrary pointer 1";
- TestDataFrameSource body1(visitor, "This is an example request body.");
+ auto body1 = absl::make_unique<TestDataFrameSource>(
+ visitor, "This is an example request body.");
int stream_id =
session.SubmitRequest(ToHeaders({{":method", "POST"},
{":scheme", "http"},
{":authority", "example.com"},
{":path", "/this/is/request/one"}}),
- &body1, const_cast<char*>(kSentinel1));
+ std::move(body1), const_cast<char*>(kSentinel1));
EXPECT_GT(stream_id, 0);
EXPECT_TRUE(session.want_write());
EXPECT_EQ(kSentinel1, session.GetStreamUserData(stream_id));
@@ -580,13 +585,14 @@
visitor.Clear();
EXPECT_FALSE(session.want_write());
- TestDataFrameSource body1(visitor, "This is an example response body.",
- /*has_fin=*/false);
+ auto body1 = absl::make_unique<TestDataFrameSource>(
+ visitor, "This is an example response body.",
+ /*has_fin=*/false);
int submit_result = session.SubmitResponse(
1,
ToHeaders({{":status", "404"},
{"x-comment", "I have no idea what you're talking about."}}),
- &body1);
+ std::move(body1));
EXPECT_EQ(submit_result, 0);
EXPECT_TRUE(session.want_write());
@@ -697,11 +703,12 @@
// The body source must indicate that the end of the body is not the end of
// the stream.
- TestDataFrameSource body1(visitor, "This is an example response body.",
- /*has_fin=*/false);
+ auto body1 = absl::make_unique<TestDataFrameSource>(
+ visitor, "This is an example response body.",
+ /*has_fin=*/false);
int submit_result = session.SubmitResponse(
1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
- &body1);
+ std::move(body1));
EXPECT_EQ(submit_result, 0);
EXPECT_TRUE(session.want_write());
send_result = session.Send();
@@ -773,11 +780,12 @@
// The body source must indicate that the end of the body is not the end of
// the stream.
- TestDataFrameSource body1(visitor, "This is an example response body.",
- /*has_fin=*/false);
+ auto body1 = absl::make_unique<TestDataFrameSource>(
+ visitor, "This is an example response body.",
+ /*has_fin=*/false);
int submit_result = session.SubmitResponse(
1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
- &body1);
+ std::move(body1));
EXPECT_EQ(submit_result, 0);
EXPECT_TRUE(session.want_write());
// There has not been a call to Send() yet, so neither headers nor body have