Adds a StreamState member that tracks whether the data source is currently blocked.

This affects whether the stream should be scheduled for writes, and therefore the return value of OgHttp2Adapter::want_write().

PiperOrigin-RevId: 401817911
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index ccfb1ac..62f3682 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -1520,6 +1520,117 @@
                                             spdy::SpdyFrameType::PING}));
 }
 
+// Tests the case where the response body is in the progress of being sent while
+// trailers are queued.
+TEST(NgHttp2AdapterTest, ServerSubmitsTrailersWhileDataDeferred) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
+
+  const std::string frames = TestFrameSequence()
+                                 .ClientPreface()
+                                 .Headers(1,
+                                          {{":method", "POST"},
+                                           {":scheme", "https"},
+                                           {":authority", "example.com"},
+                                           {":path", "/this/is/request/one"}},
+                                          /*fin=*/false)
+                                 .WindowUpdate(1, 2000)
+                                 .Data(1, "This is the request body.")
+                                 .WindowUpdate(0, 2000)
+                                 .Serialize();
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 4));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":method", "POST"));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":scheme", "https"));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":authority", "example.com"));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":path", "/this/is/request/one"));
+  EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+  EXPECT_CALL(visitor, OnFrameHeader(1, 4, WINDOW_UPDATE, 0));
+  EXPECT_CALL(visitor, OnWindowUpdate(1, 2000));
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, DATA, 0));
+  EXPECT_CALL(visitor, OnBeginDataForStream(1, _));
+  EXPECT_CALL(visitor, OnDataForStream(1, "This is the request body."));
+  EXPECT_CALL(visitor, OnFrameHeader(0, 4, WINDOW_UPDATE, 0));
+  EXPECT_CALL(visitor, OnWindowUpdate(0, 2000));
+
+  const int64_t result = adapter->ProcessBytes(frames);
+  EXPECT_EQ(frames.size(), static_cast<size_t>(result));
+
+  EXPECT_TRUE(adapter->want_write());
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+
+  int send_result = adapter->Send();
+  // Some bytes should have been serialized.
+  EXPECT_EQ(0, send_result);
+  // SETTINGS ack
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS}));
+  visitor.Clear();
+
+  const absl::string_view kBody = "This is an example response body.";
+
+  // The body source must indicate that the end of the body is not the end of
+  // the stream.
+  auto body1 = absl::make_unique<TestDataFrameSource>(visitor, false);
+  body1->AppendPayload(kBody);
+  auto* body1_ptr = body1.get();
+  int submit_result = adapter->SubmitResponse(
+      1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
+      std::move(body1));
+  EXPECT_EQ(submit_result, 0);
+  EXPECT_TRUE(adapter->want_write());
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, 1, _, 0x4));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, 1, _, 0x4, 0));
+  EXPECT_CALL(visitor, OnFrameSent(DATA, 1, _, 0x0, 0));
+
+  send_result = adapter->Send();
+  // Some bytes should have been serialized.
+  EXPECT_EQ(0, send_result);
+  // SETTINGS ack
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::HEADERS,
+                                            spdy::SpdyFrameType::DATA}));
+  visitor.Clear();
+  EXPECT_FALSE(adapter->want_write());
+
+  int trailer_result =
+      adapter->SubmitTrailer(1, ToHeaders({{"final-status", "a-ok"}}));
+  ASSERT_EQ(trailer_result, 0);
+
+  // Even though the data source has not finished sending data, nghttp2 will
+  // write the trailers anyway.
+  EXPECT_TRUE(adapter->want_write());
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, 1, _, 0x5));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, 1, _, 0x5, 0));
+
+  send_result = adapter->Send();
+  EXPECT_EQ(0, send_result);
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::HEADERS}));
+  visitor.Clear();
+
+  // Resuming the stream results in the library wanting to write again.
+  body1_ptr->AppendPayload(kBody);
+  body1_ptr->EndData();
+  adapter->ResumeStream(1);
+  EXPECT_TRUE(adapter->want_write());
+
+  send_result = adapter->Send();
+  EXPECT_EQ(0, send_result);
+
+  // But no data is written for the stream.
+  EXPECT_THAT(visitor.data(), testing::IsEmpty());
+  EXPECT_FALSE(adapter->want_write());
+}
+
 TEST(NgHttp2AdapterTest, ServerErrorWhileHandlingHeaders) {
   DataSavingVisitor visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index 413db77..4731e48 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -1165,6 +1165,112 @@
   EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::HEADERS}));
 }
 
