Adds methods to retrieve the receive window size for the connection or a particular stream.

Also renames GetPeerConnectionWindow() to GetSendWindowSize(), and implements proper receive-side flow control accounting in OgHttp2Session.

PiperOrigin-RevId: 378752864
diff --git a/http2/adapter/http2_adapter.h b/http2/adapter/http2_adapter.h
index 2dd7d3f..e53918c 100644
--- a/http2/adapter/http2_adapter.h
+++ b/http2/adapter/http2_adapter.h
@@ -69,8 +69,16 @@
   // Invokes the visitor's OnReadyToSend() method for serialized frame data.
   virtual void Send() = 0;
 
-  // Returns the connection-level flow control window for the peer.
-  virtual int GetPeerConnectionWindow() const = 0;
+  // Returns the connection-level flow control window advertised by the peer.
+  virtual int GetSendWindowSize() const = 0;
+
+  // Returns the amount of data a peer could send on a given stream. This is
+  // the outstanding stream receive window.
+  virtual int GetStreamReceiveWindowSize(Http2StreamId stream_id) const = 0;
+
+  // Returns the total amount of data a peer could send on the connection. This
+  // is the outstanding connection receive window.
+  virtual int GetReceiveWindowSize() const = 0;
 
   // Gets the highest stream ID value seen in a frame received by this endpoint.
   // This method is only guaranteed to work for server endpoints.
diff --git a/http2/adapter/nghttp2_adapter.cc b/http2/adapter/nghttp2_adapter.cc
index 6ec2a14..d4e3365 100644
--- a/http2/adapter/nghttp2_adapter.cc
+++ b/http2/adapter/nghttp2_adapter.cc
@@ -102,10 +102,19 @@
   }
 }
 
-int NgHttp2Adapter::GetPeerConnectionWindow() const {
+int NgHttp2Adapter::GetSendWindowSize() const {
   return session_->GetRemoteWindowSize();
 }
 
+int NgHttp2Adapter::GetStreamReceiveWindowSize(Http2StreamId stream_id) const {
+  return nghttp2_session_get_stream_local_window_size(session_->raw_ptr(),
+                                                      stream_id);
+}
+
+int NgHttp2Adapter::GetReceiveWindowSize() const {
+  return nghttp2_session_get_local_window_size(session_->raw_ptr());
+}
+
 Http2StreamId NgHttp2Adapter::GetHighestReceivedStreamId() const {
   return nghttp2_session_get_last_proc_stream_id(session_->raw_ptr());
 }
diff --git a/http2/adapter/nghttp2_adapter.h b/http2/adapter/nghttp2_adapter.h
index a3f32df..2fdf3ee 100644
--- a/http2/adapter/nghttp2_adapter.h
+++ b/http2/adapter/nghttp2_adapter.h
@@ -49,7 +49,9 @@
 
   void Send() override;
 
-  int GetPeerConnectionWindow() const override;
+  int GetSendWindowSize() const override;
+  int GetStreamReceiveWindowSize(Http2StreamId stream_id) const override;
+  int GetReceiveWindowSize() const override;
 
   Http2StreamId GetHighestReceivedStreamId() const override;
 
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index 38f2301..df5d186 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -84,8 +84,7 @@
   const ssize_t initial_result = adapter->ProcessBytes(initial_frames);
   EXPECT_EQ(initial_frames.size(), initial_result);
 
-  EXPECT_EQ(adapter->GetPeerConnectionWindow(),
-            kInitialFlowControlWindowSize + 1000);
+  EXPECT_EQ(adapter->GetSendWindowSize(), kInitialFlowControlWindowSize + 1000);
   // Some bytes should have been serialized.
   adapter->Send();
   EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS,
@@ -143,6 +142,18 @@
                                             spdy::SpdyFrameType::HEADERS}));
   visitor.Clear();
 
