Add OgHttp2Session support for SETTINGS ack callbacks and apply to MAX_CONCURRENT_STREAMS.

This CL adds support to OgHttp2Session for applying sent SETTINGS parameters on
receipt of a SETTINGS ack from the peer. This support is in line with behavior
in both nghttp2 [1] and Http2Dispatcher [2].

This CL also uses this newly added support to enforce locally sent
MAX_CONCURRENT_STREAMS on ack. Note that Envoy behavior currently assumes that
peer violations of MAX_CONCURRENT_STREAMS are connection-level [3, 4], so
OgHttp2Session treats violations as a PROTOCOL_ERROR for parity with Envoy.

With this change, Http2FloodMitigationTest.TooManyStreams now passes with
oghttp2, completing flood frame mitigation integration test parity.

Before: http://sponge2/6275a2b6-178b-4ea7-b819-dbd0103564da
After: http://sponge2/463d751a-22df-446f-872c-2a6015c52969

[1] http://google3/third_party/nghttp2/src/lib/nghttp2_session.c;l=7065-7088;rcl=409864218
[2] http://google3/gfe/gfe2/http2/http2_dispatcher.h;l=1185;rcl=410682472
[3] http://google3/third_party/nghttp2/src/lib/nghttp2_session.c;l=3880-3884;rcl=410682472
[4] http://google3/third_party/envoy/src/source/common/http/http2/codec_impl.cc;l=1180,1192;rcl=410881808

PiperOrigin-RevId: 413288939
diff --git a/http2/adapter/http2_util.cc b/http2/adapter/http2_util.cc
index 39b71dc..553c0bd 100644
--- a/http2/adapter/http2_util.cc
+++ b/http2/adapter/http2_util.cc
@@ -95,6 +95,8 @@
       return "kWrongFrameSequence";
     case ConnectionError::kInvalidPushPromise:
       return "InvalidPushPromise";
+    case ConnectionError::kExceededMaxConcurrentStreams:
+      return "ExceededMaxConcurrentStreams";
   }
   return "UnknownConnectionError";
 }
diff --git a/http2/adapter/http2_visitor_interface.h b/http2/adapter/http2_visitor_interface.h
index 08b9ba5..6d843a3 100644
--- a/http2/adapter/http2_visitor_interface.h
+++ b/http2/adapter/http2_visitor_interface.h
@@ -76,6 +76,8 @@
     kWrongFrameSequence,
     // The peer sent an invalid PUSH_PROMISE frame.
     kInvalidPushPromise,
+    // The peer exceeded the max concurrent streams limit.
+    kExceededMaxConcurrentStreams,
   };
   virtual void OnConnectionError(ConnectionError error) = 0;
 
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index d406994..3f25f2b 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -3062,6 +3062,161 @@
   EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::GOAWAY}));
 }
 
