Tracks streams with ready metadata or trailers separately from write_scheduler_.

PiperOrigin-RevId: 415351912
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index 6235d34..3b22a92 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -1756,6 +1756,196 @@
   EXPECT_FALSE(adapter->want_write());
 }
 
+TEST(OgHttp2AdapterClientTest, ClientEncountersFlowControlBlock) {
+  DataSavingVisitor visitor;
+  OgHttp2Adapter::Options options{.perspective = Perspective::kClient};
+  auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+  testing::InSequence s;
+
+  const std::vector<const Header> headers1 =
+      ToHeaders({{":method", "GET"},
+                 {":scheme", "http"},
+                 {":authority", "example.com"},
+                 {":path", "/this/is/request/one"}});
+
+  const std::string kBody = std::string(100 * 1024, 'a');
+  auto body1 = absl::make_unique<TestDataFrameSource>(visitor, false);
+  body1->AppendPayload(kBody);
+  body1->EndData();
+
+  const int32_t stream_id1 =
+      adapter->SubmitRequest(headers1, std::move(body1), nullptr);
+  ASSERT_GT(stream_id1, 0);
+
+  const std::vector<const Header> headers2 =
+      ToHeaders({{":method", "GET"},
+                 {":scheme", "http"},
+                 {":authority", "example.com"},
+                 {":path", "/this/is/request/two"}});
+
+  auto body2 = absl::make_unique<TestDataFrameSource>(visitor, false);
+  body2->AppendPayload(kBody);
+  body2->EndData();
+
+  const int32_t stream_id2 =
+      adapter->SubmitRequest(headers2, std::move(body2), nullptr);
+  ASSERT_GT(stream_id2, 0);
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id1, _, 0x4));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id1, _, 0x4, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id2, _, 0x4));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id2, _, 0x4, 0));
+  // 4 DATA frames should saturate the default 64kB stream/connection flow
+  // control window.
+  EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0)).Times(4);
+
+  int result = adapter->Send();
+  EXPECT_EQ(0, result);
+  EXPECT_EQ(0, adapter->GetSendWindowSize());
+
+  const std::string stream_frames = TestFrameSequence()
+                                        .ServerPreface()
+                                        .WindowUpdate(0, 80000)
+                                        .WindowUpdate(stream_id1, 20000)
+                                        .Serialize();
+
+  // Server preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  EXPECT_CALL(visitor, OnFrameHeader(0, 4, WINDOW_UPDATE, 0));
+  EXPECT_CALL(visitor, OnWindowUpdate(0, 80000));
+  EXPECT_CALL(visitor, OnFrameHeader(1, 4, WINDOW_UPDATE, 0));
+  EXPECT_CALL(visitor, OnWindowUpdate(1, 20000));
+
+  const int64_t stream_result = adapter->ProcessBytes(stream_frames);
+  EXPECT_EQ(stream_frames.size(), static_cast<size_t>(stream_result));
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x1, 0));
+
+  EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id2, _, 0x0, 0))
+      .Times(testing::AtLeast(1));
+  EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0))
+      .Times(testing::AtLeast(1));
+
+  EXPECT_TRUE(adapter->want_write());
+  result = adapter->Send();
+  EXPECT_EQ(0, result);
+}
+
+TEST(OgHttp2AdapterClientTest, ClientSendsTrailersAfterFlowControlBlock) {
+  DataSavingVisitor visitor;
+  OgHttp2Adapter::Options options{.perspective = Perspective::kClient};
+  auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+  testing::InSequence s;
+
+  const std::vector<const Header> headers1 =
+      ToHeaders({{":method", "GET"},
+                 {":scheme", "http"},
+                 {":authority", "example.com"},
+                 {":path", "/this/is/request/one"}});
+
+  auto body1 = absl::make_unique<TestDataFrameSource>(visitor, false);
+  body1->AppendPayload("Really small body.");
+  body1->EndData();
+
+  const int32_t stream_id1 =
+      adapter->SubmitRequest(headers1, std::move(body1), nullptr);
+  ASSERT_GT(stream_id1, 0);
+
+  const std::vector<const Header> headers2 =
+      ToHeaders({{":method", "GET"},
+                 {":scheme", "http"},
+                 {":authority", "example.com"},
+                 {":path", "/this/is/request/two"}});
+
+  const std::string kBody = std::string(100 * 1024, 'a');
+  auto body2 = absl::make_unique<TestDataFrameSource>(visitor, false);
+  body2->AppendPayload(kBody);
+  body2->EndData();
+
+  const int32_t stream_id2 =
+      adapter->SubmitRequest(headers2, std::move(body2), nullptr);
+  ASSERT_GT(stream_id2, 0);
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id1, _, 0x4));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id1, _, 0x4, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id2, _, 0x4));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id2, _, 0x4, 0));
+  EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0)).Times(1);
+  // 4 DATA frames should saturate the default 64kB stream/connection flow
+  // control window.
+  EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id2, _, 0x0, 0)).Times(4);
+
+  int result = adapter->Send();
+  EXPECT_EQ(0, result);
+  EXPECT_FALSE(adapter->want_write());
+  EXPECT_EQ(0, adapter->GetSendWindowSize());
+
+  const std::vector<const Header> trailers1 =
+      ToHeaders({{"extra-info", "Trailers are weird but good?"}});
+  adapter->SubmitTrailer(stream_id1, trailers1);
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id1, _, 0x5));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id1, _, 0x5, 0));
+
+  result = adapter->Send();
+  EXPECT_EQ(0, result);
+}
+
+TEST(OgHttp2AdapterClientTest, ClientSendsMetadataAfterFlowControlBlock) {
+  DataSavingVisitor visitor;
+  OgHttp2Adapter::Options options{.perspective = Perspective::kClient};
+  auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+  testing::InSequence s;
+
+  const std::vector<const Header> headers1 =
+      ToHeaders({{":method", "GET"},
+                 {":scheme", "http"},
+                 {":authority", "example.com"},
+                 {":path", "/this/is/request/one"}});
+
+  const std::string kBody = std::string(100 * 1024, 'a');
+  auto body1 = absl::make_unique<TestDataFrameSource>(visitor, false);
+  body1->AppendPayload(kBody);
+  body1->EndData();
+
+  const int32_t stream_id1 =
+      adapter->SubmitRequest(headers1, std::move(body1), nullptr);
+  ASSERT_GT(stream_id1, 0);
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id1, _, 0x4));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id1, _, 0x4, 0));
+  // 4 DATA frames should saturate the default 64kB stream/connection flow
+  // control window.
+  EXPECT_CALL(visitor, OnFrameSent(DATA, stream_id1, _, 0x0, 0)).Times(4);
+
+  int result = adapter->Send();
+  EXPECT_EQ(0, result);
+  EXPECT_FALSE(adapter->want_write());
+  EXPECT_EQ(0, adapter->GetSendWindowSize());
+
+  auto source = absl::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
+      {{"query-cost", "is too darn high"}, {"secret-sauce", "hollandaise"}})));
+  adapter->SubmitMetadata(1, 16384u, std::move(source));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(kMetadataFrameType, 1, _, 0x4));
+  EXPECT_CALL(visitor, OnFrameSent(kMetadataFrameType, 1, _, 0x4, 0));
+
+  result = adapter->Send();
+  EXPECT_EQ(0, result);
+}
+
 TEST_F(OgHttp2AdapterTest, SubmitMetadata) {
   auto source = absl::make_unique<TestMetadataSource>(ToHeaderBlock(ToHeaders(
       {{"query-cost", "is too darn high"}, {"secret-sauce", "hollandaise"}})));
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 82d9d87..aaf97d7 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -482,9 +482,8 @@
     continue_writing = SendMetadata(0, connection_metadata_);
   }
   // Wake streams for writes.