+  // All streams are active and have not yet received any data, so the receive
+  // window should be at the initial value.
+  EXPECT_EQ(kInitialFlowControlWindowSize,
+            adapter->GetStreamReceiveWindowSize(stream_id1));
+  EXPECT_EQ(kInitialFlowControlWindowSize,
+            adapter->GetStreamReceiveWindowSize(stream_id2));
+  EXPECT_EQ(kInitialFlowControlWindowSize,
+            adapter->GetStreamReceiveWindowSize(stream_id3));
+
+  // Connection has not yet received any data.
+  EXPECT_EQ(kInitialFlowControlWindowSize, adapter->GetReceiveWindowSize());
+
   EXPECT_EQ(0, adapter->GetHighestReceivedStreamId());
 
   EXPECT_EQ(kSentinel1, adapter->GetStreamUserData(stream_id1));
@@ -180,6 +191,19 @@
   const ssize_t stream_result = adapter->ProcessBytes(stream_frames);
   EXPECT_EQ(stream_frames.size(), stream_result);
 
+  // First stream has received some data.
+  EXPECT_GT(kInitialFlowControlWindowSize,
+            adapter->GetStreamReceiveWindowSize(stream_id1));
+  // Second stream was closed.
+  EXPECT_EQ(-1, adapter->GetStreamReceiveWindowSize(stream_id2));
+  // Third stream has not received any data.
+  EXPECT_EQ(kInitialFlowControlWindowSize,
+            adapter->GetStreamReceiveWindowSize(stream_id3));
+
+  // Connection window should be the same as the first stream.
+  EXPECT_EQ(adapter->GetReceiveWindowSize(),
+            adapter->GetStreamReceiveWindowSize(stream_id1));
+
   // Should be 3, but this method only works for server adapters.
   EXPECT_EQ(0, adapter->GetHighestReceivedStreamId());
 
@@ -250,6 +274,11 @@
   EXPECT_GT(stream_id, 0);
   EXPECT_TRUE(adapter->session().want_write());
   adapter->Send();
+
+  EXPECT_EQ(kInitialFlowControlWindowSize,
+            adapter->GetStreamReceiveWindowSize(stream_id));
+  EXPECT_EQ(kInitialFlowControlWindowSize, adapter->GetReceiveWindowSize());
+
   EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::HEADERS,
                                             spdy::SpdyFrameType::DATA}));
   EXPECT_THAT(visitor.data(), testing::HasSubstr(kBody));
@@ -489,6 +518,11 @@
 
   EXPECT_EQ(kSentinel1, adapter->GetStreamUserData(1));
 
+  EXPECT_GT(kInitialFlowControlWindowSize,
+            adapter->GetStreamReceiveWindowSize(1));
+  EXPECT_EQ(adapter->GetStreamReceiveWindowSize(1),
+            adapter->GetReceiveWindowSize());
+
   // Because stream 3 has already been closed, it's not possible to set user
   // data.
   const char* kSentinel3 = "another arbitrary pointer";
@@ -497,8 +531,7 @@
 
   EXPECT_EQ(3, adapter->GetHighestReceivedStreamId());
 
-  EXPECT_EQ(adapter->GetPeerConnectionWindow(),
-            kInitialFlowControlWindowSize + 1000);
+  EXPECT_EQ(adapter->GetSendWindowSize(), kInitialFlowControlWindowSize + 1000);
 
   EXPECT_TRUE(adapter->session().want_write());
   // Some bytes should have been serialized.
diff --git a/http2/adapter/oghttp2_adapter.cc b/http2/adapter/oghttp2_adapter.cc
index 9f57aec..11aefb2 100644
--- a/http2/adapter/oghttp2_adapter.cc
+++ b/http2/adapter/oghttp2_adapter.cc
@@ -83,10 +83,18 @@
   session_->Send();
 }
 