+TEST(NgHttp2AdapterTest, ServerForbidsNewStreamAboveStreamLimit) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
+  adapter->SubmitSettings({{MAX_CONCURRENT_STREAMS, 1}});
+
+  const std::string initial_frames =
+      TestFrameSequence().ClientPreface().Serialize();
+
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  const int64_t initial_result = adapter->ProcessBytes(initial_frames);
+  EXPECT_EQ(initial_frames.size(), initial_result);
+
+  EXPECT_TRUE(adapter->want_write());
+
+  // Server initial SETTINGS (with MAX_CONCURRENT_STREAMS) and SETTINGS ack.
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 6, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 6, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+
+  int send_result = adapter->Send();
+  EXPECT_EQ(0, send_result);
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS,
+                                            spdy::SpdyFrameType::SETTINGS}));
+  visitor.Clear();
+
+  // Let the client send a SETTINGS ack and then attempt to open more than the
+  // advertised number of streams. The overflow stream should be rejected.
+  const std::string stream_frames =
+      TestFrameSequence()
+          .SettingsAck()
+          .Headers(1,
+                   {{":method", "GET"},
+                    {":scheme", "https"},
+                    {":authority", "example.com"},
+                    {":path", "/this/is/request/one"}},
+                   /*fin=*/true)
+          .Headers(3,
+                   {{":method", "GET"},
+                    {":scheme", "http"},
+                    {":authority", "example.com"},
+                    {":path", "/this/is/request/two"}},
+                   /*fin=*/true)
+          .Serialize();
+
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0x1));
+  EXPECT_CALL(visitor, OnSettingsAck());
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 0x5));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, _, _)).Times(4);
+  EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+  EXPECT_CALL(visitor, OnEndStream(1));
+  EXPECT_CALL(visitor, OnFrameHeader(3, _, HEADERS, 0x5));
+  EXPECT_CALL(
+      visitor,
+      OnInvalidFrame(3, Http2VisitorInterface::InvalidFrameError::kProtocol));
+
+  const int64_t stream_result = adapter->ProcessBytes(stream_frames);
+  EXPECT_EQ(stream_frames.size(), stream_result);
+
+  // Apparently nghttp2 sends a GOAWAY for this error, even though
+  // OnInvalidFrame() returns true.
+  EXPECT_TRUE(adapter->want_write());
+  EXPECT_CALL(visitor, OnBeforeFrameSent(GOAWAY, 0, _, 0x0));
+  EXPECT_CALL(visitor,
+              OnFrameSent(GOAWAY, 0, _, 0x0,
+                          static_cast<int>(Http2ErrorCode::PROTOCOL_ERROR)));
+
+  send_result = adapter->Send();
+  EXPECT_EQ(0, send_result);
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::GOAWAY}));
+}
+
+TEST(NgHttp2AdapterTest, ServerRstStreamsNewStreamAboveStreamLimitBeforeAck) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
+  adapter->SubmitSettings({{MAX_CONCURRENT_STREAMS, 1}});
+
+  const std::string initial_frames =
+      TestFrameSequence().ClientPreface().Serialize();
+
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  const int64_t initial_result = adapter->ProcessBytes(initial_frames);
+  EXPECT_EQ(initial_frames.size(), initial_result);
+
+  EXPECT_TRUE(adapter->want_write());
+
+  // Server initial SETTINGS (with MAX_CONCURRENT_STREAMS) and SETTINGS ack.
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 6, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 6, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+
+  int send_result = adapter->Send();
+  EXPECT_EQ(0, send_result);
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS,
+                                            spdy::SpdyFrameType::SETTINGS}));
+  visitor.Clear();
+
+  // Let the client avoid sending a SETTINGS ack and attempt to open more than
+  // the advertised number of streams. Apparently nghttp2 still rejects the
+  // overflow stream, albeit with a RST_STREAM by default instead of a GOAWAY.
+  const std::string stream_frames =
+      TestFrameSequence()
+          .Headers(1,
+                   {{":method", "GET"},
+                    {":scheme", "https"},
+                    {":authority", "example.com"},
+                    {":path", "/this/is/request/one"}},
+                   /*fin=*/true)
+          .Headers(3,
+                   {{":method", "GET"},
+                    {":scheme", "http"},
+                    {":authority", "example.com"},
+                    {":path", "/this/is/request/two"}},
+                   /*fin=*/true)
+          .Serialize();
+
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 0x5));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, _, _)).Times(4);
+  EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+  EXPECT_CALL(visitor, OnEndStream(1));
+  EXPECT_CALL(visitor, OnFrameHeader(3, _, HEADERS, 0x5));
+  EXPECT_CALL(visitor,
+              OnInvalidFrame(
+                  3, Http2VisitorInterface::InvalidFrameError::kRefusedStream));
+
+  const int64_t stream_result = adapter->ProcessBytes(stream_frames);
+  EXPECT_EQ(stream_frames.size(), stream_result);
+
+  // The server sends a RST_STREAM for the offending stream.
+  EXPECT_TRUE(adapter->want_write());
+  EXPECT_CALL(visitor, OnBeforeFrameSent(RST_STREAM, 3, _, 0x0));
+  EXPECT_CALL(visitor,
+              OnFrameSent(RST_STREAM, 3, _, 0x0,
+                          static_cast<int>(Http2ErrorCode::REFUSED_STREAM)));
+
+  send_result = adapter->Send();
+  EXPECT_EQ(0, send_result);
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::RST_STREAM}));
+}
+
 TEST(NgHttp2AdapterTest, AutomaticSettingsAndPingAcks) {
   DataSavingVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index c3f4356..86d5f72 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -2725,6 +2725,154 @@
                                             spdy::SpdyFrameType::GOAWAY}));
 }
 
