Adds `Http2VisitorInterface` callbacks that report flow control exhaustion events.

* A "local" flow control exhaustion event indicates that this HTTP/2 endpoint can no longer send data because it has consumed all of the flow control quota granted by the peer.
* A "remote" flow control exhaustion event indicates that this endpoint believes the peer can no longer send data, based on the flow control quota granted and the `DATA` frames received.

Some users of the `oghttp2` library may be interested in detecting these events.

Protected by new observation hooks only; no change to protocol behavior.

PiperOrigin-RevId: 782034513
diff --git a/quiche/http2/adapter/http2_visitor_interface.h b/quiche/http2/adapter/http2_visitor_interface.h
index f00ac61..fffd59a 100644
--- a/quiche/http2/adapter/http2_visitor_interface.h
+++ b/quiche/http2/adapter/http2_visitor_interface.h
@@ -290,6 +290,17 @@
     return {-1, false};
   }
 
+  // Invoked when this endpoint is no longer able to send data to the peer
+  // because the stream or connection flow control send window has been
+  // exhausted. A `stream_id` of zero indicates the connection level flow
+  // control window.
+  virtual void OnLocalFlowControlExhausted(Http2StreamId /*stream_id*/) {}
+
+  // Invoked when the peer is no longer able to send data to this endpoint
+  // because the flow control receive window has been exhausted. A `stream_id`
+  // of zero indicates the connection level flow control window.
+  virtual void OnRemoteFlowControlExhausted(Http2StreamId /*stream_id*/) {}
+
   // Invoked with an error message from the application.
   virtual void OnErrorDebug(absl::string_view message) = 0;
 
diff --git a/quiche/http2/adapter/mock_http2_visitor.h b/quiche/http2/adapter/mock_http2_visitor.h
index 2e68cb2..78e8d7a 100644
--- a/quiche/http2/adapter/mock_http2_visitor.h
+++ b/quiche/http2/adapter/mock_http2_visitor.h
@@ -124,6 +124,12 @@
               (Http2StreamId stream_id, uint8_t* dest, size_t dest_len),
               (override));
 
+  MOCK_METHOD(void, OnLocalFlowControlExhausted, (Http2StreamId stream_id),
+              (override));
+
+  MOCK_METHOD(void, OnRemoteFlowControlExhausted, (Http2StreamId stream_id),
+              (override));
+
   MOCK_METHOD(void, OnErrorDebug, (absl::string_view message), (override));
 };
 
diff --git a/quiche/http2/adapter/oghttp2_adapter_metadata_test.cc b/quiche/http2/adapter/oghttp2_adapter_metadata_test.cc
index 3995315..9201f22 100644
--- a/quiche/http2/adapter/oghttp2_adapter_metadata_test.cc
+++ b/quiche/http2/adapter/oghttp2_adapter_metadata_test.cc
@@ -385,6 +385,8 @@
   // 4 DATA frames should saturate the default 64kB stream/connection flow
   // control window.
   EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0)).Times(4);
+  EXPECT_CALL(visitor, OnLocalFlowControlExhausted(0));
+  EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id1));
 
   int result = adapter->Send();
   EXPECT_EQ(0, result);
diff --git a/quiche/http2/adapter/oghttp2_adapter_test.cc b/quiche/http2/adapter/oghttp2_adapter_test.cc
index 765473c..b8189fe 100644
--- a/quiche/http2/adapter/oghttp2_adapter_test.cc
+++ b/quiche/http2/adapter/oghttp2_adapter_test.cc
@@ -3187,6 +3187,8 @@
   options.perspective = Perspective::kClient;
   auto adapter = OgHttp2Adapter::Create(visitor, options);
 
+  testing::InSequence s;
+
   const std::string initial_frames =
       TestFrameSequence()
           .Settings({{INITIAL_WINDOW_SIZE, 80000u}})
@@ -3238,6 +3240,7 @@
   // The client can send more than 4 frames (65536 bytes) of data.
   EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id, 16384, 0x0, 0)).Times(4);
   EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id, 14464, 0x0, 0));
+  EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id));
 
   result = adapter->Send();
   EXPECT_EQ(0, result);
@@ -3253,6 +3256,8 @@
   options.perspective = Perspective::kClient;
   auto adapter = OgHttp2Adapter::Create(visitor, options);
 
+  testing::InSequence s;
+
   const std::string initial_frames =
       TestFrameSequence().ServerPreface().WindowUpdate(0, 65536).Serialize();
   // Server preface (empty SETTINGS)
@@ -3295,6 +3300,7 @@
   // yet been increased.
   EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id, 16384, 0x0, 0)).Times(3);
   EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id, 16383, 0x0, 0));
+  EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id));
 
   result = adapter->Send();
   EXPECT_EQ(0, result);
@@ -3324,6 +3330,7 @@
   EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, ACK_FLAG, 0));
   // The client can write more after receiving the INITIAL_WINDOW_SIZE setting.
   EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id, 14465, 0x0, 0));
+  EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id));
 
   result = adapter->Send();
   EXPECT_EQ(0, result);
@@ -3964,6 +3971,8 @@
   // 4 DATA frames should saturate the default 64kB stream/connection flow
   // control window.
   EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0)).Times(4);
+  EXPECT_CALL(visitor, OnLocalFlowControlExhausted(0));
+  EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id1));
 
   int result = adapter->Send();
   EXPECT_EQ(0, result);
@@ -3993,8 +4002,12 @@
 
   EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id2, _, 0x0, 0))
       .Times(testing::AtLeast(1));