-int OgHttp2Adapter::GetPeerConnectionWindow() const {
+int OgHttp2Adapter::GetSendWindowSize() const {
   return session_->GetRemoteWindowSize();
 }
 
+int OgHttp2Adapter::GetStreamReceiveWindowSize(Http2StreamId stream_id) const {
+  return session_->GetStreamReceiveWindowSize(stream_id);
+}
+
+int OgHttp2Adapter::GetReceiveWindowSize() const {
+  return session_->GetReceiveWindowSize();
+}
+
 Http2StreamId OgHttp2Adapter::GetHighestReceivedStreamId() const {
   return session_->GetHighestReceivedStreamId();
 }
diff --git a/http2/adapter/oghttp2_adapter.h b/http2/adapter/oghttp2_adapter.h
index bbdd2a4..477a6e0 100644
--- a/http2/adapter/oghttp2_adapter.h
+++ b/http2/adapter/oghttp2_adapter.h
@@ -35,7 +35,9 @@
                           int window_increment) override;
   void SubmitMetadata(Http2StreamId stream_id, bool fin) override;
   void Send() override;
-  int GetPeerConnectionWindow() const override;
+  int GetSendWindowSize() const override;
+  int GetStreamReceiveWindowSize(Http2StreamId stream_id) const override;
+  int GetReceiveWindowSize() const override;
   Http2StreamId GetHighestReceivedStreamId() const override;
   void MarkDataConsumedForStream(Http2StreamId stream_id,
                                  size_t num_bytes) override;
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index bee8640..1675bda 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -43,9 +43,9 @@
   EXPECT_QUICHE_BUG(adapter_->SubmitMetadata(3, true), "Not implemented");
 }
 