+TEST(OgHttp2AdapterServerTest, ServerForbidsNewStreamAboveStreamLimit) {
+  DataSavingVisitor visitor;
+  OgHttp2Adapter::Options options{.perspective = Perspective::kServer};
+  auto adapter = OgHttp2Adapter::Create(visitor, options);
+  adapter->SubmitSettings({{MAX_CONCURRENT_STREAMS, 1}});
+
+  const std::string initial_frames =
+      TestFrameSequence().ClientPreface().Serialize();
+
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  const int64_t initial_result = adapter->ProcessBytes(initial_frames);
+  EXPECT_EQ(static_cast<size_t>(initial_result), initial_frames.size());
+
+  EXPECT_TRUE(adapter->want_write());
+
+  // Server initial SETTINGS (with MAX_CONCURRENT_STREAMS) and SETTINGS ack.
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 6, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 6, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+
+  int send_result = adapter->Send();
+  EXPECT_EQ(0, send_result);
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS,
+                                            spdy::SpdyFrameType::SETTINGS}));
+  visitor.Clear();
+
+  // Let the client send a SETTINGS ack and then attempt to open more than the
+  // advertised number of streams. The overflow stream should be rejected.
+  const std::string stream_frames =
+      TestFrameSequence()
+          .SettingsAck()
+          .Headers(1,
+                   {{":method", "GET"},
+                    {":scheme", "https"},
+                    {":authority", "example.com"},
+                    {":path", "/this/is/request/one"}},
+                   /*fin=*/true)
+          .Headers(3,
+                   {{":method", "GET"},
+                    {":scheme", "http"},
+                    {":authority", "example.com"},
+                    {":path", "/this/is/request/two"}},
+                   /*fin=*/true)
+          .Serialize();
+
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0x1));
+  EXPECT_CALL(visitor, OnSettingsAck());
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 0x5));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, _, _)).Times(4);
+  EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+  EXPECT_CALL(visitor, OnEndStream(1));
+  EXPECT_CALL(visitor, OnFrameHeader(3, _, HEADERS, 0x5));
+  EXPECT_CALL(
+      visitor,
+      OnInvalidFrame(3, Http2VisitorInterface::InvalidFrameError::kProtocol));
+
+  const int64_t stream_result = adapter->ProcessBytes(stream_frames);
+  EXPECT_EQ(static_cast<size_t>(stream_result), stream_frames.size());
+
+  // The server should send a RST_STREAM for the offending stream.
+  EXPECT_TRUE(adapter->want_write());
+  EXPECT_CALL(visitor, OnBeforeFrameSent(RST_STREAM, 3, _, 0x0));
+  EXPECT_CALL(visitor,
+              OnFrameSent(RST_STREAM, 3, _, 0x0,
+                          static_cast<int>(Http2ErrorCode::PROTOCOL_ERROR)));
+
+  send_result = adapter->Send();
+  EXPECT_EQ(0, send_result);
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::RST_STREAM}));
+}
+
+TEST(OgHttp2AdapterServerTest, ServerAllowsNewStreamAboveStreamLimitBeforeAck) {
+  DataSavingVisitor visitor;
+  OgHttp2Adapter::Options options{.perspective = Perspective::kServer};
+  auto adapter = OgHttp2Adapter::Create(visitor, options);
+  adapter->SubmitSettings({{MAX_CONCURRENT_STREAMS, 1}});
+
+  const std::string initial_frames =
+      TestFrameSequence().ClientPreface().Serialize();
+
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  const int64_t initial_result = adapter->ProcessBytes(initial_frames);
+  EXPECT_EQ(static_cast<size_t>(initial_result), initial_frames.size());
+
+  EXPECT_TRUE(adapter->want_write());
+
+  // Server initial SETTINGS (with MAX_CONCURRENT_STREAMS) and SETTINGS ack.
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 6, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 6, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+
+  int send_result = adapter->Send();
+  EXPECT_EQ(0, send_result);
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS,
+                                            spdy::SpdyFrameType::SETTINGS}));
+  visitor.Clear();
+
+  // Let the client avoid sending a SETTINGS ack and attempt to open more than
+  // the advertised number of streams. The overflow stream should be allowed,
+  // due to the lack of SETTINGS ack.
+  const std::string stream_frames =
+      TestFrameSequence()
+          .Headers(1,
+                   {{":method", "GET"},
+                    {":scheme", "https"},
+                    {":authority", "example.com"},
+                    {":path", "/this/is/request/one"}},
+                   /*fin=*/true)
+          .Headers(3,
+                   {{":method", "GET"},
+                    {":scheme", "http"},
+                    {":authority", "example.com"},
+                    {":path", "/this/is/request/two"}},
+                   /*fin=*/true)
+          .Serialize();
+
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 0x5));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, _, _)).Times(4);
+  EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+  EXPECT_CALL(visitor, OnEndStream(1));
+  EXPECT_CALL(visitor, OnFrameHeader(3, _, HEADERS, 0x5));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(3));
+  EXPECT_CALL(visitor, OnHeaderForStream(3, _, _)).Times(4);
+  EXPECT_CALL(visitor, OnEndHeadersForStream(3));
+  EXPECT_CALL(visitor, OnEndStream(3));
+
+  // The server should process the bytes without complaint.
+  const int64_t stream_result = adapter->ProcessBytes(stream_frames);
+  EXPECT_EQ(static_cast<size_t>(stream_result), stream_frames.size());
+  EXPECT_FALSE(adapter->want_write());
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace adapter
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 16d302a..fb37bdd 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -927,6 +927,12 @@
 }
 
 void OgHttp2Session::OnSettingsAck() {
+  if (!settings_ack_callbacks_.empty()) {
+    SettingsAckCallback callback = std::move(settings_ack_callbacks_.front());
+    settings_ack_callbacks_.pop_front();
+    callback();
+  }
+
   visitor_.OnSettingsAck();
 }
 
