Enable OgHttp2Session to RST_STREAM overflow streams with unacknowledged MAX_CONCURRENT_STREAMS values.

This CL adds a pending max inbound concurrent streams field to OgHttp2Session,
which stores the pending MAX_CONCURRENT_STREAMS value sent by OgHttp2Session
but not yet acknowledged by the peer. Then OgHttp2Session can reject incoming
streams exceeding the value with a RST_STREAM REFUSED_STREAM (not GOAWAY),
which better aligns oghttp2 with nghttp2 behavior.

This change affects at least two codec_impl_test tests:
http://sponge2/d291d8b8-2281-48b2-ae8f-203c94b0f484

If test mutual dispatch resolution is merged
(https://github.com/envoyproxy/envoy/pull/19195),
Http2CodecImplStreamLimitTest.LazyDecreaseMaxConcurrentStreamsConsumeError
passes, and
Http2CodecImplStreamLimitTest.LazyDecreaseMaxConcurrentStreamsIgnoreError
passes mod some nghttp2-specific error message [*].

[*] http://google3/third_party/envoy/src/source/common/http/http2/codec_impl.cc;l=850;rcl=410881808

PiperOrigin-RevId: 414519202
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index fae5d11..23322cb 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -3243,7 +3243,7 @@
   const int64_t stream_result = adapter->ProcessBytes(stream_frames);
   EXPECT_EQ(stream_frames.size(), stream_result);
 
-  // Apparently nghttp2 sends a GOAWAY for this error, even though
+  // The server should send a GOAWAY for this error, even though
   // OnInvalidFrame() returns true.
   EXPECT_TRUE(adapter->want_write());
   EXPECT_CALL(visitor, OnBeforeFrameSent(GOAWAY, 0, _, 0x0));
@@ -3289,8 +3289,8 @@
   visitor.Clear();
 
   // Let the client avoid sending a SETTINGS ack and attempt to open more than
-  // the advertised number of streams. Apparently nghttp2 still rejects the
-  // overflow stream, albeit with a RST_STREAM by default instead of a GOAWAY.
+  // the advertised number of streams. The server should still reject the
+  // overflow stream, albeit with RST_STREAM REFUSED_STREAM instead of GOAWAY.
   const std::string stream_frames =
       TestFrameSequence()
           .Headers(1,
@@ -3318,7 +3318,7 @@
                   3, Http2VisitorInterface::InvalidFrameError::kRefusedStream));
 
   const int64_t stream_result = adapter->ProcessBytes(stream_frames);
-  EXPECT_EQ(stream_frames.size(), stream_result);
+  EXPECT_EQ(stream_result, stream_frames.size());
 
   // The server sends a RST_STREAM for the offending stream.
   EXPECT_TRUE(adapter->want_write());
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index b8e4c61..ea6eaf3 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -3083,23 +3083,30 @@
   EXPECT_CALL(
       visitor,
       OnInvalidFrame(3, Http2VisitorInterface::InvalidFrameError::kProtocol));
+  // The oghttp2 stack also signals the connection error via OnConnectionError()
+  // and a negative ProcessBytes() return value.
+  EXPECT_CALL(visitor,
+              OnConnectionError(Http2VisitorInterface::ConnectionError::
+                                    kExceededMaxConcurrentStreams));
 
   const int64_t stream_result = adapter->ProcessBytes(stream_frames);
-  EXPECT_EQ(static_cast<size_t>(stream_result), stream_frames.size());
+  EXPECT_LT(stream_result, 0);
 
-  // The server should send a RST_STREAM for the offending stream.
+  // The server should send a GOAWAY for this error, even though
+  // OnInvalidFrame() returns true.
   EXPECT_TRUE(adapter->want_write());
-  EXPECT_CALL(visitor, OnBeforeFrameSent(RST_STREAM, 3, _, 0x0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(GOAWAY, 0, _, 0x0));
   EXPECT_CALL(visitor,
-              OnFrameSent(RST_STREAM, 3, _, 0x0,
+              OnFrameSent(GOAWAY, 0, _, 0x0,
                           static_cast<int>(Http2ErrorCode::PROTOCOL_ERROR)));
 
   send_result = adapter->Send();
   EXPECT_EQ(0, send_result);
-  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::RST_STREAM}));
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::GOAWAY}));
 }
 