-TEST_F(OgHttp2AdapterTest, GetPeerConnectionWindow) {
-  const int peer_window = adapter_->GetPeerConnectionWindow();
-  EXPECT_GT(peer_window, 0);
+TEST_F(OgHttp2AdapterTest, GetSendWindowSize) {
+  const int peer_window = adapter_->GetSendWindowSize();
+  EXPECT_EQ(peer_window, kInitialFlowControlWindowSize);
 }
 
 TEST_F(OgHttp2AdapterTest, MarkDataConsumedForStream) {
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 5927914..853c374 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -23,7 +23,14 @@
 }
 
 OgHttp2Session::OgHttp2Session(Http2VisitorInterface& visitor, Options options)
-    : visitor_(visitor), headers_handler_(visitor), options_(options) {
+    : visitor_(visitor),
+      headers_handler_(visitor),
+      connection_window_manager_(kInitialFlowControlWindowSize,
+                                 [this](size_t window_update_delta) {
+                                   SendWindowUpdate(kConnectionStreamId,
+                                                    window_update_delta);
+                                 }),
+      options_(options) {
   decoder_.set_visitor(this);
   if (options_.perspective == Perspective::kServer) {
     remaining_preface_ = {spdy::kHttp2ConnectionHeaderPrefix,
@@ -59,6 +66,18 @@
   return true;
 }
 
+int OgHttp2Session::GetStreamReceiveWindowSize(Http2StreamId stream_id) const {
+  auto it = stream_map_.find(stream_id);
+  if (it != stream_map_.end()) {
+    return it->second.window_manager.CurrentWindowSize();
+  }
+  return -1;
+}
+
+int OgHttp2Session::GetReceiveWindowSize() const {
+  return connection_window_manager_.CurrentWindowSize();
+}
+
 ssize_t OgHttp2Session::ProcessBytes(absl::string_view bytes) {
   ssize_t preface_consumed = 0;
   if (!remaining_preface_.empty()) {
@@ -95,6 +114,7 @@
   } else {
     it->second.window_manager.MarkDataFlushed(num_bytes);
   }
+  connection_window_manager_.MarkDataFlushed(num_bytes);
   return 0;  // Remove?
 }
 
@@ -369,6 +389,7 @@
 void OgHttp2Session::OnStreamFrameData(spdy::SpdyStreamId stream_id,
                                        const char* data,
                                        size_t len) {
+  MarkDataBuffered(stream_id, len);
   visitor_.OnDataForStream(stream_id, absl::string_view(data, len));
 }
 
@@ -380,13 +401,15 @@
   visitor_.OnEndStream(stream_id);
 }
 
-void OgHttp2Session::OnStreamPadLength(spdy::SpdyStreamId /*stream_id*/,
-                                       size_t /*value*/) {
-  // TODO(181586191): handle padding
+void OgHttp2Session::OnStreamPadLength(spdy::SpdyStreamId stream_id,
+                                       size_t value) {
+  MarkDataBuffered(stream_id, 1 + value);
+  // TODO(181586191): Pass padding to the visitor?
 }
 
 void OgHttp2Session::OnStreamPadding(spdy::SpdyStreamId stream_id, size_t len) {
-  // TODO(181586191): handle padding
+  // Flow control was accounted for in OnStreamPadLength().
+  // TODO(181586191): Pass padding to the visitor?
 }
 
 spdy::SpdyHeadersHandlerInterface* OgHttp2Session::OnHeaderFrameStart(
@@ -559,5 +582,12 @@
   }
 }
 
+void OgHttp2Session::MarkDataBuffered(Http2StreamId stream_id, size_t bytes) {
+  connection_window_manager_.MarkDataBuffered(bytes);
+  if (auto it = stream_map_.find(stream_id); it != stream_map_.end()) {
+    it->second.window_manager.MarkDataBuffered(bytes);
+  }
+}
+
 }  // namespace adapter
 }  // namespace http2
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index e2ab3fd..9634400 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -57,6 +57,13 @@
   // Resumes a stream that was previously blocked. Returns true on success.
   bool ResumeStream(Http2StreamId stream_id);
 
+  // Returns the outstanding stream receive window, or -1 if the stream does not
+  // exist.
+  int GetStreamReceiveWindowSize(Http2StreamId stream_id) const;
+
+  // Returns the outstanding connection receive window.
+  int GetReceiveWindowSize() const;
+
   // From Http2Session.
   ssize_t ProcessBytes(absl::string_view bytes) override;
   int Consume(Http2StreamId stream_id, size_t num_bytes) override;
@@ -171,6 +178,9 @@
   // Section 8.1.
   void MaybeCloseWithRstStream(Http2StreamId stream_id, StreamState& state);
 
+  // Performs flow control accounting for data sent by the peer.
+  void MarkDataBuffered(Http2StreamId stream_id, size_t bytes);
+
   // Receives events when inbound frames are parsed.
   Http2VisitorInterface& visitor_;
 
@@ -199,6 +209,8 @@
   // session.
   absl::string_view remaining_preface_;
 
+  WindowManager connection_window_manager_;
+
   Http2StreamId next_stream_id_ = 1;
   Http2StreamId highest_received_stream_id_ = 0;
   int connection_send_window_ = kInitialFlowControlWindowSize;
diff --git a/http2/adapter/oghttp2_session_test.cc b/http2/adapter/oghttp2_session_test.cc
index 3869db1..cdaacdc 100644
--- a/http2/adapter/oghttp2_session_test.cc
+++ b/http2/adapter/oghttp2_session_test.cc
@@ -67,30 +67,46 @@
             kInitialFlowControlWindowSize + 1000);
   EXPECT_EQ(0, session.GetHighestReceivedStreamId());
 
+  // Connection has not yet received any data.
+  EXPECT_EQ(kInitialFlowControlWindowSize, session.GetReceiveWindowSize());
+
   // Should OgHttp2Session require that streams 1 and 3 have been created?
 
+  // Submit a request to ensure the first stream is created.
+  const char* kSentinel1 = "arbitrary pointer 1";
+  TestDataFrameSource body1(visitor, "This is an example request body.");
+  int stream_id =
+      session.SubmitRequest(ToHeaders({{":method", "POST"},
+                                       {":scheme", "http"},
+                                       {":authority", "example.com"},
+                                       {":path", "/this/is/request/one"}}),
+                            &body1, const_cast<char*>(kSentinel1));
+  EXPECT_EQ(stream_id, 1);
+
   const std::string stream_frames =
       TestFrameSequence()
