Ensures proper queuing of requests when too many are in flight.

An Envoy integration test demonstrated that in particular circumstances, the oghttp2 library dequeues requests in the incorrect order, leading to interop problems when stream IDs do not monotonically increase.

I verified that the included test case fails at HEAD, and succeeds with this change.

Before:
http://sponge2/fb9f0a8c-b2d9-4391-acf8-2f7e85abc372

After:
http://sponge2/99574d9f-63f0-45e5-b1af-880bc055280d
PiperOrigin-RevId: 433825658
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index 55b1bbf..0294344 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -2066,6 +2066,88 @@
   EXPECT_FALSE(adapter->want_write());
 }
 
+TEST(NgHttp2AdapterTest, ClientQueuesRequests) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
+
+  testing::InSequence s;
+
+  adapter->SubmitSettings({});
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+  adapter->Send();
+
+  const std::string initial_frames =
+      TestFrameSequence()
+          .ServerPreface({{MAX_CONCURRENT_STREAMS, 2}})
+          .SettingsAck()
+          .Serialize();
+
+  EXPECT_CALL(visitor, OnFrameHeader(0, 6, SETTINGS, 0x0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSetting(Http2Setting{
+                           Http2KnownSettingsId::MAX_CONCURRENT_STREAMS, 2u}));
+  EXPECT_CALL(visitor, OnSettingsEnd());
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0x1));
+  EXPECT_CALL(visitor, OnSettingsAck());
+
+  adapter->ProcessBytes(initial_frames);
+
+  const std::vector<Header> headers =
+      ToHeaders({{":method", "GET"},
+                 {":scheme", "http"},
+                 {":authority", "example.com"},
+                 {":path", "/example/request"}});
+  std::vector<int32_t> stream_ids;
+  // Start two, which hits the limit.
+  int32_t stream_id = adapter->SubmitRequest(headers, nullptr, nullptr);
+  stream_ids.push_back(stream_id);
+  stream_id = adapter->SubmitRequest(headers, nullptr, nullptr);
+  stream_ids.push_back(stream_id);
+  // Start two more, which must be queued.
+  stream_id = adapter->SubmitRequest(headers, nullptr, nullptr);
+  stream_ids.push_back(stream_id);
+  stream_id = adapter->SubmitRequest(headers, nullptr, nullptr);
+  stream_ids.push_back(stream_id);
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x1, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_ids[0], _, 0x5));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_ids[0], _, 0x5, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_ids[1], _, 0x5));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_ids[1], _, 0x5, 0));
+
+  adapter->Send();
+
+  const std::string update_streams =
+      TestFrameSequence().Settings({{MAX_CONCURRENT_STREAMS, 5}}).Serialize();
+
+  EXPECT_CALL(visitor, OnFrameHeader(0, 6, SETTINGS, 0x0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSetting(Http2Setting{
+                           Http2KnownSettingsId::MAX_CONCURRENT_STREAMS, 5u}));
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  adapter->ProcessBytes(update_streams);
+
+  stream_id = adapter->SubmitRequest(headers, nullptr, nullptr);
+  stream_ids.push_back(stream_id);
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x1, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_ids[2], _, 0x5));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_ids[2], _, 0x5, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_ids[3], _, 0x5));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_ids[3], _, 0x5, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_ids[4], _, 0x5));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_ids[4], _, 0x5, 0));
+  // Header frames should all have been sent in order, regardless of any
+  // queuing.
+
+  adapter->Send();
+}
+
 TEST(NgHttp2AdapterTest, SubmitMetadata) {
   DataSavingVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index 7fbf6e4..b7fa20c 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -2900,6 +2900,89 @@
   EXPECT_EQ(0, result);
 }
 
+TEST(OgHttp2AdapterTest, ClientQueuesRequests) {
+  DataSavingVisitor visitor;
+  OgHttp2Adapter::Options options{.perspective = Perspective::kClient};
+  auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+  testing::InSequence s;
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+
+  adapter->Send();
+
+  const std::string initial_frames =
+      TestFrameSequence()
+          .ServerPreface({{MAX_CONCURRENT_STREAMS, 2}})
+          .SettingsAck()
+          .Serialize();
+
+  EXPECT_CALL(visitor, OnFrameHeader(0, 6, SETTINGS, 0x0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSetting(Http2Setting{
+                           Http2KnownSettingsId::MAX_CONCURRENT_STREAMS, 2u}))
+      .Times(2);
+  EXPECT_CALL(visitor, OnSettingsEnd());
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0x1));
+  EXPECT_CALL(visitor, OnSettingsAck());
+
+  adapter->ProcessBytes(initial_frames);
+
+  const std::vector<Header> headers =
+      ToHeaders({{":method", "GET"},
+                 {":scheme", "http"},
+                 {":authority", "example.com"},
+                 {":path", "/example/request"}});
+  std::vector<int32_t> stream_ids;
+  // Start two, which hits the limit.
+  int32_t stream_id = adapter->SubmitRequest(headers, nullptr, nullptr);
+  stream_ids.push_back(stream_id);
+  stream_id = adapter->SubmitRequest(headers, nullptr, nullptr);
+  stream_ids.push_back(stream_id);
+  // Start two more, which must be queued.
+  stream_id = adapter->SubmitRequest(headers, nullptr, nullptr);
+  stream_ids.push_back(stream_id);
+  stream_id = adapter->SubmitRequest(headers, nullptr, nullptr);
+  stream_ids.push_back(stream_id);
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x1, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_ids[0], _, 0x5));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_ids[0], _, 0x5, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_ids[1], _, 0x5));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_ids[1], _, 0x5, 0));
+
+  adapter->Send();
+
+  const std::string update_streams =
+      TestFrameSequence().Settings({{MAX_CONCURRENT_STREAMS, 5}}).Serialize();
+
+  EXPECT_CALL(visitor, OnFrameHeader(0, 6, SETTINGS, 0x0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSetting(Http2Setting{
+                           Http2KnownSettingsId::MAX_CONCURRENT_STREAMS, 5u}))
+      .Times(2);
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  adapter->ProcessBytes(update_streams);
+  stream_id = adapter->SubmitRequest(headers, nullptr, nullptr);
+  stream_ids.push_back(stream_id);
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x1, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_ids[2], _, 0x5));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_ids[2], _, 0x5, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_ids[3], _, 0x5));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_ids[3], _, 0x5, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_ids[4], _, 0x5));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_ids[4], _, 0x5, 0));
+  // Header frames should all have been sent in order, regardless of any
+  // queuing.
+
+  adapter->Send();
+}
+
 TEST(OgHttp2AdapterTest, SubmitMetadata) {
   DataSavingVisitor visitor;
   OgHttp2Adapter::Options options{.perspective = Perspective::kServer};
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 09ad924..9a120c3 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -371,6 +371,10 @@
   if (it != stream_map_.end()) {
     return it->second.user_data;
   }
+  auto p = pending_streams_.find(stream_id);
+  if (p != pending_streams_.end()) {
+    return p->second.user_data;
+  }
   return nullptr;
 }
 
