Adds `Http2VisitorInterface` callbacks that report flow control exhaustion events.
* A "local" flow control exhaustion event indicates that this HTTP/2 endpoint can no longer send data because it has consumed all of the flow control quota granted by the peer.
* A "remote" flow control exhaustion event indicates that this endpoint believes the peer can no longer send data, based on the flow control quota granted and the `DATA` frames received.
Some users of the `oghttp2` library may be interested in detecting these events.
Protected by new observation hooks only; no change to protocol behavior.
PiperOrigin-RevId: 782034513
diff --git a/quiche/http2/adapter/http2_visitor_interface.h b/quiche/http2/adapter/http2_visitor_interface.h
index f00ac61..fffd59a 100644
--- a/quiche/http2/adapter/http2_visitor_interface.h
+++ b/quiche/http2/adapter/http2_visitor_interface.h
@@ -290,6 +290,17 @@
return {-1, false};
}
+ // Invoked when this endpoint is no longer able to send data to the peer
+ // because the stream or connection flow control send window has been
+ // exhausted. A `stream_id` of zero indicates the connection level flow
+ // control window.
+ virtual void OnLocalFlowControlExhausted(Http2StreamId /*stream_id*/) {}
+
+ // Invoked when the peer is no longer able to send data to this endpoint
+ // because the flow control receive window has been exhausted. A `stream_id`
+ // of zero indicates the connection level flow control window.
+ virtual void OnRemoteFlowControlExhausted(Http2StreamId /*stream_id*/) {}
+
// Invoked with an error message from the application.
virtual void OnErrorDebug(absl::string_view message) = 0;
diff --git a/quiche/http2/adapter/mock_http2_visitor.h b/quiche/http2/adapter/mock_http2_visitor.h
index 2e68cb2..78e8d7a 100644
--- a/quiche/http2/adapter/mock_http2_visitor.h
+++ b/quiche/http2/adapter/mock_http2_visitor.h
@@ -124,6 +124,12 @@
(Http2StreamId stream_id, uint8_t* dest, size_t dest_len),
(override));
+ MOCK_METHOD(void, OnLocalFlowControlExhausted, (Http2StreamId stream_id),
+ (override));
+
+ MOCK_METHOD(void, OnRemoteFlowControlExhausted, (Http2StreamId stream_id),
+ (override));
+
MOCK_METHOD(void, OnErrorDebug, (absl::string_view message), (override));
};
diff --git a/quiche/http2/adapter/oghttp2_adapter_metadata_test.cc b/quiche/http2/adapter/oghttp2_adapter_metadata_test.cc
index 3995315..9201f22 100644
--- a/quiche/http2/adapter/oghttp2_adapter_metadata_test.cc
+++ b/quiche/http2/adapter/oghttp2_adapter_metadata_test.cc
@@ -385,6 +385,8 @@
// 4 DATA frames should saturate the default 64kB stream/connection flow
// control window.
EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0)).Times(4);
+ EXPECT_CALL(visitor, OnLocalFlowControlExhausted(0));
+ EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id1));
int result = adapter->Send();
EXPECT_EQ(0, result);
diff --git a/quiche/http2/adapter/oghttp2_adapter_test.cc b/quiche/http2/adapter/oghttp2_adapter_test.cc
index 765473c..b8189fe 100644
--- a/quiche/http2/adapter/oghttp2_adapter_test.cc
+++ b/quiche/http2/adapter/oghttp2_adapter_test.cc
@@ -3187,6 +3187,8 @@
options.perspective = Perspective::kClient;
auto adapter = OgHttp2Adapter::Create(visitor, options);
+ testing::InSequence s;
+
const std::string initial_frames =
TestFrameSequence()
.Settings({{INITIAL_WINDOW_SIZE, 80000u}})
@@ -3238,6 +3240,7 @@
// The client can send more than 4 frames (65536 bytes) of data.
EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id, 16384, 0x0, 0)).Times(4);
EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id, 14464, 0x0, 0));
+ EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id));
result = adapter->Send();
EXPECT_EQ(0, result);
@@ -3253,6 +3256,8 @@
options.perspective = Perspective::kClient;
auto adapter = OgHttp2Adapter::Create(visitor, options);
+ testing::InSequence s;
+
const std::string initial_frames =
TestFrameSequence().ServerPreface().WindowUpdate(0, 65536).Serialize();
// Server preface (empty SETTINGS)
@@ -3295,6 +3300,7 @@
// yet been increased.
EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id, 16384, 0x0, 0)).Times(3);
EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id, 16383, 0x0, 0));
+ EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id));
result = adapter->Send();
EXPECT_EQ(0, result);
@@ -3324,6 +3330,7 @@
EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, ACK_FLAG, 0));
// The client can write more after receiving the INITIAL_WINDOW_SIZE setting.
EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id, 14465, 0x0, 0));
+ EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id));
result = adapter->Send();
EXPECT_EQ(0, result);
@@ -3964,6 +3971,8 @@
// 4 DATA frames should saturate the default 64kB stream/connection flow
// control window.
EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0)).Times(4);
+ EXPECT_CALL(visitor, OnLocalFlowControlExhausted(0));
+ EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id1));
int result = adapter->Send();
EXPECT_EQ(0, result);
@@ -3993,8 +4002,12 @@
EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id2, _, 0x0, 0))
.Times(testing::AtLeast(1));
+ EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id2));
+
EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0))
.Times(testing::AtLeast(1));
+ // Connection level flow control is exhausted.
+ EXPECT_CALL(visitor, OnLocalFlowControlExhausted(0));
EXPECT_TRUE(adapter->want_write());
result = adapter->Send();
@@ -4041,9 +4054,12 @@
EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id2, _, 0x4));
EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id2, _, 0x4, 0));
EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0)).Times(1);
- // 4 DATA frames should saturate the default 64kB stream/connection flow
+ // 4 DATA frames should saturate the default 64kB connection flow
// control window.
EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id2, _, 0x0, 0)).Times(4);
+ // The connection flow control window has been exhausted, but both streams
+ // still have flow control window available.
+ EXPECT_CALL(visitor, OnLocalFlowControlExhausted(0));
int result = adapter->Send();
EXPECT_EQ(0, result);
@@ -5798,6 +5814,7 @@
// This will send data but not trailers, because the data source hasn't
// finished sending.
EXPECT_CALL(visitor, OnFrameSent(DATA, 1, _, 0x0, 0));
+ EXPECT_CALL(visitor, OnLocalFlowControlExhausted(1));
send_result = adapter->Send();
EXPECT_EQ(0, send_result);
EXPECT_THAT(visitor.data(), EqualsFrames({SpdyFrameType::DATA}));
@@ -6272,6 +6289,7 @@
EXPECT_CALL(visitor, OnBeginDataForStream(1, 16384));
EXPECT_CALL(visitor, OnDataForStream(1, _));
EXPECT_CALL(visitor, OnFrameHeader(1, 16384, DATA, 0x0));
+ EXPECT_CALL(visitor, OnRemoteFlowControlExhausted(1)).Times(2);
// No further frame data or headers are delivered.
result = adapter->ProcessBytes(more_frames);
diff --git a/quiche/http2/adapter/oghttp2_session.cc b/quiche/http2/adapter/oghttp2_session.cc
index 9b1afb2..f757a2c 100644
--- a/quiche/http2/adapter/oghttp2_session.cc
+++ b/quiche/http2/adapter/oghttp2_session.cc
@@ -841,11 +841,13 @@
}
return SendResult::SEND_OK;
}
+ bool wrote_data = false;
int32_t available_window =
std::min({connection_send_window_, state.send_window,
static_cast<int32_t>(max_frame_payload_)});
while (connection_can_write == SendResult::SEND_OK && available_window > 0 &&
IsReadyToWriteData(state)) {
+ wrote_data = true;
DataFrameHeaderInfo info =
GetDataFrameInfo(stream_id, available_window, state);
QUICHE_VLOG(3) << "WriteForStream | length: " << info.payload_length
@@ -925,10 +927,19 @@
}
// If the stream still exists and has data to send, it should be marked as
// ready in the write scheduler.
- if (stream_map_.contains(stream_id) && !state.data_deferred &&
- state.send_window > 0 && HasMoreData(state)) {
+ const bool stream_exists = stream_map_.contains(stream_id);
+ if (stream_exists && !state.data_deferred && state.send_window > 0 &&
+ HasMoreData(state)) {
write_scheduler_.MarkStreamReady(stream_id, false);
}
+ if (wrote_data) {
+ if (connection_send_window_ <= 0) {
+ visitor_.OnLocalFlowControlExhausted(0);
+ }
+ if (stream_exists && state.send_window <= 0) {
+ visitor_.OnLocalFlowControlExhausted(stream_id);
+ }
+ }
// Streams can continue writing as long as the connection is not write-blocked
// and there is additional flow control quota available.
if (connection_can_write != SendResult::SEND_OK) {
@@ -1843,9 +1854,17 @@
}
void OgHttp2Session::MarkDataBuffered(Http2StreamId stream_id, size_t bytes) {
- connection_window_manager_.MarkDataBuffered(bytes);
+ const bool peer_connection_window_available =
+ connection_window_manager_.MarkDataBuffered(bytes);
+ if (!peer_connection_window_available) {
+ visitor_.OnRemoteFlowControlExhausted(0);
+ }
if (auto it = stream_map_.find(stream_id); it != stream_map_.end()) {
- it->second.window_manager.MarkDataBuffered(bytes);
+ const bool peer_stream_window_available =
+ it->second.window_manager.MarkDataBuffered(bytes);
+ if (!peer_stream_window_available) {
+ visitor_.OnRemoteFlowControlExhausted(stream_id);
+ }
}
}
diff --git a/quiche/http2/adapter/window_manager.h b/quiche/http2/adapter/window_manager.h
index ffe1a7f..c843fa7 100644
--- a/quiche/http2/adapter/window_manager.h
+++ b/quiche/http2/adapter/window_manager.h
@@ -68,11 +68,11 @@
void MaybeNotifyListener();
- // The upper bound on the flow control window. The GFE attempts to maintain a
- // window of this size at the peer as data is proxied through.
+ // The upper bound on the flow control window. This endpoint attempts to
+ // maintain a window of this size at the peer as data is proxied through.
int64_t limit_;
- // The current flow control window that has not been advertised to the peer
+ // The current flow control window that has been advertised to the peer
// and not yet consumed. The peer can send this many bytes before becoming
// blocked.
int64_t window_;