-          .Headers(1,
+          .Headers(stream_id,
                    {{":status", "200"},
                     {"server", "my-fake-server"},
                     {"date", "Tue, 6 Apr 2021 12:54:01 GMT"}},
                    /*fin=*/false)
-          .Data(1, "This is the response body.")
+          .Data(stream_id, "This is the response body.")
           .RstStream(3, Http2ErrorCode::INTERNAL_ERROR)
           .GoAway(5, Http2ErrorCode::ENHANCE_YOUR_CALM, "calm down!!")
           .Serialize();
 
-  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 4));
-  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
-  EXPECT_CALL(visitor, OnHeaderForStream(1, ":status", "200"));
-  EXPECT_CALL(visitor, OnHeaderForStream(1, "server", "my-fake-server"));
+  EXPECT_CALL(visitor, OnFrameHeader(stream_id, _, HEADERS, 4));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(stream_id));
+  EXPECT_CALL(visitor, OnHeaderForStream(stream_id, ":status", "200"));
   EXPECT_CALL(visitor,
-              OnHeaderForStream(1, "date", "Tue, 6 Apr 2021 12:54:01 GMT"));
-  EXPECT_CALL(visitor, OnEndHeadersForStream(1));
-  EXPECT_CALL(visitor, OnFrameHeader(1, 26, DATA, 0));
-  EXPECT_CALL(visitor, OnBeginDataForStream(1, 26));
-  EXPECT_CALL(visitor, OnDataForStream(1, "This is the response body."));
+              OnHeaderForStream(stream_id, "server", "my-fake-server"));
+  EXPECT_CALL(visitor, OnHeaderForStream(stream_id, "date",
+                                         "Tue, 6 Apr 2021 12:54:01 GMT"));
+  EXPECT_CALL(visitor, OnEndHeadersForStream(stream_id));
+  EXPECT_CALL(visitor, OnFrameHeader(stream_id, 26, DATA, 0));
+  EXPECT_CALL(visitor, OnBeginDataForStream(stream_id, 26));
+  EXPECT_CALL(visitor,
+              OnDataForStream(stream_id, "This is the response body."));
   EXPECT_CALL(visitor, OnFrameHeader(3, 4, RST_STREAM, 0));
   EXPECT_CALL(visitor, OnRstStream(3, Http2ErrorCode::INTERNAL_ERROR));
   EXPECT_CALL(visitor, OnCloseStream(3, Http2ErrorCode::INTERNAL_ERROR));
@@ -99,6 +115,13 @@
   const ssize_t stream_result = session.ProcessBytes(stream_frames);
   EXPECT_EQ(stream_frames.size(), stream_result);
   EXPECT_EQ(3, session.GetHighestReceivedStreamId());
+
+  // The first stream is active and has received some data.
+  EXPECT_GT(kInitialFlowControlWindowSize,
+            session.GetStreamReceiveWindowSize(stream_id));
+  // Connection receive window is equivalent to the first stream's.
+  EXPECT_EQ(session.GetReceiveWindowSize(),
+            session.GetStreamReceiveWindowSize(stream_id));
 }
 
 // Verifies that a client session enqueues initial SETTINGS if Send() is called
@@ -404,6 +427,13 @@
 
   EXPECT_EQ(kSentinel1, session.GetStreamUserData(1));
 
+  // The first stream is active and has received some data.
+  EXPECT_GT(kInitialFlowControlWindowSize,
+            session.GetStreamReceiveWindowSize(1));
+  // Connection receive window is equivalent to the first stream's.
+  EXPECT_EQ(session.GetReceiveWindowSize(),
+            session.GetStreamReceiveWindowSize(1));
+
   // TODO(birenroy): drop stream state when streams are closed. It should no
   // longer be possible to set user data.
   const char* kSentinel3 = "another arbitrary pointer";