Implements read block detection in OgHttp2Session, and fixes read block handling in Nghttp2DataFrameSource.
Note that currently, there is no way to reschedule a stream that encounters a read block in the OgHttp2 stack. (Coming in a future change.)
PiperOrigin-RevId: 378512810
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index ded57cc..65a0c7e 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -24,6 +24,24 @@
WINDOW_UPDATE,
};
+// This send callback assumes |source|'s pointer is a TestDataSource, and
+// |user_data| is a Http2VisitorInterface.
+int TestSendCallback(nghttp2_session*,
+ nghttp2_frame* frame,
+ const uint8_t* framehd,
+ size_t length,
+ nghttp2_data_source* source,
+ void* user_data) {
+ auto* visitor = static_cast<Http2VisitorInterface*>(user_data);
+ // Send the frame header via the visitor.
+ visitor->OnReadyToSend(ToStringView(framehd, 9));
+ auto* test_source = static_cast<TestDataSource*>(source->ptr);
+ absl::string_view payload = test_source->ReadNext(length);
+ // Send the frame payload via the visitor.
+ visitor->OnReadyToSend(payload);
+ return 0;
+}
+
TEST(NgHttp2AdapterTest, ClientConstruction) {
testing::StrictMock<MockHttp2Visitor> visitor;
auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
@@ -269,22 +287,8 @@
TestDataSource body1{kBody};
// The TestDataSource is wrapped in the nghttp2_data_provider data type.
nghttp2_data_provider provider = body1.MakeDataProvider();
+ nghttp2_send_data_callback send_callback = &TestSendCallback;
- // This send callback assumes |source|'s pointer is a TestDataSource, which we
- // know is true because we just converted |body1| into a nghttp2_data_provider
- // above.
- nghttp2_send_data_callback send_callback =
- [](nghttp2_session*, nghttp2_frame* frame, const uint8_t* framehd,
- size_t length, nghttp2_data_source* source, void* user_data) {
- auto* visitor = static_cast<Http2VisitorInterface*>(user_data);
- // Send the frame header via the visitor.
- visitor->OnReadyToSend(ToStringView(framehd, 9));
- auto* test_source = static_cast<TestDataSource*>(source->ptr);
- absl::string_view payload = test_source->ReadNext(length);
- // Send the frame payload via the visitor.
- visitor->OnReadyToSend(payload);
- return 0;
- };
// This call transforms it back into a DataFrameSource, which is compatible
// with the Http2Adapter API.
std::unique_ptr<DataFrameSource> frame_source =
@@ -304,6 +308,53 @@
EXPECT_FALSE(adapter->session().want_write());
}
+// This test verifies how nghttp2 behaves when a data source becomes
+// read-blocked.
+TEST(NgHttp2AdapterTest, ClientSubmitRequestWithDataProviderAndReadBlock) {
+ DataSavingVisitor visitor;
+ auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
+
+ const absl::string_view kBody = "This is an example request body.";
+ // This test will use TestDataSource as the source of the body payload data.
+ TestDataSource body1{kBody};
+ body1.set_is_data_available(false);
+ // The TestDataSource is wrapped in the nghttp2_data_provider data type.
+ nghttp2_data_provider provider = body1.MakeDataProvider();
+ nghttp2_send_data_callback send_callback = &TestSendCallback;
+
+ // This call transforms it back into a DataFrameSource, which is compatible
+ // with the Http2Adapter API.
+ std::unique_ptr<DataFrameSource> frame_source =
+ MakeZeroCopyDataFrameSource(provider, &visitor, std::move(send_callback));
+ int stream_id =
+ adapter->SubmitRequest(ToHeaders({{":method", "POST"},
+ {":scheme", "http"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/one"}}),
+ frame_source.get(), nullptr);
+ EXPECT_GT(stream_id, 0);
+ EXPECT_TRUE(adapter->session().want_write());
+
+ adapter->Send();
+ // Client preface does not appear to include the mandatory SETTINGS frame.
+ absl::string_view serialized = visitor.data();
+ EXPECT_THAT(serialized,
+ testing::StartsWith(spdy::kHttp2ConnectionHeaderPrefix));
+ serialized.remove_prefix(strlen(spdy::kHttp2ConnectionHeaderPrefix));
+ EXPECT_THAT(serialized, EqualsFrames({spdy::SpdyFrameType::HEADERS}));
+ visitor.Clear();
+ EXPECT_FALSE(adapter->session().want_write());
+
+ // Resume the deferred stream.
+ body1.set_is_data_available(true);
+ nghttp2_session_resume_data(adapter->session().raw_ptr(), stream_id);
+ EXPECT_TRUE(adapter->session().want_write());
+
+ adapter->Send();
+ EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::DATA}));
+ EXPECT_FALSE(adapter->session().want_write());
+}
+
TEST(NgHttp2AdapterTest, ServerConstruction) {
testing::StrictMock<MockHttp2Visitor> visitor;
auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
diff --git a/http2/adapter/nghttp2_data_provider.cc b/http2/adapter/nghttp2_data_provider.cc
index 7a70965..83037e4 100644
--- a/http2/adapter/nghttp2_data_provider.cc
+++ b/http2/adapter/nghttp2_data_provider.cc
@@ -18,6 +18,7 @@
uint32_t* data_flags,
nghttp2_data_source* source,
void* /* user_data */) {
+ *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
auto* frame_source = static_cast<DataFrameSource*>(source->ptr);
auto [result_length, done] = frame_source->SelectPayloadLength(length);
if (result_length == DataFrameSource::kBlocked) {
@@ -25,7 +26,6 @@
} else if (result_length == DataFrameSource::kError) {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
- *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
if (done) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
}
diff --git a/http2/adapter/nghttp2_util.cc b/http2/adapter/nghttp2_util.cc
index d97c64a..812bbec 100644
--- a/http2/adapter/nghttp2_util.cc
+++ b/http2/adapter/nghttp2_util.cc
@@ -133,11 +133,13 @@
ssize_t result = provider_.read_callback(
nullptr /* session */, stream_id, nullptr /* buf */, max_length,
&data_flags, &provider_.source, nullptr /* user_data */);
- if (result < 0) {
- return {-1, false};
+ if (result == NGHTTP2_ERR_DEFERRED) {
+ return {kBlocked, false};
+ } else if (result < 0) {
+ return {kError, false};
} else if ((data_flags & NGHTTP2_DATA_FLAG_NO_COPY) == 0) {
QUICHE_LOG(ERROR) << "Source did not use the zero-copy API!";
- return {-1, false};
+ return {kError, false};
} else {
if (data_flags & NGHTTP2_DATA_FLAG_NO_END_STREAM) {
send_fin_ = false;
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 330476c..8c10656 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -127,7 +127,7 @@
const Http2StreamId stream_id = write_scheduler_.PopNextReadyStream();
// TODO(birenroy): Add a return value to indicate write blockage, so streams
// aren't woken unnecessarily.
- WriteForStream(stream_id);
+ continue_writing = WriteForStream(stream_id);
}
if (continue_writing) {
SendQueuedFrames();
@@ -152,12 +152,12 @@
return true;
}
-void OgHttp2Session::WriteForStream(Http2StreamId stream_id) {
+bool OgHttp2Session::WriteForStream(Http2StreamId stream_id) {
auto it = stream_map_.find(stream_id);
if (it == stream_map_.end()) {
QUICHE_LOG(ERROR) << "Can't find stream " << stream_id
<< " which is ready to write!";
- return;
+ return true;
}
StreamState& state = it->second;
if (state.outbound_body == nullptr) {
@@ -171,13 +171,22 @@
MaybeCloseWithRstStream(stream_id, state);
}
}
- return;
+ return true;
}
+ bool source_can_produce = true;
int32_t available_window =
std::min(std::min(peer_window_, state.send_window), max_frame_payload_);
while (available_window > 0 && state.outbound_body != nullptr) {
auto [length, end_data] =
state.outbound_body->SelectPayloadLength(available_window);
+ if (length == DataFrameSource::kBlocked) {
+ source_can_produce = false;
+ break;
+ } else if (length == DataFrameSource::kError) {
+ source_can_produce = false;
+ visitor_.OnCloseStream(stream_id, Http2ErrorCode::INTERNAL_ERROR);
+ break;
+ }
const bool fin = end_data ? state.outbound_body->send_fin() : false;
spdy::SpdyDataIR data(stream_id);
data.set_fin(fin);
@@ -208,9 +217,11 @@
}
// If the stream still has data to send, it should be marked as ready in the
// write scheduler.
- if (state.send_window > 0 && state.outbound_body != nullptr) {
+ if (source_can_produce && state.send_window > 0 &&
+ state.outbound_body != nullptr) {
write_scheduler_.MarkStreamReady(stream_id, false);
}
+ return available_window > 0;
}
int32_t OgHttp2Session::SubmitRequest(absl::Span<const Header> headers,
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index 30bc9d8..c5b9efc 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -160,7 +160,10 @@
// Sends queued frames, returning true if all frames were flushed.
bool SendQueuedFrames();
- void WriteForStream(Http2StreamId stream_id);
+ // Returns false if the connection is write-blocked (due to flow control or
+ // some other reason).
+ bool WriteForStream(Http2StreamId stream_id);
+
void SendTrailers(Http2StreamId stream_id, spdy::SpdyHeaderBlock trailers);
// Encapsulates the RST_STREAM NO_ERROR behavior described in RFC 7540
diff --git a/http2/adapter/oghttp2_session_test.cc b/http2/adapter/oghttp2_session_test.cc
index 185fd3a..636fda1 100644
--- a/http2/adapter/oghttp2_session_test.cc
+++ b/http2/adapter/oghttp2_session_test.cc
@@ -221,6 +221,41 @@
EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::HEADERS}));
}
+// This test exercises the case where the client request body source is read
+// blocked.
+TEST(OgHttp2SessionTest, ClientSubmitRequestWithReadBlock) {
+ DataSavingVisitor visitor;
+ OgHttp2Session session(
+ visitor, OgHttp2Session::Options{.perspective = Perspective::kClient});
+ EXPECT_FALSE(session.want_write());
+
+ const char* kSentinel1 = "arbitrary pointer 1";
+ TestDataFrameSource body1(visitor, "This is an example request body.");
+ body1.set_is_data_available(false);
+ int stream_id =
+ session.SubmitRequest(ToHeaders({{":method", "POST"},
+ {":scheme", "http"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/one"}}),
+ &body1, const_cast<char*>(kSentinel1));
+ EXPECT_GT(stream_id, 0);
+ EXPECT_TRUE(session.want_write());
+ EXPECT_EQ(kSentinel1, session.GetStreamUserData(stream_id));
+ session.Send();
+ absl::string_view serialized = visitor.data();
+ EXPECT_THAT(serialized,
+ testing::StartsWith(spdy::kHttp2ConnectionHeaderPrefix));
+ serialized.remove_prefix(strlen(spdy::kHttp2ConnectionHeaderPrefix));
+ EXPECT_THAT(serialized,
+ EqualsFrames({SpdyFrameType::SETTINGS, SpdyFrameType::HEADERS}));
+ // No data frame, as body1 was read blocked.
+ visitor.Clear();
+ EXPECT_FALSE(session.want_write());
+
+ // Currently there is no way to indicate that the first stream is no longer
+ // read blocked.
+}
+
TEST(OgHttp2SessionTest, ClientStartShutdown) {
DataSavingVisitor visitor;
OgHttp2Session session(
diff --git a/http2/adapter/test_utils.cc b/http2/adapter/test_utils.cc
index 3ccd63e..c6e98a2 100644
--- a/http2/adapter/test_utils.cc
+++ b/http2/adapter/test_utils.cc
@@ -36,13 +36,16 @@
std::pair<ssize_t, bool> TestDataFrameSource::SelectPayloadLength(
size_t max_length) {
+ if (!is_data_available_) {
+ return {kBlocked, false};
+ }
// The stream is done if there's no more data, or if |max_length| is at least
// as large as the remaining data.
const bool end_data =
current_fragment_.empty() || (payload_fragments_.size() == 1 &&
max_length >= current_fragment_.size());
const ssize_t length = std::min(max_length, current_fragment_.size());
- return std::make_pair(length, end_data);
+ return {length, end_data};
}
void TestDataFrameSource::Send(absl::string_view frame_header,
diff --git a/http2/adapter/test_utils.h b/http2/adapter/test_utils.h
index 0ea7ec4..de001a1 100644
--- a/http2/adapter/test_utils.h
+++ b/http2/adapter/test_utils.h
@@ -50,11 +50,14 @@
void Send(absl::string_view frame_header, size_t payload_length) override;
bool send_fin() const override { return has_fin_; }
+ void set_is_data_available(bool value) { is_data_available_ = value; }
+
private:
Http2VisitorInterface& visitor_;
std::vector<std::string> payload_fragments_;
absl::string_view current_fragment_;
const bool has_fin_;
+ bool is_data_available_ = true;
};
// A simple class that can easily be adapted to act as a nghttp2_data_source.
@@ -78,20 +81,27 @@
.source = {.ptr = this},
.read_callback = [](nghttp2_session*, int32_t, uint8_t*, size_t length,
uint32_t* data_flags, nghttp2_data_source* source,
- void*) {
+ void*) -> ssize_t {
+ *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
auto* s = static_cast<TestDataSource*>(source->ptr);
+ if (!s->is_data_available()) {
+ return NGHTTP2_ERR_DEFERRED;
+ }
const ssize_t ret = s->SelectPayloadLength(length);
if (ret < length) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
}
- *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
return ret;
}};
}
+ bool is_data_available() const { return is_data_available_; }
+ void set_is_data_available(bool value) { is_data_available_ = value; }
+
private:
const std::string data_;
absl::string_view remaining_ = data_;
+ bool is_data_available_ = true;
};
// These matchers check whether a string consists entirely of HTTP/2 frames of