-TEST(OgHttp2AdapterServerTest, ServerAllowsNewStreamAboveStreamLimitBeforeAck) {
+TEST(OgHttp2AdapterServerTest,
+     ServerRstStreamsNewStreamAboveStreamLimitBeforeAck) {
   DataSavingVisitor visitor;
   OgHttp2Adapter::Options options{.perspective = Perspective::kServer};
   auto adapter = OgHttp2Adapter::Create(visitor, options);
@@ -3133,8 +3140,8 @@
   visitor.Clear();
 
   // Let the client avoid sending a SETTINGS ack and attempt to open more than
-  // the advertised number of streams. The overflow stream should be allowed,
-  // due to the lack of SETTINGS ack.
+  // the advertised number of streams. The server should still reject the
+  // overflow stream, albeit with RST_STREAM REFUSED_STREAM instead of GOAWAY.
   const std::string stream_frames =
       TestFrameSequence()
           .Headers(1,
@@ -3157,15 +3164,23 @@
   EXPECT_CALL(visitor, OnEndHeadersForStream(1));
   EXPECT_CALL(visitor, OnEndStream(1));
   EXPECT_CALL(visitor, OnFrameHeader(3, _, HEADERS, 0x5));
-  EXPECT_CALL(visitor, OnBeginHeadersForStream(3));
-  EXPECT_CALL(visitor, OnHeaderForStream(3, _, _)).Times(4);
-  EXPECT_CALL(visitor, OnEndHeadersForStream(3));
-  EXPECT_CALL(visitor, OnEndStream(3));
+  EXPECT_CALL(visitor,
+              OnInvalidFrame(
+                  3, Http2VisitorInterface::InvalidFrameError::kRefusedStream));
 
-  // The server should process the bytes without complaint.
   const int64_t stream_result = adapter->ProcessBytes(stream_frames);
   EXPECT_EQ(static_cast<size_t>(stream_result), stream_frames.size());
-  EXPECT_FALSE(adapter->want_write());
+
+  // The server sends a RST_STREAM for the offending stream.
+  EXPECT_TRUE(adapter->want_write());
+  EXPECT_CALL(visitor, OnBeforeFrameSent(RST_STREAM, 3, _, 0x0));
+  EXPECT_CALL(visitor,
+              OnFrameSent(RST_STREAM, 3, _, 0x0,
+                          static_cast<int>(Http2ErrorCode::REFUSED_STREAM)));
+
+  send_result = adapter->Send();
+  EXPECT_EQ(0, send_result);
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::RST_STREAM}));
 }
 
 }  // namespace
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 09ea8a5..645a38d 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -1030,16 +1030,24 @@
     }
 
     if (stream_map_.size() >= max_inbound_concurrent_streams_) {
-      // The new stream would exceed our advertised MAX_CONCURRENT_STREAMS.
-      // Currently, use PROTOCOL_ERROR for behavior parity in Envoy.
-      // TODO(diannahu): Change to GOAWAY, and add RST_STREAM for exceeding the
-      // pending max inbound concurrent streams value.
-      EnqueueFrame(absl::make_unique<spdy::SpdyRstStreamIR>(
-          stream_id, spdy::ERROR_CODE_PROTOCOL_ERROR));
-      const bool ok = visitor_.OnInvalidFrame(
+      // The new stream would exceed our advertised and acknowledged
+      // MAX_CONCURRENT_STREAMS. For parity with nghttp2, treat this error as a
+      // connection-level PROTOCOL_ERROR.
+      visitor_.OnInvalidFrame(
           stream_id, Http2VisitorInterface::InvalidFrameError::kProtocol);
+      LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
+                          ConnectionError::kExceededMaxConcurrentStreams);
+      return;
+    }
+    if (stream_map_.size() >= pending_max_inbound_concurrent_streams_) {
+      // The new stream would exceed our advertised but unacked
+      // MAX_CONCURRENT_STREAMS. Refuse the stream for parity with nghttp2.
+      EnqueueFrame(absl::make_unique<spdy::SpdyRstStreamIR>(
+          stream_id, spdy::ERROR_CODE_REFUSED_STREAM));
+      const bool ok = visitor_.OnInvalidFrame(
+          stream_id, Http2VisitorInterface::InvalidFrameError::kRefusedStream);
       if (!ok) {
-        LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
+        LatchErrorAndNotify(Http2ErrorCode::REFUSED_STREAM,
                             ConnectionError::kExceededMaxConcurrentStreams);
       }
       return;
@@ -1206,6 +1214,10 @@
   auto settings_ir = absl::make_unique<SpdySettingsIR>();
   for (const Http2Setting& setting : settings) {
     settings_ir->AddSetting(setting.id, setting.value);
+
+    if (setting.id == Http2KnownSettingsId::MAX_CONCURRENT_STREAMS) {
+      pending_max_inbound_concurrent_streams_ = setting.value;
+    }
   }
 
   // Copy the (small) map of settings we are about to send so that we can set
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index 9f00613..670b236 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -410,8 +410,9 @@
   // is unlimited, the spec encourages a value of at least 100. We limit
   // ourselves to opening 100 until told otherwise by the peer and allow an
   // unlimited number from the peer until updated from SETTINGS we send.
-  // TODO(diannahu): Add a pending/unacked max inbound concurrent streams value.
   uint32_t max_outbound_concurrent_streams_ = 100u;
+  uint32_t pending_max_inbound_concurrent_streams_ =
+      std::numeric_limits<uint32_t>::max();
   uint32_t max_inbound_concurrent_streams_ =
       std::numeric_limits<uint32_t>::max();
   Options options_;