@@ -918,14 +922,16 @@
   // TODO(birenroy): return an error for the incorrect perspective
   const Http2StreamId stream_id = next_stream_id_;
   next_stream_id_ += 2;
-  if (CanCreateStream()) {
-    StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source),
-                 user_data);
-  } else {
+  if (!pending_streams_.empty() || !CanCreateStream()) {
     // TODO(diannahu): There should probably be a limit to the number of allowed
     // pending streams.
-    pending_streams_.push_back(
-        {stream_id, ToHeaderBlock(headers), std::move(data_source), user_data});
+    pending_streams_.insert(
+        {stream_id, PendingStreamState{ToHeaderBlock(headers),
+                                       std::move(data_source), user_data}});
+    StartPendingStreams();
+  } else {
+    StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source),
+                 user_data);
   }
   return stream_id;
 }
@@ -1677,6 +1683,16 @@
   SendHeaders(stream_id, std::move(headers), end_stream);
 }
 
+void OgHttp2Session::StartPendingStreams() {
+  while (!pending_streams_.empty() && CanCreateStream()) {
+    auto& [stream_id, pending_stream] = pending_streams_.front();
+    StartRequest(stream_id, std::move(pending_stream.headers),
+                 std::move(pending_stream.data_source),
+                 pending_stream.user_data);
+    pending_streams_.pop_front();
+  }
+}
+
 void OgHttp2Session::CloseStream(Http2StreamId stream_id,
                                  Http2ErrorCode error_code) {
   visitor_.OnCloseStream(stream_id, error_code);
@@ -1690,13 +1706,7 @@
     write_scheduler_.UnregisterStream(stream_id);
   }
 
-  if (!pending_streams_.empty() && CanCreateStream()) {
-    PendingStreamState& pending_stream = pending_streams_.front();
-    StartRequest(pending_stream.stream_id, std::move(pending_stream.headers),
-                 std::move(pending_stream.data_source),
-                 pending_stream.user_data);
-    pending_streams_.pop_front();
-  }
+  StartPendingStreams();
 }
 
 bool OgHttp2Session::CanCreateStream() const {
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index 045eac4..e3771f8 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -22,6 +22,7 @@
 #include "http2/core/priority_write_scheduler.h"
 #include "common/platform/api/quiche_bug_tracker.h"
 #include "common/platform/api/quiche_export.h"
+#include "common/quiche_linked_hash_map.h"
 #include "spdy/core/http2_frame_decoder_adapter.h"
 #include "spdy/core/no_op_headers_handler.h"
 #include "spdy/core/spdy_framer.h"
@@ -242,7 +243,6 @@
   using StreamStateMap = absl::flat_hash_map<Http2StreamId, StreamState>;
 
   struct QUICHE_EXPORT_PRIVATE PendingStreamState {
-    Http2StreamId stream_id;
     spdy::SpdyHeaderBlock headers;
     std::unique_ptr<DataFrameSource> data_source;
     void* user_data = nullptr;
@@ -385,6 +385,9 @@
                     std::unique_ptr<DataFrameSource> data_source,
                     void* user_data);
 
+  // Sends headers for pending streams as long as the stream limit allows.
+  void StartPendingStreams();
+
   // Closes the given `stream_id` with the given `error_code`.
   void CloseStream(Http2StreamId stream_id, Http2ErrorCode error_code);
 
@@ -445,7 +448,8 @@
   // Maintains the state of pending streams known to this session. A pending
   // stream is kept in this list until it can be created while complying with
   // `max_outbound_concurrent_streams_`.
-  std::list<PendingStreamState> pending_streams_;
+  quiche::QuicheLinkedHashMap<Http2StreamId, PendingStreamState>
+      pending_streams_;
 
   // The queue of outbound frames.
   std::list<std::unique_ptr<spdy::SpdyFrameIR>> frames_;