Adds a return value to DataFrameSource::Send(), indicating whether the send operation would encounter a write block.
PiperOrigin-RevId: 378651262
diff --git a/http2/adapter/data_source.h b/http2/adapter/data_source.h
index 5658bfd..2509358 100644
--- a/http2/adapter/data_source.h
+++ b/http2/adapter/data_source.h
@@ -23,9 +23,9 @@
virtual std::pair<ssize_t, bool> SelectPayloadLength(size_t max_length) = 0;
// This method is called with a frame header and a payload length to send. The
- // source should send or buffer the entire frame.
- // TODO(birenroy): Consider adding a return value to indicate write blockage.
- virtual void Send(absl::string_view frame_header, size_t payload_length) = 0;
+ // source should send or buffer the entire frame and return true, or return
+ // false without sending or buffering anything.
+ virtual bool Send(absl::string_view frame_header, size_t payload_length) = 0;
// If true, the end of this data source indicates the end of the stream.
// Otherwise, this data will be followed by trailers.
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index 65a0c7e..d930a26 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -34,7 +34,10 @@
void* user_data) {
auto* visitor = static_cast<Http2VisitorInterface*>(user_data);
// Send the frame header via the visitor.
- visitor->OnReadyToSend(ToStringView(framehd, 9));
+ ssize_t result = visitor->OnReadyToSend(ToStringView(framehd, 9));
+ if (result == 0) {
+ return NGHTTP2_ERR_WOULDBLOCK;
+ }
auto* test_source = static_cast<TestDataSource*>(source->ptr);
absl::string_view payload = test_source->ReadNext(length);
// Send the frame payload via the visitor.
@@ -355,6 +358,50 @@
EXPECT_FALSE(adapter->session().want_write());
}
+// This test verifies how nghttp2 behaves when a connection becomes
+// write-blocked.
+TEST(NgHttp2AdapterTest, ClientSubmitRequestWithDataProviderAndWriteBlock) {
+ DataSavingVisitor visitor;
+ auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
+
+ const absl::string_view kBody = "This is an example request body.";
+ // This test will use TestDataSource as the source of the body payload data.
+ TestDataSource body1{kBody};
+ // The TestDataSource is wrapped in the nghttp2_data_provider data type.
+ nghttp2_data_provider provider = body1.MakeDataProvider();
+ nghttp2_send_data_callback send_callback = &TestSendCallback;
+
+ // This call transforms it back into a DataFrameSource, which is compatible
+ // with the Http2Adapter API.
+ std::unique_ptr<DataFrameSource> frame_source =
+ MakeZeroCopyDataFrameSource(provider, &visitor, std::move(send_callback));
+ int stream_id =
+ adapter->SubmitRequest(ToHeaders({{":method", "POST"},
+ {":scheme", "http"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/one"}}),
+ frame_source.get(), nullptr);
+ EXPECT_GT(stream_id, 0);
+ EXPECT_TRUE(adapter->session().want_write());
+
+ visitor.set_is_write_blocked(true);
+ adapter->Send();
+ EXPECT_THAT(visitor.data(), testing::IsEmpty());
+ EXPECT_TRUE(adapter->session().want_write());
+
+ visitor.set_is_write_blocked(false);
+ adapter->Send();
+
+ // Client preface does not appear to include the mandatory SETTINGS frame.
+ absl::string_view serialized = visitor.data();
+ EXPECT_THAT(serialized,
+ testing::StartsWith(spdy::kHttp2ConnectionHeaderPrefix));
+ serialized.remove_prefix(strlen(spdy::kHttp2ConnectionHeaderPrefix));
+ EXPECT_THAT(serialized, EqualsFrames({spdy::SpdyFrameType::HEADERS,
+ spdy::SpdyFrameType::DATA}));
+ EXPECT_FALSE(adapter->session().want_write());
+}
+
TEST(NgHttp2AdapterTest, ServerConstruction) {
testing::StrictMock<MockHttp2Visitor> visitor;
auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
diff --git a/http2/adapter/nghttp2_util.cc b/http2/adapter/nghttp2_util.cc
index 812bbec..f3743ca 100644
--- a/http2/adapter/nghttp2_util.cc
+++ b/http2/adapter/nghttp2_util.cc
@@ -149,10 +149,14 @@
}
}
- void Send(absl::string_view frame_header, size_t payload_length) override {
- send_data_(nullptr /* session */, nullptr /* frame */,
- ToUint8Ptr(frame_header.data()), payload_length,
- &provider_.source, user_data_);
+ bool Send(absl::string_view frame_header, size_t payload_length) override {
+ const int result =
+ send_data_(nullptr /* session */, nullptr /* frame */,
+ ToUint8Ptr(frame_header.data()), payload_length,
+ &provider_.source, user_data_);
+ QUICHE_LOG_IF(ERROR, result < 0 && result != NGHTTP2_ERR_WOULDBLOCK)
+ << "Unexpected error code from send: " << result;
+ return result == 0;
}
bool send_fin() const override { return send_fin_; }
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 8c10656..bb8405f 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -138,15 +138,20 @@
// Serialize and send frames in the queue.
while (!frames_.empty()) {
spdy::SpdySerializedFrame frame = framer_.SerializeFrame(*frames_.front());
- frames_.pop_front();
const ssize_t result = visitor_.OnReadyToSend(absl::string_view(frame));
if (result < 0) {
visitor_.OnConnectionError();
return false;
- }
- if (result < frame.size()) {
- serialized_prefix_.assign(frame.data() + result, frame.size() - result);
+ } else if (result == 0) {
+ // Write blocked.
return false;
+ } else {
+ frames_.pop_front();
+ if (result < frame.size()) {
+ // The frame was partially written, so the rest must be buffered.
+ serialized_prefix_.assign(frame.data() + result, frame.size() - result);
+ return false;
+ }
}
}
return true;
@@ -174,6 +179,7 @@
return true;
}
bool source_can_produce = true;
+ bool connection_can_write = true;
int32_t available_window =
std::min(std::min(peer_window_, state.send_window), max_frame_payload_);
while (available_window > 0 && state.outbound_body != nullptr) {
@@ -193,7 +199,12 @@
data.SetDataShallow(length);
spdy::SpdySerializedFrame header =
spdy::SpdyFramer::SerializeDataFrameHeaderWithPaddingLengthField(data);
- state.outbound_body->Send(absl::string_view(header), length);
+ const bool success =
+ state.outbound_body->Send(absl::string_view(header), length);
+ if (!success) {
+ connection_can_write = false;
+ break;
+ }
peer_window_ -= length;
state.send_window -= length;
available_window =
@@ -221,7 +232,9 @@
state.outbound_body != nullptr) {
write_scheduler_.MarkStreamReady(stream_id, false);
}
- return available_window > 0;
+ // Streams can continue writing as long as the connection is not write-blocked
+ // and there is additional flow control quota available.
+ return connection_can_write && available_window > 0;
}
int32_t OgHttp2Session::SubmitRequest(absl::Span<const Header> headers,
diff --git a/http2/adapter/oghttp2_session_test.cc b/http2/adapter/oghttp2_session_test.cc
index 636fda1..4b42188 100644
--- a/http2/adapter/oghttp2_session_test.cc
+++ b/http2/adapter/oghttp2_session_test.cc
@@ -256,6 +256,43 @@
// read blocked.
}
+// This test exercises the case where the connection to the peer is write
+// blocked.
+TEST(OgHttp2SessionTest, ClientSubmitRequestWithWriteBlock) {
+ DataSavingVisitor visitor;
+ OgHttp2Session session(
+ visitor, OgHttp2Session::Options{.perspective = Perspective::kClient});
+ EXPECT_FALSE(session.want_write());
+
+ const char* kSentinel1 = "arbitrary pointer 1";
+ TestDataFrameSource body1(visitor, "This is an example request body.");
+ int stream_id =
+ session.SubmitRequest(ToHeaders({{":method", "POST"},
+ {":scheme", "http"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/one"}}),
+ &body1, const_cast<char*>(kSentinel1));
+ EXPECT_GT(stream_id, 0);
+ EXPECT_TRUE(session.want_write());
+ EXPECT_EQ(kSentinel1, session.GetStreamUserData(stream_id));
+ visitor.set_is_write_blocked(true);
+ session.Send();
+
+ EXPECT_THAT(visitor.data(), testing::IsEmpty());
+ EXPECT_TRUE(session.want_write());
+ visitor.set_is_write_blocked(false);
+ session.Send();
+
+ absl::string_view serialized = visitor.data();
+ EXPECT_THAT(serialized,
+ testing::StartsWith(spdy::kHttp2ConnectionHeaderPrefix));
+ serialized.remove_prefix(strlen(spdy::kHttp2ConnectionHeaderPrefix));
+ EXPECT_THAT(serialized,
+ EqualsFrames({SpdyFrameType::SETTINGS, SpdyFrameType::HEADERS,
+ SpdyFrameType::DATA}));
+ EXPECT_FALSE(session.want_write());
+}
+
TEST(OgHttp2SessionTest, ClientStartShutdown) {
DataSavingVisitor visitor;
OgHttp2Session session(
diff --git a/http2/adapter/test_utils.cc b/http2/adapter/test_utils.cc
index c6e98a2..e0dfec6 100644
--- a/http2/adapter/test_utils.cc
+++ b/http2/adapter/test_utils.cc
@@ -48,7 +48,7 @@
return {length, end_data};
}
-void TestDataFrameSource::Send(absl::string_view frame_header,
+bool TestDataFrameSource::Send(absl::string_view frame_header,
size_t payload_length) {
QUICHE_LOG_IF(DFATAL, payload_length > current_fragment_.size())
<< "payload_length: " << payload_length
@@ -56,13 +56,23 @@
const std::string concatenated =
absl::StrCat(frame_header, current_fragment_.substr(0, payload_length));
const ssize_t result = visitor_.OnReadyToSend(concatenated);
- if (result < concatenated.size()) {
- QUICHE_LOG(ERROR)
+ if (result < 0) {
+ // Write encountered error.
+ visitor_.OnConnectionError();
+ current_fragment_ = {};
+ payload_fragments_.clear();
+ return false;
+ } else if (result == 0) {
+ // Write blocked.
+ return false;
+ } else if (result < concatenated.size()) {
+ // Probably need to handle this better within this test class.
+ QUICHE_LOG(DFATAL)
<< "DATA frame not fully flushed. Connection will be corrupt!";
visitor_.OnConnectionError();
current_fragment_ = {};
payload_fragments_.clear();
- return;
+ return false;
}
current_fragment_.remove_prefix(payload_length);
if (current_fragment_.empty()) {
@@ -71,6 +81,7 @@
current_fragment_ = payload_fragments_.front();
}
}
+ return true;
}
namespace {
diff --git a/http2/adapter/test_utils.h b/http2/adapter/test_utils.h
index de001a1..1d58669 100644
--- a/http2/adapter/test_utils.h
+++ b/http2/adapter/test_utils.h
@@ -19,6 +19,9 @@
class DataSavingVisitor : public testing::StrictMock<MockHttp2Visitor> {
public:
ssize_t OnReadyToSend(absl::string_view data) override {
+ if (is_write_blocked_) {
+ return 0;
+ }
const size_t to_accept = std::min(send_limit_, data.size());
absl::StrAppend(&data_, data.substr(0, to_accept));
return to_accept;
@@ -29,9 +32,13 @@
void set_send_limit(size_t limit) { send_limit_ = limit; }
+ bool is_write_blocked() const { return is_write_blocked_; }
+ void set_is_write_blocked(bool value) { is_write_blocked_ = value; }
+
private:
std::string data_;
size_t send_limit_ = std::numeric_limits<size_t>::max();
+ bool is_write_blocked_ = false;
};
// A test DataFrameSource that can be initialized with a single string payload,
@@ -47,7 +54,7 @@
bool has_fin = true);
std::pair<ssize_t, bool> SelectPayloadLength(size_t max_length) override;
- void Send(absl::string_view frame_header, size_t payload_length) override;
+ bool Send(absl::string_view frame_header, size_t payload_length) override;
bool send_fin() const override { return has_fin_; }
void set_is_data_available(bool value) { is_data_available_ = value; }