Adds a StreamState member that tracks whether the data source is currently blocked.
This affects whether the stream should be scheduled for writes, and therefore the return value of OgHttp2Adapter::want_write().
PiperOrigin-RevId: 401817911
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index ccfb1ac..62f3682 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -1520,6 +1520,117 @@
spdy::SpdyFrameType::PING}));
}
+// Tests the case where the response body is in the progress of being sent while
+// trailers are queued.
+TEST(NgHttp2AdapterTest, ServerSubmitsTrailersWhileDataDeferred) {
+ DataSavingVisitor visitor;
+ auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
+
+ const std::string frames = TestFrameSequence()
+ .ClientPreface()
+ .Headers(1,
+ {{":method", "POST"},
+ {":scheme", "https"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/one"}},
+ /*fin=*/false)
+ .WindowUpdate(1, 2000)
+ .Data(1, "This is the request body.")
+ .WindowUpdate(0, 2000)
+ .Serialize();
+ testing::InSequence s;
+
+ // Client preface (empty SETTINGS)
+ EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+ EXPECT_CALL(visitor, OnSettingsStart());
+ EXPECT_CALL(visitor, OnSettingsEnd());
+
+ EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 4));
+ EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+ EXPECT_CALL(visitor, OnHeaderForStream(1, ":method", "POST"));
+ EXPECT_CALL(visitor, OnHeaderForStream(1, ":scheme", "https"));
+ EXPECT_CALL(visitor, OnHeaderForStream(1, ":authority", "example.com"));
+ EXPECT_CALL(visitor, OnHeaderForStream(1, ":path", "/this/is/request/one"));
+ EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+ EXPECT_CALL(visitor, OnFrameHeader(1, 4, WINDOW_UPDATE, 0));
+ EXPECT_CALL(visitor, OnWindowUpdate(1, 2000));
+ EXPECT_CALL(visitor, OnFrameHeader(1, _, DATA, 0));
+ EXPECT_CALL(visitor, OnBeginDataForStream(1, _));
+ EXPECT_CALL(visitor, OnDataForStream(1, "This is the request body."));
+ EXPECT_CALL(visitor, OnFrameHeader(0, 4, WINDOW_UPDATE, 0));
+ EXPECT_CALL(visitor, OnWindowUpdate(0, 2000));
+
+ const int64_t result = adapter->ProcessBytes(frames);
+ EXPECT_EQ(frames.size(), static_cast<size_t>(result));
+
+ EXPECT_TRUE(adapter->want_write());
+
+ EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+ EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+
+ int send_result = adapter->Send();
+ // Some bytes should have been serialized.
+ EXPECT_EQ(0, send_result);
+ // SETTINGS ack
+ EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS}));
+ visitor.Clear();
+
+ const absl::string_view kBody = "This is an example response body.";
+
+ // The body source must indicate that the end of the body is not the end of
+ // the stream.
+ auto body1 = absl::make_unique<TestDataFrameSource>(visitor, false);
+ body1->AppendPayload(kBody);
+ auto* body1_ptr = body1.get();
+ int submit_result = adapter->SubmitResponse(
+ 1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
+ std::move(body1));
+ EXPECT_EQ(submit_result, 0);
+ EXPECT_TRUE(adapter->want_write());
+
+ EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, 1, _, 0x4));
+ EXPECT_CALL(visitor, OnFrameSent(HEADERS, 1, _, 0x4, 0));
+ EXPECT_CALL(visitor, OnFrameSent(DATA, 1, _, 0x0, 0));
+
+ send_result = adapter->Send();
+ // Some bytes should have been serialized.
+ EXPECT_EQ(0, send_result);
+ // SETTINGS ack
+ EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::HEADERS,
+ spdy::SpdyFrameType::DATA}));
+ visitor.Clear();
+ EXPECT_FALSE(adapter->want_write());
+
+ int trailer_result =
+ adapter->SubmitTrailer(1, ToHeaders({{"final-status", "a-ok"}}));
+ ASSERT_EQ(trailer_result, 0);
+
+ // Even though the data source has not finished sending data, nghttp2 will
+ // write the trailers anyway.
+ EXPECT_TRUE(adapter->want_write());
+
+ EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, 1, _, 0x5));
+ EXPECT_CALL(visitor, OnFrameSent(HEADERS, 1, _, 0x5, 0));
+
+ send_result = adapter->Send();
+ EXPECT_EQ(0, send_result);
+ EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::HEADERS}));
+ visitor.Clear();
+
+ // Resuming the stream results in the library wanting to write again.
+ body1_ptr->AppendPayload(kBody);
+ body1_ptr->EndData();
+ adapter->ResumeStream(1);
+ EXPECT_TRUE(adapter->want_write());
+
+ send_result = adapter->Send();
+ EXPECT_EQ(0, send_result);
+
+ // But no data is written for the stream.
+ EXPECT_THAT(visitor.data(), testing::IsEmpty());
+ EXPECT_FALSE(adapter->want_write());
+}
+
TEST(NgHttp2AdapterTest, ServerErrorWhileHandlingHeaders) {
DataSavingVisitor visitor;
auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index 413db77..4731e48 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -1165,6 +1165,112 @@
EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::HEADERS}));
}
+// Tests the case where the response body is in the progress of being sent while
+// trailers are queued.
+TEST(OgHttp2AdapterServerTest, ServerSubmitsTrailersWhileDataDeferred) {
+ DataSavingVisitor visitor;
+ OgHttp2Adapter::Options options{.perspective = Perspective::kServer};
+ auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+ const std::string frames = TestFrameSequence()
+ .ClientPreface()
+ .Headers(1,
+ {{":method", "POST"},
+ {":scheme", "https"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/one"}},
+ /*fin=*/false)
+ .WindowUpdate(1, 2000)
+ .Data(1, "This is the request body.")
+ .WindowUpdate(0, 2000)
+ .Serialize();
+ testing::InSequence s;
+
+ // Client preface (empty SETTINGS)
+ EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+ EXPECT_CALL(visitor, OnSettingsStart());
+ EXPECT_CALL(visitor, OnSettingsEnd());
+
+ EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 4));
+ EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+ EXPECT_CALL(visitor, OnHeaderForStream(1, ":method", "POST"));
+ EXPECT_CALL(visitor, OnHeaderForStream(1, ":scheme", "https"));
+ EXPECT_CALL(visitor, OnHeaderForStream(1, ":authority", "example.com"));
+ EXPECT_CALL(visitor, OnHeaderForStream(1, ":path", "/this/is/request/one"));
+ EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+ EXPECT_CALL(visitor, OnFrameHeader(1, 4, WINDOW_UPDATE, 0));
+ EXPECT_CALL(visitor, OnWindowUpdate(1, 2000));
+ EXPECT_CALL(visitor, OnFrameHeader(1, _, DATA, 0));
+ EXPECT_CALL(visitor, OnBeginDataForStream(1, _));
+ EXPECT_CALL(visitor, OnDataForStream(1, "This is the request body."));
+ EXPECT_CALL(visitor, OnFrameHeader(0, 4, WINDOW_UPDATE, 0));
+ EXPECT_CALL(visitor, OnWindowUpdate(0, 2000));
+
+ const int64_t result = adapter->ProcessBytes(frames);
+ EXPECT_EQ(frames.size(), static_cast<size_t>(result));
+
+ EXPECT_TRUE(adapter->want_write());
+
+ EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+ EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+ EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+ EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+
+ int send_result = adapter->Send();
+ // Some bytes should have been serialized.
+ EXPECT_EQ(0, send_result);
+ // SETTINGS ack
+ EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS,
+ spdy::SpdyFrameType::SETTINGS}));
+ visitor.Clear();
+
+ const absl::string_view kBody = "This is an example response body.";
+
+ // The body source must indicate that the end of the body is not the end of
+ // the stream.
+ auto body1 = absl::make_unique<TestDataFrameSource>(visitor, false);
+ body1->AppendPayload(kBody);
+ auto* body1_ptr = body1.get();
+ int submit_result = adapter->SubmitResponse(
+ 1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
+ std::move(body1));
+ EXPECT_EQ(submit_result, 0);
+ EXPECT_TRUE(adapter->want_write());
+
+ EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, 1, _, 0x4));
+ EXPECT_CALL(visitor, OnFrameSent(HEADERS, 1, _, 0x4, 0));
+ EXPECT_CALL(visitor, OnFrameSent(DATA, 1, _, 0x0, 0));
+
+ send_result = adapter->Send();
+ // Some bytes should have been serialized.
+ EXPECT_EQ(0, send_result);
+ // SETTINGS ack
+ EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::HEADERS,
+ spdy::SpdyFrameType::DATA}));
+ visitor.Clear();
+ EXPECT_FALSE(adapter->want_write());
+
+ int trailer_result =
+ adapter->SubmitTrailer(1, ToHeaders({{"final-status", "a-ok"}}));
+ ASSERT_EQ(trailer_result, 0);
+ // Even though there are new trailers to write, the data source has not
+ // finished writing data and is blocked.
+ EXPECT_FALSE(adapter->want_write());
+
+ body1_ptr->EndData();
+ adapter->ResumeStream(1);
+ EXPECT_TRUE(adapter->want_write());
+
+ EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, 1, _, 0x5));
+ EXPECT_CALL(visitor, OnFrameSent(HEADERS, 1, _, 0x5, 0));
+ EXPECT_CALL(visitor, OnBeforeFrameSent(RST_STREAM, 1, _, 0x0));
+ EXPECT_CALL(visitor, OnFrameSent(RST_STREAM, 1, _, 0x0, 0));
+ EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::NO_ERROR));
+
+ send_result = adapter->Send();
+ EXPECT_EQ(0, send_result);
+}
+
TEST(OgHttp2AdapterServerTest, ServerErrorWhileHandlingHeaders) {
DataSavingVisitor visitor;
OgHttp2Adapter::Options options{.perspective = Perspective::kServer};
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index e3a696b..09a3ce0 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -216,10 +216,11 @@
bool OgHttp2Session::ResumeStream(Http2StreamId stream_id) {
auto it = stream_map_.find(stream_id);
- if (it->second.outbound_body == nullptr ||
+ if (it == stream_map_.end() || it->second.outbound_body == nullptr ||
!write_scheduler_.StreamRegistered(stream_id)) {
return false;
}
+ it->second.data_deferred = false;
write_scheduler_.MarkStreamReady(stream_id, /*add_to_front=*/false);
return true;
}
@@ -432,7 +433,6 @@
}
return true;
}
- bool source_can_produce = true;
int32_t available_window =
std::min({connection_send_window_, state.send_window,
static_cast<int32_t>(max_frame_payload_)});
@@ -443,10 +443,9 @@
std::tie(length, end_data) =
state.outbound_body->SelectPayloadLength(available_window);
if (length == 0 && !end_data) {
- source_can_produce = false;
+ state.data_deferred = true;
break;
} else if (length == DataFrameSource::kError) {
- source_can_produce = false;
CloseStream(stream_id, Http2ErrorCode::INTERNAL_ERROR);
// No more work on the stream; it has been closed.
break;
@@ -494,7 +493,7 @@
}
// If the stream still exists and has data to send, it should be marked as
// ready in the write scheduler.
- if (stream_map_.contains(stream_id) && source_can_produce &&
+ if (stream_map_.contains(stream_id) && !state.data_deferred &&
state.send_window > 0 && state.outbound_body != nullptr) {
write_scheduler_.MarkStreamReady(stream_id, false);
}
@@ -601,7 +600,9 @@
// Save trailers so they can be written once data is done.
state.trailers =
absl::make_unique<spdy::SpdyHeaderBlock>(ToHeaderBlock(trailers));
- write_scheduler_.MarkStreamReady(stream_id, false);
+ if (!iter->second.data_deferred) {
+ write_scheduler_.MarkStreamReady(stream_id, false);
+ }
}
return 0;
}
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index 75f9c96..814b843 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -185,6 +185,8 @@
absl::optional<HeaderType> received_header_type;
bool half_closed_local = false;
bool half_closed_remote = false;
+ // Indicates that `outbound_body` temporarily cannot produce data.
+ bool data_deferred = false;
};
using StreamStateMap = absl::flat_hash_map<Http2StreamId, StreamState>;