Moves //gfe/gfe2/http2:window_manager to //third_party/http2/adapter.

PiperOrigin-RevId: 364328994
Change-Id: I0899a629122cfe88f2f952ad5dae00e1331c57bc
diff --git a/http2/adapter/window_manager.cc b/http2/adapter/window_manager.cc
new file mode 100644
index 0000000..148931f
--- /dev/null
+++ b/http2/adapter/window_manager.cc
@@ -0,0 +1,100 @@
+#include "http2/adapter/window_manager.h"
+
+#include <utility>
+
+#include "common/platform/api/quiche_logging.h"
+#include "spdy/platform/api/spdy_bug_tracker.h"
+
+namespace http2 {
+namespace adapter {
+
+WindowManager::WindowManager(size_t window_size_limit,
+                             WindowUpdateListener listener)
+    : limit_(window_size_limit), window_(window_size_limit), buffered_(0),
+      listener_(std::move(listener)) {}
+
+void WindowManager::OnWindowSizeLimitChange(const size_t new_limit) {
+  QUICHE_VLOG(2) << "WindowManager@" << this
+                 << " OnWindowSizeLimitChange from old limit of " << limit_
+                 << " to new limit of " << new_limit;
+  if (new_limit > limit_) {
+    window_ += (new_limit - limit_);
+  } else {
+    SPDY_BUG_V2(H2 window decrease)
+        << "Window size limit decrease not currently supported.";
+  }
+  limit_ = new_limit;
+}
+
+void WindowManager::SetWindowSizeLimit(size_t new_limit) {
+  QUICHE_VLOG(2) << "WindowManager@" << this
+                 << " SetWindowSizeLimit from old limit of " << limit_
+                 << " to new limit of " << new_limit;
+  limit_ = new_limit;
+  MaybeNotifyListener();
+}
+
+bool WindowManager::MarkDataBuffered(size_t bytes) {
+  QUICHE_VLOG(2) << "WindowManager@" << this << " window: " << window_
+                 << " bytes: " << bytes;
+  if (window_ < bytes) {
+    QUICHE_VLOG(2) << "WindowManager@" << this << " window underflow "
+                   << "window: " << window_ << " bytes: " << bytes;
+    window_ = 0;
+  } else {
+    window_ -= bytes;
+  }
+  buffered_ += bytes;
+  if (window_ == 0) {
+    // If data hasn't been flushed in a while there may be space available.
+    MaybeNotifyListener();
+  }
+  return window_ > 0;
+}
+
+void WindowManager::MarkDataFlushed(size_t bytes) {
+  QUICHE_VLOG(2) << "WindowManager@" << this << " buffered: " << buffered_
+                 << " bytes: " << bytes;
+  if (buffered_ < bytes) {
+    SPDY_BUG_V2(bug_2816_1)
+        << "WindowManager@" << this << " buffered underflow "
+        << "buffered_: " << buffered_ << " bytes: " << bytes;
+    buffered_ = 0;
+  } else {
+    buffered_ -= bytes;
+  }
+  MaybeNotifyListener();
+}
+
+void WindowManager::MaybeNotifyListener() {
+  if (buffered_ + window_ > limit_) {
+    QUICHE_LOG(ERROR) << "Flow control violation; limit: " << limit_
+                      << " buffered: " << buffered_ << " window: " << window_;
+    return;
+  }
+  // For the sake of efficiency, we want to send window updates if less than
+  // half of the max quota is available to the peer at any point in time.
+  // http://google3/gfe/gfe2/stubby/autobahn_fd_wrapper.cc?l=1180-1183&rcl=307416556
+  const size_t kDesiredMinWindow = limit_ / 2;
+  const size_t kDesiredMinDelta = limit_ / 3;
+  const size_t delta = limit_ - (buffered_ + window_);
+  bool send_update = false;
+  if (delta >= kDesiredMinDelta) {
+    // This particular window update was sent because the available delta
+    // exceeded the desired minimum.
+    send_update = true;
+  } else if (window_ < kDesiredMinWindow) {
+    // This particular window update was sent because the quota available to the
+    // peer at this moment is less than the desired minimum.
+    send_update = true;
+  }
+  if (send_update && delta > 0) {
+    QUICHE_VLOG(2) << "WindowManager@" << this
+                   << " Informing listener of delta: " << delta;
+    listener_(delta);
+    window_ += delta;
+  }
+}
+
+}  // namespace adapter
+}  // namespace http2
diff --git a/http2/adapter/window_manager.h b/http2/adapter/window_manager.h
new file mode 100644
index 0000000..3a25205
--- /dev/null
+++ b/http2/adapter/window_manager.h
@@ -0,0 +1,70 @@
+#ifndef QUICHE_HTTP2_ADAPTER_WINDOW_MANAGER_H_
+#define QUICHE_HTTP2_ADAPTER_WINDOW_MANAGER_H_
+
+#include <functional>
+
+namespace http2 {
+namespace adapter {
+
+// This class keeps track of a HTTP/2 flow control window, notifying a listener
+// when a window update needs to be sent. This class is not thread-safe.
+class WindowManager {
+ public:
+  // A WindowUpdateListener is invoked when it is time to send a window update.
+  typedef std::function<void(size_t)> WindowUpdateListener;
+
+  WindowManager(size_t window_size_limit,
+                WindowUpdateListener listener);
+
+  size_t CurrentWindowSize() const { return window_; }
+  size_t WindowSizeLimit() const { return limit_; }
+
+  // Called when the window size limit is changed (typically via settings) but
+  // no window update should be sent.
+  void OnWindowSizeLimitChange(size_t new_limit);
+
+  // Sets the window size limit to |new_limit| and notifies the listener to
+  // update as necessary.
+  void SetWindowSizeLimit(size_t new_limit);
+
+  // Increments the running total of data bytes buffered. Returns true iff there
+  // is more window remaining.
+  bool MarkDataBuffered(size_t bytes);
+
+  // Increments the running total of data bytes that have been flushed or
+  // dropped. Invokes the listener if the current window is smaller than some
+  // threshold and there is quota available to send.
+  void MarkDataFlushed(size_t bytes);
+
+  // Convenience method, used when incoming data is immediately dropped or
+  // ignored.
+  void MarkWindowConsumed(size_t bytes) {
+    MarkDataBuffered(bytes);
+    MarkDataFlushed(bytes);
+  }
+
+ private:
+  friend class WindowManagerPeer;
+
+  void MaybeNotifyListener();
+
+  // The upper bound on the flow control window. The GFE attempts to maintain a
+  // window of this size at the peer as data is proxied through.
+  size_t limit_;
+
+  // The current flow control window that has not been advertised to the peer
+  // and not yet consumed. The peer can send this many bytes before becoming
+  // blocked.
+  size_t window_;
+
+  // The amount of data already buffered, which should count against the flow
+  // control window upper bound.
+  size_t buffered_;
+
+  WindowUpdateListener listener_;
+};
+
+}  // namespace adapter
+}  // namespace http2
+
+#endif  // QUICHE_HTTP2_ADAPTER_WINDOW_MANAGER_H_
diff --git a/http2/adapter/window_manager_test.cc b/http2/adapter/window_manager_test.cc
new file mode 100644
index 0000000..4c589a7
--- /dev/null
+++ b/http2/adapter/window_manager_test.cc
@@ -0,0 +1,172 @@
+#include "http2/adapter/window_manager.h"
+
+#include <list>
+
+#include "testing/base/public/gmock.h"
+#include "testing/base/public/gunit.h"
+#include "absl/functional/bind_front.h"
+#include "spdy/platform/api/spdy_test_helpers.h"
+#include "util/random/acmrandom.h"
+
+using ::absl::bind_front;
+
+namespace http2 {
+namespace adapter {
+
+// Use the peer to access private vars of WindowManager.
+class WindowManagerPeer {
+ public:
+  explicit WindowManagerPeer(const WindowManager& wm) : wm_(wm) {}
+
+  size_t buffered() {
+    return wm_.buffered_;
+  }
+
+ private:
+  const WindowManager& wm_;
+};
+
+class WindowManagerTest : public ::testing::Test {
+ protected:
+  WindowManagerTest()
+      : wm_(kDefaultLimit, bind_front(&WindowManagerTest::OnCall, this)),
+        peer_(wm_),
+        random_(ACMRandom::HostnamePidTimeSeed()) {}
+
+  void OnCall(size_t s) {
+    call_sequence_.push_back(s);
+  }
+
+  const size_t kDefaultLimit = 32 * 1024 * 3;
+  std::list<size_t> call_sequence_;
+  WindowManager wm_;
+  WindowManagerPeer peer_;
+  ACMRandom random_;
+};
+
+// A few no-op calls.
+TEST_F(WindowManagerTest, NoOps) {
+  wm_.SetWindowSizeLimit(kDefaultLimit);
+  wm_.SetWindowSizeLimit(0);
+  wm_.SetWindowSizeLimit(kDefaultLimit);
+  wm_.MarkDataBuffered(0);
+  wm_.MarkDataFlushed(0);
+  EXPECT_TRUE(call_sequence_.empty());
+}
+
+// This test verifies that WindowManager does not notify its listener when data
+// is only buffered, and never flushed.
+TEST_F(WindowManagerTest, DataOnlyBuffered) {
+  size_t total = 0;
+  while (total < kDefaultLimit) {
+    size_t s = std::min<size_t>(kDefaultLimit - total,
+                                random_.UnbiasedUniform64(1024));
+    total += s;
+    wm_.MarkDataBuffered(s);
+  }
+  EXPECT_THAT(call_sequence_, ::testing::IsEmpty());
+}
+
+// This test verifies that WindowManager does notify its listener when data is
+// buffered and subsequently flushed.
+TEST_F(WindowManagerTest, DataBufferedAndFlushed) {
+  size_t total_buffered = 0;
+  size_t total_flushed = 0;
+  while (call_sequence_.empty()) {
+    size_t buffered = std::min<size_t>(kDefaultLimit - total_buffered,
+                                       random_.UnbiasedUniform64(1024));
+    wm_.MarkDataBuffered(buffered);
+    total_buffered += buffered;
+    EXPECT_TRUE(call_sequence_.empty());
+    size_t flushed =
+        random_.UnbiasedUniform64(total_buffered - total_flushed);
+    wm_.MarkDataFlushed(flushed);
+    total_flushed += flushed;
+  }
+  // If WindowManager decided to send an update, at least one third of the
+  // window must have been consumed by buffered data.
+  EXPECT_GE(total_buffered, kDefaultLimit / 3);
+}
+
+// Window manager should avoid window underflow.
+TEST_F(WindowManagerTest, AvoidWindowUnderflow) {
+  EXPECT_EQ(wm_.CurrentWindowSize(), wm_.WindowSizeLimit());
+  // Don't buffer more than the total window!
+  wm_.MarkDataBuffered(wm_.WindowSizeLimit() + 1);
+  EXPECT_EQ(wm_.CurrentWindowSize(), 0);
+}
+
+// Window manager should GFE_BUG and avoid buffered underflow.
+TEST_F(WindowManagerTest, AvoidBufferedUnderflow) {
+  EXPECT_EQ(peer_.buffered(), 0);
+  // Don't flush more than has been buffered!
+  EXPECT_SPDY_BUG(wm_.MarkDataFlushed(1), "buffered underflow");
+  EXPECT_EQ(peer_.buffered(), 0);
+
+  wm_.MarkDataBuffered(42);
+  EXPECT_EQ(peer_.buffered(), 42);
+  // Don't flush more than has been buffered!
+  EXPECT_SPDY_BUG(wm_.MarkDataFlushed(43), "buffered underflow");
+  EXPECT_EQ(peer_.buffered(), 0);
+}
+
+// This test verifies that WindowManager notifies its listener when window is
+// consumed (data is ignored or immediately dropped).
+TEST_F(WindowManagerTest, WindowConsumed) {
+  size_t consumed = kDefaultLimit / 3 - 1;
+  wm_.MarkWindowConsumed(consumed);
+  EXPECT_TRUE(call_sequence_.empty());
+  const size_t extra = 1;
+  wm_.MarkWindowConsumed(extra);
+  EXPECT_THAT(call_sequence_, testing::ElementsAre(consumed + extra));
+}
+
+// This test verifies that WindowManager notifies its listener when the window
+// size limit is increased.
+TEST_F(WindowManagerTest, ListenerCalledOnSizeUpdate) {
+  wm_.SetWindowSizeLimit(kDefaultLimit - 1024);
+  EXPECT_TRUE(call_sequence_.empty());
+  wm_.SetWindowSizeLimit(kDefaultLimit * 5);
+  // Because max(outstanding window, previous limit) is kDefaultLimit, it is
+  // only appropriate to increase the window by kDefaultLimit * 4.
+  EXPECT_THAT(call_sequence_, testing::ElementsAre(kDefaultLimit * 4));
+}
+
+// This test verifies that when data is buffered and then the limit is
+// decreased, WindowManager only notifies the listener once any outstanding
+// window has been consumed.
+TEST_F(WindowManagerTest, WindowUpdateAfterLimitDecreased) {
+  wm_.MarkDataBuffered(kDefaultLimit - 1024);
+  wm_.SetWindowSizeLimit(kDefaultLimit - 2048);
+
+  // Now there are 2048 bytes of window outstanding beyond the current limit,
+  // and we have 1024 bytes of data buffered beyond the current limit. This is
+  // intentional, to be sure that WindowManager works properly if the limit is
+  // decreased at runtime.
+
+  wm_.MarkDataFlushed(512);
+  EXPECT_TRUE(call_sequence_.empty());
+  wm_.MarkDataFlushed(512);
+  EXPECT_TRUE(call_sequence_.empty());
+  wm_.MarkDataFlushed(512);
+  EXPECT_TRUE(call_sequence_.empty());
+  wm_.MarkDataFlushed(1024);
+  EXPECT_THAT(call_sequence_, testing::ElementsAre(512));
+}
+
+// For normal behavior, we only call MaybeNotifyListener() when data is
+// flushed. But if window runs out entirely, we still need to call
+// MaybeNotifyListener() to avoid becoming artificially blocked when data isn't
+// being flushed.
+TEST_F(WindowManagerTest, ZeroWindowNotification) {
+  // Consume a byte of window, but not enough to trigger an update.
+  wm_.MarkWindowConsumed(1);
+
+  // Buffer the remaining window.
+  wm_.MarkDataBuffered(kDefaultLimit - 1);
+  // Listener is notified of the remaining byte of possible window.
+  EXPECT_THAT(call_sequence_, testing::ElementsAre(1));
+}
+
+}  // namespace adapter
+}  // namespace http2