Adds `Http2VisitorInterface` callbacks that report flow control exhaustion events. * A "local" flow control exhaustion event indicates that this HTTP/2 endpoint can no longer send data because it has consumed all of the flow control quota granted by the peer. * A "remote" flow control exhaustion event indicates that this endpoint believes the peer can no longer send data, based on the flow control quota granted and the `DATA` frames received. Some users of the `oghttp2` library may be interested in detecting these events. Protected by new observation hooks only; no change to protocol behavior. PiperOrigin-RevId: 782034513
diff --git a/quiche/http2/adapter/http2_visitor_interface.h b/quiche/http2/adapter/http2_visitor_interface.h index f00ac61..fffd59a 100644 --- a/quiche/http2/adapter/http2_visitor_interface.h +++ b/quiche/http2/adapter/http2_visitor_interface.h
@@ -290,6 +290,17 @@ return {-1, false}; } + // Invoked when this endpoint is no longer able to send data to the peer + // because the stream or connection flow control send window has been + // exhausted. A `stream_id` of zero indicates the connection level flow + // control window. + virtual void OnLocalFlowControlExhausted(Http2StreamId /*stream_id*/) {} + + // Invoked when the peer is no longer able to send data to this endpoint + // because the flow control receive window has been exhausted. A `stream_id` + // of zero indicates the connection level flow control window. + virtual void OnRemoteFlowControlExhausted(Http2StreamId /*stream_id*/) {} + // Invoked with an error message from the application. virtual void OnErrorDebug(absl::string_view message) = 0;
diff --git a/quiche/http2/adapter/mock_http2_visitor.h b/quiche/http2/adapter/mock_http2_visitor.h index 2e68cb2..78e8d7a 100644 --- a/quiche/http2/adapter/mock_http2_visitor.h +++ b/quiche/http2/adapter/mock_http2_visitor.h
@@ -124,6 +124,12 @@ (Http2StreamId stream_id, uint8_t* dest, size_t dest_len), (override)); + MOCK_METHOD(void, OnLocalFlowControlExhausted, (Http2StreamId stream_id), + (override)); + + MOCK_METHOD(void, OnRemoteFlowControlExhausted, (Http2StreamId stream_id), + (override)); + MOCK_METHOD(void, OnErrorDebug, (absl::string_view message), (override)); };
diff --git a/quiche/http2/adapter/oghttp2_adapter_metadata_test.cc b/quiche/http2/adapter/oghttp2_adapter_metadata_test.cc index 3995315..9201f22 100644 --- a/quiche/http2/adapter/oghttp2_adapter_metadata_test.cc +++ b/quiche/http2/adapter/oghttp2_adapter_metadata_test.cc
@@ -385,6 +385,8 @@ // 4 DATA frames should saturate the default 64kB stream/connection flow // control window. EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0)).Times(4); + EXPECT_CALL(visitor, OnLocalFlowControlExhausted(0)); + EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id1)); int result = adapter->Send(); EXPECT_EQ(0, result);
diff --git a/quiche/http2/adapter/oghttp2_adapter_test.cc b/quiche/http2/adapter/oghttp2_adapter_test.cc index 765473c..b8189fe 100644 --- a/quiche/http2/adapter/oghttp2_adapter_test.cc +++ b/quiche/http2/adapter/oghttp2_adapter_test.cc
@@ -3187,6 +3187,8 @@ options.perspective = Perspective::kClient; auto adapter = OgHttp2Adapter::Create(visitor, options); + testing::InSequence s; + const std::string initial_frames = TestFrameSequence() .Settings({{INITIAL_WINDOW_SIZE, 80000u}}) @@ -3238,6 +3240,7 @@ // The client can send more than 4 frames (65536 bytes) of data. EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id, 16384, 0x0, 0)).Times(4); EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id, 14464, 0x0, 0)); + EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id)); result = adapter->Send(); EXPECT_EQ(0, result); @@ -3253,6 +3256,8 @@ options.perspective = Perspective::kClient; auto adapter = OgHttp2Adapter::Create(visitor, options); + testing::InSequence s; + const std::string initial_frames = TestFrameSequence().ServerPreface().WindowUpdate(0, 65536).Serialize(); // Server preface (empty SETTINGS) @@ -3295,6 +3300,7 @@ // yet been increased. EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id, 16384, 0x0, 0)).Times(3); EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id, 16383, 0x0, 0)); + EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id)); result = adapter->Send(); EXPECT_EQ(0, result); @@ -3324,6 +3330,7 @@ EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, ACK_FLAG, 0)); // The client can write more after receiving the INITIAL_WINDOW_SIZE setting. EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id, 14465, 0x0, 0)); + EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id)); result = adapter->Send(); EXPECT_EQ(0, result); @@ -3964,6 +3971,8 @@ // 4 DATA frames should saturate the default 64kB stream/connection flow // control window. EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0)).Times(4); + EXPECT_CALL(visitor, OnLocalFlowControlExhausted(0)); + EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id1)); int result = adapter->Send(); EXPECT_EQ(0, result); @@ -3993,8 +4002,12 @@ EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id2, _, 0x0, 0)) .Times(testing::AtLeast(1)); + EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id2)); + EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0)) .Times(testing::AtLeast(1)); + // Connection level flow control is exhausted. + EXPECT_CALL(visitor, OnLocalFlowControlExhausted(0)); EXPECT_TRUE(adapter->want_write()); result = adapter->Send(); @@ -4041,9 +4054,12 @@ EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id2, _, 0x4)); EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id2, _, 0x4, 0)); EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0)).Times(1); - // 4 DATA frames should saturate the default 64kB stream/connection flow + // 4 DATA frames should saturate the default 64kB connection flow // control window. EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id2, _, 0x0, 0)).Times(4); + // The connection flow control window has been exhausted, but both streams + // still have flow control window available. + EXPECT_CALL(visitor, OnLocalFlowControlExhausted(0)); int result = adapter->Send(); EXPECT_EQ(0, result); @@ -5798,6 +5814,7 @@ // This will send data but not trailers, because the data source hasn't // finished sending. EXPECT_CALL(visitor, OnFrameSent(DATA, 1, _, 0x0, 0)); + EXPECT_CALL(visitor, OnLocalFlowControlExhausted(1)); send_result = adapter->Send(); EXPECT_EQ(0, send_result); EXPECT_THAT(visitor.data(), EqualsFrames({SpdyFrameType::DATA})); @@ -6272,6 +6289,7 @@ EXPECT_CALL(visitor, OnBeginDataForStream(1, 16384)); EXPECT_CALL(visitor, OnDataForStream(1, _)); EXPECT_CALL(visitor, OnFrameHeader(1, 16384, DATA, 0x0)); + EXPECT_CALL(visitor, OnRemoteFlowControlExhausted(1)).Times(2); // No further frame data or headers are delivered. result = adapter->ProcessBytes(more_frames);
diff --git a/quiche/http2/adapter/oghttp2_session.cc b/quiche/http2/adapter/oghttp2_session.cc index 9b1afb2..f757a2c 100644 --- a/quiche/http2/adapter/oghttp2_session.cc +++ b/quiche/http2/adapter/oghttp2_session.cc
@@ -841,11 +841,13 @@ } return SendResult::SEND_OK; } + bool wrote_data = false; int32_t available_window = std::min({connection_send_window_, state.send_window, static_cast<int32_t>(max_frame_payload_)}); while (connection_can_write == SendResult::SEND_OK && available_window > 0 && IsReadyToWriteData(state)) { + wrote_data = true; DataFrameHeaderInfo info = GetDataFrameInfo(stream_id, available_window, state); QUICHE_VLOG(3) << "WriteForStream | length: " << info.payload_length @@ -925,10 +927,19 @@ } // If the stream still exists and has data to send, it should be marked as // ready in the write scheduler. - if (stream_map_.contains(stream_id) && !state.data_deferred && - state.send_window > 0 && HasMoreData(state)) { + const bool stream_exists = stream_map_.contains(stream_id); + if (stream_exists && !state.data_deferred && state.send_window > 0 && + HasMoreData(state)) { write_scheduler_.MarkStreamReady(stream_id, false); } + if (wrote_data) { + if (connection_send_window_ <= 0) { + visitor_.OnLocalFlowControlExhausted(0); + } + if (stream_exists && state.send_window <= 0) { + visitor_.OnLocalFlowControlExhausted(stream_id); + } + } // Streams can continue writing as long as the connection is not write-blocked // and there is additional flow control quota available. if (connection_can_write != SendResult::SEND_OK) { @@ -1843,9 +1854,17 @@ } void OgHttp2Session::MarkDataBuffered(Http2StreamId stream_id, size_t bytes) { - connection_window_manager_.MarkDataBuffered(bytes); + const bool peer_connection_window_available = + connection_window_manager_.MarkDataBuffered(bytes); + if (!peer_connection_window_available) { + visitor_.OnRemoteFlowControlExhausted(0); + } if (auto it = stream_map_.find(stream_id); it != stream_map_.end()) { - it->second.window_manager.MarkDataBuffered(bytes); + const bool peer_stream_window_available = + it->second.window_manager.MarkDataBuffered(bytes); + if (!peer_stream_window_available) { + visitor_.OnRemoteFlowControlExhausted(stream_id); + } } }
diff --git a/quiche/http2/adapter/window_manager.h b/quiche/http2/adapter/window_manager.h index ffe1a7f..c843fa7 100644 --- a/quiche/http2/adapter/window_manager.h +++ b/quiche/http2/adapter/window_manager.h
@@ -68,11 +68,11 @@ void MaybeNotifyListener(); - // The upper bound on the flow control window. The GFE attempts to maintain a - // window of this size at the peer as data is proxied through. + // The upper bound on the flow control window. This endpoint attempts to + // maintain a window of this size at the peer as data is proxied through. int64_t limit_; - // The current flow control window that has not been advertised to the peer + // The current flow control window that has been advertised to the peer // and not yet consumed. The peer can send this many bytes before becoming // blocked. int64_t window_;