@@ -976,6 +982,23 @@
                           ConnectionError::kInvalidNewStreamId);
       return;
     }
+
+    if (stream_map_.size() >= max_inbound_concurrent_streams_) {
+      // The new stream would exceed our advertised MAX_CONCURRENT_STREAMS.
+      // Currently, use PROTOCOL_ERROR for behavior parity in Envoy.
+      // TODO(diannahu): Change to GOAWAY, and add RST_STREAM for exceeding the
+      // pending max inbound concurrent streams value.
+      EnqueueFrame(absl::make_unique<spdy::SpdyRstStreamIR>(
+          stream_id, spdy::ERROR_CODE_PROTOCOL_ERROR));
+      const bool ok = visitor_.OnInvalidFrame(
+          stream_id, Http2VisitorInterface::InvalidFrameError::kProtocol);
+      if (!ok) {
+        LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
+                            ConnectionError::kExceededMaxConcurrentStreams);
+      }
+      return;
+    }
+
     CreateStream(stream_id);
   }
 }
@@ -1138,6 +1161,17 @@
   for (const Http2Setting& setting : settings) {
     settings_ir->AddSetting(setting.id, setting.value);
   }
+
+  // Copy the (small) map of settings we are about to send so that we can set
+  // values in the SETTINGS ack callback.
+  settings_ack_callbacks_.push_back(
+      [this, settings_map = settings_ir->values()]() {
+        for (const auto id_and_value : settings_map) {
+          if (id_and_value.first == spdy::SETTINGS_MAX_CONCURRENT_STREAMS) {
+            max_inbound_concurrent_streams_ = id_and_value.second;
+          }
+        }
+      });
   return settings_ir;
 }
 
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index 3126e00..58f34ec 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -2,6 +2,7 @@
 #define QUICHE_HTTP2_ADAPTER_OGHTTP2_SESSION_H_
 
 #include <cstdint>
+#include <limits>
 #include <list>
 #include <memory>
 #include <vector>
@@ -259,7 +260,6 @@
   std::vector<Http2Setting> GetInitialSettings() const;
 
   // Prepares and returns a SETTINGS frame with the given `settings`.
-  // TODO(diannahu): Add the SETTINGS ack callback here.
   std::unique_ptr<spdy::SpdySettingsIR> PrepareSettingsFrame(
       absl::Span<const Http2Setting> settings);
 
@@ -362,6 +362,11 @@
   using WriteScheduler = PriorityWriteScheduler<Http2StreamId>;
   WriteScheduler write_scheduler_;
 
+  // Stores the queue of callbacks to invoke upon receiving SETTINGS acks. At
+  // most one callback is invoked for each SETTINGS ack.
+  using SettingsAckCallback = std::function<void()>;
+  std::list<SettingsAckCallback> settings_ack_callbacks_;
+
   // Delivers header name-value pairs to the visitor.
   PassthroughHeadersHandler headers_handler_;
 
@@ -391,8 +396,15 @@
   // The initial flow control receive window size for any newly created streams.
   int32_t stream_receive_window_limit_ = kInitialFlowControlWindowSize;
   uint32_t max_frame_payload_ = 16384u;
-  // The spec encourages a value of at least 100 concurrent streams.
+  // The maximum number of concurrent streams that this connection can open to
+  // its peer and allow from its peer, respectively. Although the initial value
+  // is unlimited, the spec encourages a value of at least 100. We limit
+  // ourselves to opening 100 until told otherwise by the peer and allow an
+  // unlimited number from the peer until updated from SETTINGS we send.
+  // TODO(diannahu): Add a pending/unacked max inbound concurrent streams value.
   uint32_t max_outbound_concurrent_streams_ = 100u;
+  uint32_t max_inbound_concurrent_streams_ =
+      std::numeric_limits<uint32_t>::max();
   Options options_;
   bool received_goaway_ = false;
   bool queued_preface_ = false;