-  while (continue_writing == SendResult::SEND_OK &&
-         write_scheduler_.HasReadyStreams() && connection_send_window_ > 0) {
-    const Http2StreamId stream_id = write_scheduler_.PopNextReadyStream();
+  while (continue_writing == SendResult::SEND_OK && HasReadyStream()) {
+    const Http2StreamId stream_id = GetNextReadyStream();
     // TODO(birenroy): Add a return value to indicate write blockage, so streams
     // aren't woken unnecessarily.
     QUICHE_VLOG(1) << "Waking stream " << stream_id << " for writes.";
@@ -496,6 +495,28 @@
   return continue_writing == SendResult::SEND_ERROR ? -1 : 0;
 }
 
+bool OgHttp2Session::HasReadyStream() const {
+  return !metadata_ready_.empty() || !trailers_ready_.empty() ||
+         (write_scheduler_.HasReadyStreams() && connection_send_window_ > 0);
+}
+
+Http2StreamId OgHttp2Session::GetNextReadyStream() {
+  QUICHE_DCHECK(HasReadyStream());
+  if (!metadata_ready_.empty()) {
+    const Http2StreamId stream_id = *metadata_ready_.begin();
+    // WriteForStream() will re-mark the stream as ready, if necessary.
+    write_scheduler_.MarkStreamNotReady(stream_id);
+    return stream_id;
+  }
+  if (!trailers_ready_.empty()) {
+    const Http2StreamId stream_id = *trailers_ready_.begin();
+    // WriteForStream() will re-mark the stream as ready, if necessary.
+    write_scheduler_.MarkStreamNotReady(stream_id);
+    return stream_id;
+  }
+  return write_scheduler_.PopNextReadyStream();
+}
+
 OgHttp2Session::SendResult OgHttp2Session::MaybeSendBufferedData() {
   int64_t result = std::numeric_limits<int64_t>::max();
   while (result > 0 && !buffered_data_.empty()) {
@@ -692,7 +713,8 @@
   if (connection_can_write != SendResult::SEND_OK) {
     return connection_can_write;
   }
-  return available_window <= 0 ? SendResult::SEND_BLOCKED : SendResult::SEND_OK;
+  return connection_send_window_ <= 0 ? SendResult::SEND_BLOCKED
+                                      : SendResult::SEND_OK;
 }
 
 OgHttp2Session::SendResult OgHttp2Session::SendMetadata(
@@ -719,6 +741,7 @@
         std::string(payload)));
     if (end_metadata) {
       sequence.erase(sequence.begin());
+      metadata_ready_.erase(stream_id);
     }
   }
   return SendQueuedFrames();
