Tracks streams with ready metadata or trailers separately from write_scheduler_.
PiperOrigin-RevId: 415351912
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index 6235d34..3b22a92 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -1756,6 +1756,196 @@
EXPECT_FALSE(adapter->want_write());
}
+TEST(OgHttp2AdapterClientTest, ClientEncountersFlowControlBlock) {
+ DataSavingVisitor visitor;
+ OgHttp2Adapter::Options options{.perspective = Perspective::kClient};
+ auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+ testing::InSequence s;
+
+ const std::vector<const Header> headers1 =
+ ToHeaders({{":method", "GET"},
+ {":scheme", "http"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/one"}});
+
+ const std::string kBody = std::string(100 * 1024, 'a');
+ auto body1 = absl::make_unique<TestDataFrameSource>(visitor, false);
+ body1->AppendPayload(kBody);
+ body1->EndData();
+
+ const int32_t stream_id1 =
+ adapter->SubmitRequest(headers1, std::move(body1), nullptr);
+ ASSERT_GT(stream_id1, 0);
+
+ const std::vector<const Header> headers2 =
+ ToHeaders({{":method", "GET"},
+ {":scheme", "http"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/two"}});
+
+ auto body2 = absl::make_unique<TestDataFrameSource>(visitor, false);
+ body2->AppendPayload(kBody);
+ body2->EndData();
+
+ const int32_t stream_id2 =
+ adapter->SubmitRequest(headers2, std::move(body2), nullptr);
+ ASSERT_GT(stream_id2, 0);
+
+ EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+ EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+ EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id1, _, 0x4));
+ EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id1, _, 0x4, 0));
+ EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id2, _, 0x4));
+ EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id2, _, 0x4, 0));
+ // 4 DATA frames should saturate the default 64kB stream/connection flow
+ // control window.
+ EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0)).Times(4);
+
+ int result = adapter->Send();
+ EXPECT_EQ(0, result);
+ EXPECT_EQ(0, adapter->GetSendWindowSize());
+
+ const std::string stream_frames = TestFrameSequence()
+ .ServerPreface()
+ .WindowUpdate(0, 80000)
+ .WindowUpdate(stream_id1, 20000)
+ .Serialize();
+
+ // Server preface (empty SETTINGS)
+ EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+ EXPECT_CALL(visitor, OnSettingsStart());
+ EXPECT_CALL(visitor, OnSettingsEnd());
+
+ EXPECT_CALL(visitor, OnFrameHeader(0, 4, WINDOW_UPDATE, 0));
+ EXPECT_CALL(visitor, OnWindowUpdate(0, 80000));
+ EXPECT_CALL(visitor, OnFrameHeader(1, 4, WINDOW_UPDATE, 0));
+ EXPECT_CALL(visitor, OnWindowUpdate(1, 20000));
+
+ const int64_t stream_result = adapter->ProcessBytes(stream_frames);
+ EXPECT_EQ(stream_frames.size(), static_cast<size_t>(stream_result));
+
+ EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x1));
+ EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x1, 0));
+
+ EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id2, _, 0x0, 0))
+ .Times(testing::AtLeast(1));
+ EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0))
+ .Times(testing::AtLeast(1));
+
+ EXPECT_TRUE(adapter->want_write());
+ result = adapter->Send();
+ EXPECT_EQ(0, result);
+}
+
+TEST(OgHttp2AdapterClientTest, ClientSendsTrailersAfterFlowControlBlock) {
+ DataSavingVisitor visitor;
+ OgHttp2Adapter::Options options{.perspective = Perspective::kClient};
+ auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+ testing::InSequence s;
+
+ const std::vector<const Header> headers1 =
+ ToHeaders({{":method", "GET"},
+ {":scheme", "http"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/one"}});
+
+ auto body1 = absl::make_unique<TestDataFrameSource>(visitor, false);
+ body1->AppendPayload("Really small body.");
+ body1->EndData();
+
+ const int32_t stream_id1 =
+ adapter->SubmitRequest(headers1, std::move(body1), nullptr);
+ ASSERT_GT(stream_id1, 0);
+
+ const std::vector<const Header> headers2 =
+ ToHeaders({{":method", "GET"},
+ {":scheme", "http"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/two"}});
+
+ const std::string kBody = std::string(100 * 1024, 'a');
+ auto body2 = absl::make_unique<TestDataFrameSource>(visitor, false);
+ body2->AppendPayload(kBody);
+ body2->EndData();
+
+ const int32_t stream_id2 =
+ adapter->SubmitRequest(headers2, std::move(body2), nullptr);
+ ASSERT_GT(stream_id2, 0);
+
+ EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+ EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+ EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id1, _, 0x4));
+ EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id1, _, 0x4, 0));
+ EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id2, _, 0x4));
+ EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id2, _, 0x4, 0));
+ EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0)).Times(1);
+ // 4 DATA frames should saturate the default 64kB stream/connection flow
+ // control window.
+ EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id2, _, 0x0, 0)).Times(4);
+
+ int result = adapter->Send();
+ EXPECT_EQ(0, result);
+ EXPECT_FALSE(adapter->want_write());
+ EXPECT_EQ(0, adapter->GetSendWindowSize());
+
+ const std::vector<const Header> trailers1 =
+ ToHeaders({{"extra-info", "Trailers are weird but good?"}});
+ adapter->SubmitTrailer(stream_id1, trailers1);
+
+ EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id1, _, 0x5));
+ EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id1, _, 0x5, 0));
+
+ result = adapter->Send();
+ EXPECT_EQ(0, result);
+}
+
+TEST(OgHttp2AdapterClientTest, ClientSendsMetadataAfterFlowControlBlock) {
+ DataSavingVisitor visitor;
+ OgHttp2Adapter::Options options{.perspective = Perspective::kClient};
+ auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+ testing::InSequence s;
+
+ const std::vector<const Header> headers1 =
+ ToHeaders({{":method", "GET"},
+ {":scheme", "http"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/one"}});
+
+ const std::string kBody = std::string(100 * 1024, 'a');
+ auto body1 = absl::make_unique<TestDataFrameSource>(visitor, false);
+ body1->AppendPayload(kBody);
+ body1->EndData();
+
+ const int32_t stream_id1 =
+ adapter->SubmitRequest(headers1, std::move(body1), nullptr);
+ ASSERT_GT(stream_id1, 0);
+
+ EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+ EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+ EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id1, _, 0x4));
+ EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id1, _, 0x4, 0));
+ // 4 DATA frames should saturate the default 64kB stream/connection flow
+ // control window.
+ EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0)).Times(4);
+
+ int result = adapter->Send();
+ EXPECT_EQ(0, result);
+ EXPECT_FALSE(adapter->want_write());
+ EXPECT_EQ(0, adapter->GetSendWindowSize());
+
+ auto source = absl::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
+ {{"query-cost", "is too darn high"}, {"secret-sauce", "hollandaise"}})));
+ adapter->SubmitMetadata(1, 16384u, std::move(source));
+ EXPECT_CALL(visitor, OnBeforeFrameSent(kMetadataFrameType, 1, _, 0x4));
+ EXPECT_CALL(visitor, OnFrameSent(kMetadataFrameType, 1, _, 0x4, 0));
+
+ result = adapter->Send();
+ EXPECT_EQ(0, result);
+}
+
TEST_F(OgHttp2AdapterTest, SubmitMetadata) {
auto source = absl::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
{{"query-cost", "is too darn high"}, {"secret-sauce", "hollandaise"}})));
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 82d9d87..aaf97d7 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -482,9 +482,8 @@
continue_writing = SendMetadata(0, connection_metadata_);
}
// Wake streams for writes.
- while (continue_writing == SendResult::SEND_OK &&
- write_scheduler_.HasReadyStreams() && connection_send_window_ > 0) {
- const Http2StreamId stream_id = write_scheduler_.PopNextReadyStream();
+ while (continue_writing == SendResult::SEND_OK && HasReadyStream()) {
+ const Http2StreamId stream_id = GetNextReadyStream();
// TODO(birenroy): Add a return value to indicate write blockage, so streams
// aren't woken unnecessarily.
QUICHE_VLOG(1) << "Waking stream " << stream_id << " for writes.";
@@ -496,6 +495,28 @@
return continue_writing == SendResult::SEND_ERROR ? -1 : 0;
}
+bool OgHttp2Session::HasReadyStream() const {
+ return !metadata_ready_.empty() || !trailers_ready_.empty() ||
+ (write_scheduler_.HasReadyStreams() && connection_send_window_ > 0);
+}
+
+Http2StreamId OgHttp2Session::GetNextReadyStream() {
+ QUICHE_DCHECK(HasReadyStream());
+ if (!metadata_ready_.empty()) {
+ const Http2StreamId stream_id = *metadata_ready_.begin();
+ // WriteForStream() will re-mark the stream as ready, if necessary.
+ write_scheduler_.MarkStreamNotReady(stream_id);
+ return stream_id;
+ }
+ if (!trailers_ready_.empty()) {
+ const Http2StreamId stream_id = *trailers_ready_.begin();
+ // WriteForStream() will re-mark the stream as ready, if necessary.
+ write_scheduler_.MarkStreamNotReady(stream_id);
+ return stream_id;
+ }
+ return write_scheduler_.PopNextReadyStream();
+}
+
OgHttp2Session::SendResult OgHttp2Session::MaybeSendBufferedData() {
int64_t result = std::numeric_limits<int64_t>::max();
while (result > 0 && !buffered_data_.empty()) {
@@ -692,7 +713,8 @@
if (connection_can_write != SendResult::SEND_OK) {
return connection_can_write;
}
- return available_window <= 0 ? SendResult::SEND_BLOCKED : SendResult::SEND_OK;
+ return connection_send_window_ <= 0 ? SendResult::SEND_BLOCKED
+ : SendResult::SEND_OK;
}
OgHttp2Session::SendResult OgHttp2Session::SendMetadata(
@@ -719,6 +741,7 @@
std::string(payload)));
if (end_metadata) {
sequence.erase(sequence.begin());
+ metadata_ready_.erase(stream_id);
}
}
return SendQueuedFrames();
@@ -789,7 +812,7 @@
state.trailers =
absl::make_unique<spdy::SpdyHeaderBlock>(ToHeaderBlock(trailers));
if (!options_.trailers_require_end_data || !iter->second.data_deferred) {
- write_scheduler_.MarkStreamReady(stream_id, false);
+ trailers_ready_.insert(stream_id);
}
}
return 0;
@@ -802,7 +825,7 @@
} else {
auto iter = CreateStream(stream_id);
iter->second.outbound_metadata.push_back(std::move(source));
- write_scheduler_.MarkStreamReady(stream_id, false);
+ metadata_ready_.insert(stream_id);
}
}
@@ -1089,6 +1112,7 @@
} else {
if (it->second.send_window == 0) {
// The stream was blocked on flow control.
+ QUICHE_VLOG(1) << "Marking stream " << stream_id << " ready to write.";
write_scheduler_.MarkStreamReady(stream_id, false);
}
it->second.send_window += delta_window_size;
@@ -1277,6 +1301,7 @@
absl::make_unique<spdy::SpdyHeadersIR>(stream_id, std::move(trailers));
frame->set_fin(true);
EnqueueFrame(std::move(frame));
+ trailers_ready_.erase(stream_id);
}
void OgHttp2Session::MaybeFinWithRstStream(StreamStateMap::iterator iter) {
@@ -1341,6 +1366,8 @@
Http2ErrorCode error_code) {
visitor_.OnCloseStream(stream_id, error_code);
stream_map_.erase(stream_id);
+ trailers_ready_.erase(stream_id);
+ metadata_ready_.erase(stream_id);
if (write_scheduler_.StreamRegistered(stream_id)) {
write_scheduler_.UnregisterStream(stream_id);
}
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index fd7bdc2..e4a2374 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -125,7 +125,7 @@
}
bool want_write() const override {
return !frames_.empty() || !buffered_data_.empty() ||
- write_scheduler_.HasReadyStreams() || !connection_metadata_.empty();
+ !connection_metadata_.empty() || HasReadyStream();
}
int GetRemoteWindowSize() const override { return connection_send_window_; }
@@ -285,6 +285,13 @@
SEND_ERROR,
};
+ // Returns true if at least one stream has data or control frames to write.
+ bool HasReadyStream() const;
+
+ // Returns the next stream that has something to write. If there are no such
+ // streams, returns zero.
+ Http2StreamId GetNextReadyStream();
+
// Sends the buffered connection preface or serialized frame data, if any.
SendResult MaybeSendBufferedData();
@@ -391,7 +398,13 @@
WindowManager connection_window_manager_;
absl::flat_hash_set<Http2StreamId> streams_reset_;
+
+ // The number of frames currently queued per stream.
absl::flat_hash_map<Http2StreamId, int> queued_frames_;
+ // Includes streams that are currently ready to write trailers.
+ absl::flat_hash_set<Http2StreamId> trailers_ready_;
+ // Includes streams that are currently ready to write metadata.
+ absl::flat_hash_set<Http2StreamId> metadata_ready_;
MetadataSequence connection_metadata_;