+// Tests the case where the response body is in the progress of being sent while
+// trailers are queued.
+TEST(OgHttp2AdapterServerTest, ServerSubmitsTrailersWhileDataDeferred) {
+  DataSavingVisitor visitor;
+  OgHttp2Adapter::Options options{.perspective = Perspective::kServer};
+  auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+  const std::string frames = TestFrameSequence()
+                                 .ClientPreface()
+                                 .Headers(1,
+                                          {{":method", "POST"},
+                                           {":scheme", "https"},
+                                           {":authority", "example.com"},
+                                           {":path", "/this/is/request/one"}},
+                                          /*fin=*/false)
+                                 .WindowUpdate(1, 2000)
+                                 .Data(1, "This is the request body.")
+                                 .WindowUpdate(0, 2000)
+                                 .Serialize();
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 4));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":method", "POST"));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":scheme", "https"));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":authority", "example.com"));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":path", "/this/is/request/one"));
+  EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+  EXPECT_CALL(visitor, OnFrameHeader(1, 4, WINDOW_UPDATE, 0));
+  EXPECT_CALL(visitor, OnWindowUpdate(1, 2000));
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, DATA, 0));
+  EXPECT_CALL(visitor, OnBeginDataForStream(1, _));
+  EXPECT_CALL(visitor, OnDataForStream(1, "This is the request body."));
+  EXPECT_CALL(visitor, OnFrameHeader(0, 4, WINDOW_UPDATE, 0));
+  EXPECT_CALL(visitor, OnWindowUpdate(0, 2000));
+
+  const int64_t result = adapter->ProcessBytes(frames);
+  EXPECT_EQ(frames.size(), static_cast<size_t>(result));
+
+  EXPECT_TRUE(adapter->want_write());
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+
+  int send_result = adapter->Send();
+  // Some bytes should have been serialized.
+  EXPECT_EQ(0, send_result);
+  // SETTINGS ack
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::SETTINGS,
+                                            spdy::SpdyFrameType::SETTINGS}));
+  visitor.Clear();
+
+  const absl::string_view kBody = "This is an example response body.";
+
+  // The body source must indicate that the end of the body is not the end of
+  // the stream.
+  auto body1 = absl::make_unique<TestDataFrameSource>(visitor, false);
+  body1->AppendPayload(kBody);
+  auto* body1_ptr = body1.get();
+  int submit_result = adapter->SubmitResponse(
+      1, ToHeaders({{":status", "200"}, {"x-comment", "Sure, sounds good."}}),
+      std::move(body1));
+  EXPECT_EQ(submit_result, 0);
+  EXPECT_TRUE(adapter->want_write());
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, 1, _, 0x4));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, 1, _, 0x4, 0));
+  EXPECT_CALL(visitor, OnFrameSent(DATA, 1, _, 0x0, 0));
+
+  send_result = adapter->Send();
+  // Some bytes should have been serialized.
+  EXPECT_EQ(0, send_result);
+  // SETTINGS ack
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::HEADERS,
+                                            spdy::SpdyFrameType::DATA}));
+  visitor.Clear();
+  EXPECT_FALSE(adapter->want_write());
+
+  int trailer_result =
+      adapter->SubmitTrailer(1, ToHeaders({{"final-status", "a-ok"}}));
+  ASSERT_EQ(trailer_result, 0);
+  // Even though there are new trailers to write, the data source has not
+  // finished writing data and is blocked.
+  EXPECT_FALSE(adapter->want_write());
+
+  body1_ptr->EndData();
+  adapter->ResumeStream(1);
+  EXPECT_TRUE(adapter->want_write());
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, 1, _, 0x5));
+  EXPECT_CALL(visitor, OnFrameSent(HEADERS, 1, _, 0x5, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(RST_STREAM, 1, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(RST_STREAM, 1, _, 0x0, 0));
+  EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::NO_ERROR));
+
+  send_result = adapter->Send();
+  EXPECT_EQ(0, send_result);
+}
+
 TEST(OgHttp2AdapterServerTest, ServerErrorWhileHandlingHeaders) {
   DataSavingVisitor visitor;
   OgHttp2Adapter::Options options{.perspective = Perspective::kServer};
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index e3a696b..09a3ce0 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -216,10 +216,11 @@
 
 bool OgHttp2Session::ResumeStream(Http2StreamId stream_id) {
   auto it = stream_map_.find(stream_id);
-  if (it->second.outbound_body == nullptr ||
+  if (it == stream_map_.end() || it->second.outbound_body == nullptr ||
       !write_scheduler_.StreamRegistered(stream_id)) {
     return false;
   }
+  it->second.data_deferred = false;
   write_scheduler_.MarkStreamReady(stream_id, /*add_to_front=*/false);
   return true;
 }
@@ -432,7 +433,6 @@
     }
     return true;
   }
-  bool source_can_produce = true;
   int32_t available_window =
       std::min({connection_send_window_, state.send_window,
                 static_cast<int32_t>(max_frame_payload_)});
@@ -443,10 +443,9 @@
     std::tie(length, end_data) =
         state.outbound_body->SelectPayloadLength(available_window);
     if (length == 0 && !end_data) {
-      source_can_produce = false;
+      state.data_deferred = true;
       break;
     } else if (length == DataFrameSource::kError) {
-      source_can_produce = false;
       CloseStream(stream_id, Http2ErrorCode::INTERNAL_ERROR);
       // No more work on the stream; it has been closed.
       break;
@@ -494,7 +493,7 @@
   }
   // If the stream still exists and has data to send, it should be marked as
   // ready in the write scheduler.
-  if (stream_map_.contains(stream_id) && source_can_produce &&
+  if (stream_map_.contains(stream_id) && !state.data_deferred &&
       state.send_window > 0 && state.outbound_body != nullptr) {
     write_scheduler_.MarkStreamReady(stream_id, false);
   }
@@ -601,7 +600,9 @@
     // Save trailers so they can be written once data is done.
     state.trailers =
         absl::make_unique<spdy::SpdyHeaderBlock>(ToHeaderBlock(trailers));
-    write_scheduler_.MarkStreamReady(stream_id, false);
+    if (!iter->second.data_deferred) {
+      write_scheduler_.MarkStreamReady(stream_id, false);
+    }
   }
   return 0;
 }
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index 75f9c96..814b843 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -185,6 +185,8 @@
     absl::optional<HeaderType> received_header_type;
     bool half_closed_local = false;
     bool half_closed_remote = false;
+    // Indicates that `outbound_body` temporarily cannot produce data.
+    bool data_deferred = false;
   };
   using StreamStateMap = absl::flat_hash_map<Http2StreamId, StreamState>;