+  EXPECT_CALL(visitor, OnLocalFlowControlExhausted(stream_id2));
+
   EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0))
       .Times(testing::AtLeast(1));
+  // Connection level flow control is exhausted.
+  EXPECT_CALL(visitor, OnLocalFlowControlExhausted(0));
 
   EXPECT_TRUE(adapter->want_write());
   result = adapter->Send();
@@ -4041,9 +4054,12 @@
   EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id2, _, 0x4));
   EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id2, _, 0x4, 0));
   EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0)).Times(1);
-  // 4 DATA frames should saturate the default 64kB stream/connection flow
+  // 4 DATA frames should saturate the default 64kB connection flow
   // control window.
   EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id2, _, 0x0, 0)).Times(4);
+  // The connection flow control window has been exhausted, but both streams
+  // still have flow control window available.
+  EXPECT_CALL(visitor, OnLocalFlowControlExhausted(0));
 
   int result = adapter->Send();
   EXPECT_EQ(0, result);
@@ -5798,6 +5814,7 @@
   // This will send data but not trailers, because the data source hasn't
   // finished sending.
   EXPECT_CALL(visitor, OnFrameSent(DATA, 1, _, 0x0, 0));
+  EXPECT_CALL(visitor, OnLocalFlowControlExhausted(1));
   send_result = adapter->Send();
   EXPECT_EQ(0, send_result);
   EXPECT_THAT(visitor.data(), EqualsFrames({SpdyFrameType::DATA}));
@@ -6272,6 +6289,7 @@
   EXPECT_CALL(visitor, OnBeginDataForStream(1, 16384));
   EXPECT_CALL(visitor, OnDataForStream(1, _));
   EXPECT_CALL(visitor, OnFrameHeader(1, 16384, DATA, 0x0));
+  EXPECT_CALL(visitor, OnRemoteFlowControlExhausted(1)).Times(2);
   // No further frame data or headers are delivered.
 
   result = adapter->ProcessBytes(more_frames);
diff --git a/quiche/http2/adapter/oghttp2_session.cc b/quiche/http2/adapter/oghttp2_session.cc
index 9b1afb2..f757a2c 100644
--- a/quiche/http2/adapter/oghttp2_session.cc
+++ b/quiche/http2/adapter/oghttp2_session.cc
@@ -841,11 +841,13 @@
     }
     return SendResult::SEND_OK;
   }
+  bool wrote_data = false;
   int32_t available_window =
       std::min({connection_send_window_, state.send_window,
                 static_cast<int32_t>(max_frame_payload_)});
   while (connection_can_write == SendResult::SEND_OK && available_window > 0 &&
          IsReadyToWriteData(state)) {
+    wrote_data = true;
     DataFrameHeaderInfo info =
         GetDataFrameInfo(stream_id, available_window, state);
     QUICHE_VLOG(3) << "WriteForStream | length: " << info.payload_length
@@ -925,10 +927,19 @@
   }
   // If the stream still exists and has data to send, it should be marked as
   // ready in the write scheduler.
-  if (stream_map_.contains(stream_id) && !state.data_deferred &&
-      state.send_window > 0 && HasMoreData(state)) {
+  const bool stream_exists = stream_map_.contains(stream_id);
+  if (stream_exists && !state.data_deferred && state.send_window > 0 &&
+      HasMoreData(state)) {
     write_scheduler_.MarkStreamReady(stream_id, false);
   }
+  if (wrote_data) {
+    if (connection_send_window_ <= 0) {
+      visitor_.OnLocalFlowControlExhausted(0);
+    }
+    if (stream_exists && state.send_window <= 0) {
+      visitor_.OnLocalFlowControlExhausted(stream_id);
+    }
+  }
   // Streams can continue writing as long as the connection is not write-blocked
   // and there is additional flow control quota available.
   if (connection_can_write != SendResult::SEND_OK) {
@@ -1843,9 +1854,17 @@
 }
 
 void OgHttp2Session::MarkDataBuffered(Http2StreamId stream_id, size_t bytes) {
-  connection_window_manager_.MarkDataBuffered(bytes);
+  const bool peer_connection_window_available =
+      connection_window_manager_.MarkDataBuffered(bytes);
+  if (!peer_connection_window_available) {
+    visitor_.OnRemoteFlowControlExhausted(0);
+  }
   if (auto it = stream_map_.find(stream_id); it != stream_map_.end()) {
-    it->second.window_manager.MarkDataBuffered(bytes);
+    const bool peer_stream_window_available =
+        it->second.window_manager.MarkDataBuffered(bytes);
+    if (!peer_stream_window_available) {
+      visitor_.OnRemoteFlowControlExhausted(stream_id);
+    }
   }
 }
 
diff --git a/quiche/http2/adapter/window_manager.h b/quiche/http2/adapter/window_manager.h
index ffe1a7f..c843fa7 100644
--- a/quiche/http2/adapter/window_manager.h
+++ b/quiche/http2/adapter/window_manager.h
@@ -68,11 +68,11 @@
 
   void MaybeNotifyListener();
 
-  // The upper bound on the flow control window. The GFE attempts to maintain a
-  // window of this size at the peer as data is proxied through.
+  // The upper bound on the flow control window. This endpoint attempts to
+  // maintain a window of this size at the peer as data is proxied through.
   int64_t limit_;
 
-  // The current flow control window that has not been advertised to the peer
+  // The current flow control window that has been advertised to the peer
   // and not yet consumed. The peer can send this many bytes before becoming
   // blocked.
   int64_t window_;