@@ -789,7 +812,7 @@
     state.trailers =
         absl::make_unique<spdy::SpdyHeaderBlock>(ToHeaderBlock(trailers));
     if (!options_.trailers_require_end_data || !iter->second.data_deferred) {
-      write_scheduler_.MarkStreamReady(stream_id, false);
+      trailers_ready_.insert(stream_id);
     }
   }
   return 0;
@@ -802,7 +825,7 @@
   } else {
     auto iter = CreateStream(stream_id);
     iter->second.outbound_metadata.push_back(std::move(source));
-    write_scheduler_.MarkStreamReady(stream_id, false);
+    metadata_ready_.insert(stream_id);
   }
 }
 
@@ -1089,6 +1112,7 @@
     } else {
       if (it->second.send_window == 0) {
         // The stream was blocked on flow control.
+        QUICHE_VLOG(1) << "Marking stream " << stream_id << " ready to write.";
         write_scheduler_.MarkStreamReady(stream_id, false);
       }
       it->second.send_window += delta_window_size;
@@ -1277,6 +1301,7 @@
       absl::make_unique<spdy::SpdyHeadersIR>(stream_id, std::move(trailers));
   frame->set_fin(true);
   EnqueueFrame(std::move(frame));
+  trailers_ready_.erase(stream_id);
 }
 
 void OgHttp2Session::MaybeFinWithRstStream(StreamStateMap::iterator iter) {
@@ -1341,6 +1366,8 @@
                                  Http2ErrorCode error_code) {
   visitor_.OnCloseStream(stream_id, error_code);
   stream_map_.erase(stream_id);
+  trailers_ready_.erase(stream_id);
+  metadata_ready_.erase(stream_id);
   if (write_scheduler_.StreamRegistered(stream_id)) {
     write_scheduler_.UnregisterStream(stream_id);
   }
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index fd7bdc2..e4a2374 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -125,7 +125,7 @@
   }
   bool want_write() const override {
     return !frames_.empty() || !buffered_data_.empty() ||
-           write_scheduler_.HasReadyStreams() || !connection_metadata_.empty();
+           !connection_metadata_.empty() || HasReadyStream();
   }
   int GetRemoteWindowSize() const override { return connection_send_window_; }
 
@@ -285,6 +285,13 @@
     SEND_ERROR,
   };
 
+  // Returns true if at least one stream has data or control frames to write.
+  bool HasReadyStream() const;
+
+  // Returns the next stream that has something to write. If there are no such
+  // streams, returns zero.
+  Http2StreamId GetNextReadyStream();
+
   // Sends the buffered connection preface or serialized frame data, if any.
   SendResult MaybeSendBufferedData();
 
@@ -391,7 +398,13 @@
   WindowManager connection_window_manager_;
 
   absl::flat_hash_set<Http2StreamId> streams_reset_;
+
+  // The number of frames currently queued per stream.
   absl::flat_hash_map<Http2StreamId, int> queued_frames_;
+  // Includes streams that are currently ready to write trailers.
+  absl::flat_hash_set<Http2StreamId> trailers_ready_;
+  // Includes streams that are currently ready to write metadata.
+  absl::flat_hash_set<Http2StreamId> metadata_ready_;
 
   MetadataSequence connection_metadata_;