Let client-side OgHttp2Session obey the peer's advertised SETTINGS_MAX_CONCURRENT_STREAMS.

This CL allows client-side OgHttp2Session to save the peer's advertised
SETTINGS_MAX_CONCURRENT_STREAMS value and use that value in SubmitRequest() to
determine whether to create a stream immediately for the request or store the
request for later. OgHttp2Session will create a stream immediately unless that
would violate SETTINGS_MAX_CONCURRENT_STREAMS; in this case instead, the
request is stored in a (newly added) PendingStreamState. Then when removing a
stream from the map in CloseStream(), OgHttp2Session will check if pending
streams can be created (compliant with SETTINGS_MAX_CONCURRENT_STREAMS).

PiperOrigin-RevId: 398554048
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index 28e3411..9877513 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -775,10 +775,40 @@
                                         {":path", "/this/is/request/two"}}),
                              nullptr, nullptr);
 
-  // A new stream is created and the session wants to write, despite
-  // MAX_CONCURRENT_STREAMS.
-  // TODO(diannahu): Respect the peer's MAX_CONCURRENT_STREAMS value.
+  // A new pending stream is created, but because of MAX_CONCURRENT_STREAMS, the
+  // session should not want to write it at the moment.
   EXPECT_GT(next_stream_id, stream_id);
+  EXPECT_FALSE(adapter->session().want_write());
+
+  const std::string stream_frames =
+      TestFrameSequence()
+          .Headers(stream_id,
+                   {{":status", "200"},
+                    {"server", "my-fake-server"},
+                    {"date", "Tue, 6 Apr 2021 12:54:01 GMT"}},
+                   /*fin=*/false)
+          .Data(stream_id, "This is the response body.", /*fin=*/true)
+          .Serialize();
+
+  EXPECT_CALL(visitor, OnFrameHeader(stream_id, _, HEADERS, 4));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(stream_id));
+  EXPECT_CALL(visitor, OnHeaderForStream(stream_id, ":status", "200"));
+  EXPECT_CALL(visitor,
+              OnHeaderForStream(stream_id, "server", "my-fake-server"));
+  EXPECT_CALL(visitor, OnHeaderForStream(stream_id, "date",
+                                         "Tue, 6 Apr 2021 12:54:01 GMT"));
+  EXPECT_CALL(visitor, OnEndHeadersForStream(stream_id));
+  EXPECT_CALL(visitor, OnFrameHeader(stream_id, 26, DATA, 0x1));
+  EXPECT_CALL(visitor, OnBeginDataForStream(stream_id, 26));
+  EXPECT_CALL(visitor,
+              OnDataForStream(stream_id, "This is the response body."));
+  EXPECT_CALL(visitor, OnEndStream(stream_id));
+  EXPECT_CALL(visitor, OnCloseStream(stream_id, Http2ErrorCode::NO_ERROR));
+
+  // The first stream should close, which should make the session want to write
+  // the next stream.
+  const int64_t stream_result = adapter->ProcessBytes(stream_frames);
+  EXPECT_EQ(stream_frames.size(), static_cast<size_t>(stream_result));
   EXPECT_TRUE(adapter->session().want_write());
 
   EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, next_stream_id, _, 0x5));
@@ -786,6 +816,7 @@
 
   result = adapter->Send();
   EXPECT_EQ(0, result);
+
   EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::HEADERS}));
   visitor.Clear();
   EXPECT_FALSE(adapter->session().want_write());
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 0ec920e..e09ac21 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -485,8 +485,15 @@
   // TODO(birenroy): return an error for the incorrect perspective
   const Http2StreamId stream_id = next_stream_id_;
   next_stream_id_ += 2;
-  StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source),
-               user_data);
+  if (CanCreateStream()) {
+    StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source),
+                 user_data);
+  } else {
+    // TODO(diannahu): There should probably be a limit to the number of allowed
+    // pending streams.
+    pending_streams_.push_back(
+        {stream_id, ToHeaderBlock(headers), std::move(data_source), user_data});
+  }
   return stream_id;
 }
 
@@ -654,6 +661,8 @@
     peer_supports_metadata_ = (value != 0);
   } else if (id == MAX_FRAME_SIZE) {
     max_frame_payload_ = value;
+  } else if (id == MAX_CONCURRENT_STREAMS) {
+    max_outbound_concurrent_streams_ = value;
   }
 }
 
@@ -896,6 +905,18 @@
                                  Http2ErrorCode error_code) {
   visitor_.OnCloseStream(stream_id, error_code);
   stream_map_.erase(stream_id);
+
+  if (!pending_streams_.empty() && CanCreateStream()) {
+    PendingStreamState& pending_stream = pending_streams_.front();
+    StartRequest(pending_stream.stream_id, std::move(pending_stream.headers),
+                 std::move(pending_stream.data_source),
+                 pending_stream.user_data);
+    pending_streams_.pop_front();
+  }
+}
+
+bool OgHttp2Session::CanCreateStream() const {
+  return stream_map_.size() < max_outbound_concurrent_streams_;
 }
 
 }  // namespace adapter
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index cd9bbda..56f0621 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -167,6 +167,7 @@
 
  private:
   using MetadataSequence = std::vector<std::unique_ptr<MetadataSource>>;
+
   struct QUICHE_EXPORT_PRIVATE StreamState {
     StreamState(int32_t stream_receive_window,
                 WindowManager::WindowUpdateListener listener)
@@ -183,6 +184,13 @@
   };
   using StreamStateMap = absl::flat_hash_map<Http2StreamId, StreamState>;
 
+  struct QUICHE_EXPORT_PRIVATE PendingStreamState {
+    Http2StreamId stream_id;
+    spdy::SpdyHeaderBlock headers;
+    std::unique_ptr<DataFrameSource> data_source;
+    void* user_data = nullptr;
+  };
+
   class QUICHE_EXPORT_PRIVATE PassthroughHeadersHandler
       : public spdy::SpdyHeadersHandlerInterface {
    public:
@@ -246,6 +254,9 @@
   // Closes the given `stream_id` with the given `error_code`.
   void CloseStream(Http2StreamId stream_id, Http2ErrorCode error_code);
 
+  // Returns true if the session can create a new stream.
+  bool CanCreateStream() const;
+
   // Receives events when inbound frames are parsed.
   Http2VisitorInterface& visitor_;
 
@@ -255,9 +266,14 @@
   // Decodes inbound frames.
   http2::Http2DecoderAdapter decoder_;
 
-  // Maintains the state of all streams known to this session.
+  // Maintains the state of active streams known to this session.
   StreamStateMap stream_map_;
 
+  // Maintains the state of pending streams known to this session. A pending
+  // stream is kept in this list until it can be created while complying with
+  // `max_outbound_concurrent_streams_`.
+  std::list<PendingStreamState> pending_streams_;
+
   // Maintains the queue of outbound frames, and any serialized bytes that have
   // not yet been consumed.
   std::list<std::unique_ptr<spdy::SpdyFrameIR>> frames_;
@@ -288,6 +304,8 @@
   // The initial flow control receive window size for any newly created streams.
   int32_t stream_receive_window_limit_ = kInitialFlowControlWindowSize;
   uint32_t max_frame_payload_ = 16384u;
+  // The spec encourages a value of at least 100 concurrent streams.
+  uint32_t max_outbound_concurrent_streams_ = 100u;
   Options options_;
   bool received_goaway_ = false;
   bool queued_preface_ = false;