Implements read block detection in OgHttp2Session, and fixes read block handling in Nghttp2DataFrameSource.

Note that currently, there is no way to reschedule a stream that encounters a read block in the OgHttp2 stack. (Coming in a future change.)

PiperOrigin-RevId: 378512810
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index ded57cc..65a0c7e 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -24,6 +24,24 @@
   WINDOW_UPDATE,
 };
 
+// This send callback assumes |source|'s pointer is a TestDataSource, and
+// |user_data| is a Http2VisitorInterface.
+int TestSendCallback(nghttp2_session*,
+                     nghttp2_frame* frame,
+                     const uint8_t* framehd,
+                     size_t length,
+                     nghttp2_data_source* source,
+                     void* user_data) {
+  auto* visitor = static_cast<Http2VisitorInterface*>(user_data);
+  // Send the frame header via the visitor.
+  visitor->OnReadyToSend(ToStringView(framehd, 9));
+  auto* test_source = static_cast<TestDataSource*>(source->ptr);
+  absl::string_view payload = test_source->ReadNext(length);
+  // Send the frame payload via the visitor.
+  visitor->OnReadyToSend(payload);
+  return 0;
+}
+
 TEST(NgHttp2AdapterTest, ClientConstruction) {
   testing::StrictMock<MockHttp2Visitor> visitor;
   auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
@@ -269,22 +287,8 @@
   TestDataSource body1{kBody};
   // The TestDataSource is wrapped in the nghttp2_data_provider data type.
   nghttp2_data_provider provider = body1.MakeDataProvider();
+  nghttp2_send_data_callback send_callback = &TestSendCallback;
 
-  // This send callback assumes |source|'s pointer is a TestDataSource, which we
-  // know is true because we just converted |body1| into a nghttp2_data_provider
-  // above.
-  nghttp2_send_data_callback send_callback =
-      [](nghttp2_session*, nghttp2_frame* frame, const uint8_t* framehd,
-         size_t length, nghttp2_data_source* source, void* user_data) {
-        auto* visitor = static_cast<Http2VisitorInterface*>(user_data);
-        // Send the frame header via the visitor.
-        visitor->OnReadyToSend(ToStringView(framehd, 9));
-        auto* test_source = static_cast<TestDataSource*>(source->ptr);
-        absl::string_view payload = test_source->ReadNext(length);
-        // Send the frame payload via the visitor.
-        visitor->OnReadyToSend(payload);
-        return 0;
-      };
   // This call transforms it back into a DataFrameSource, which is compatible
   // with the Http2Adapter API.
   std::unique_ptr<DataFrameSource> frame_source =
@@ -304,6 +308,53 @@
   EXPECT_FALSE(adapter->session().want_write());
 }
 
+// This test verifies how nghttp2 behaves when a data source becomes
+// read-blocked.
+TEST(NgHttp2AdapterTest, ClientSubmitRequestWithDataProviderAndReadBlock) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
+
+  const absl::string_view kBody = "This is an example request body.";
+  // This test will use TestDataSource as the source of the body payload data.
+  TestDataSource body1{kBody};
+  body1.set_is_data_available(false);
+  // The TestDataSource is wrapped in the nghttp2_data_provider data type.
+  nghttp2_data_provider provider = body1.MakeDataProvider();
+  nghttp2_send_data_callback send_callback = &TestSendCallback;
+
+  // This call transforms it back into a DataFrameSource, which is compatible
+  // with the Http2Adapter API.
+  std::unique_ptr<DataFrameSource> frame_source =
+      MakeZeroCopyDataFrameSource(provider, &visitor, std::move(send_callback));
+  int stream_id =
+      adapter->SubmitRequest(ToHeaders({{":method", "POST"},
+                                        {":scheme", "http"},
+                                        {":authority", "example.com"},
+                                        {":path", "/this/is/request/one"}}),
+                             frame_source.get(), nullptr);
+  EXPECT_GT(stream_id, 0);
+  EXPECT_TRUE(adapter->session().want_write());
+
+  adapter->Send();
+  // Client preface does not appear to include the mandatory SETTINGS frame.
+  absl::string_view serialized = visitor.data();
+  EXPECT_THAT(serialized,
+              testing::StartsWith(spdy::kHttp2ConnectionHeaderPrefix));
+  serialized.remove_prefix(strlen(spdy::kHttp2ConnectionHeaderPrefix));
+  EXPECT_THAT(serialized, EqualsFrames({spdy::SpdyFrameType::HEADERS}));
+  visitor.Clear();
+  EXPECT_FALSE(adapter->session().want_write());
+
+  // Resume the deferred stream.
+  body1.set_is_data_available(true);
+  nghttp2_session_resume_data(adapter->session().raw_ptr(), stream_id);
+  EXPECT_TRUE(adapter->session().want_write());
+
+  adapter->Send();
+  EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::DATA}));
+  EXPECT_FALSE(adapter->session().want_write());
+}
+
 TEST(NgHttp2AdapterTest, ServerConstruction) {
   testing::StrictMock<MockHttp2Visitor> visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
diff --git a/http2/adapter/nghttp2_data_provider.cc b/http2/adapter/nghttp2_data_provider.cc
index 7a70965..83037e4 100644
--- a/http2/adapter/nghttp2_data_provider.cc
+++ b/http2/adapter/nghttp2_data_provider.cc
@@ -18,6 +18,7 @@
                                     uint32_t* data_flags,
                                     nghttp2_data_source* source,
                                     void* /* user_data */) {
+  *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
   auto* frame_source = static_cast<DataFrameSource*>(source->ptr);
   auto [result_length, done] = frame_source->SelectPayloadLength(length);
   if (result_length == DataFrameSource::kBlocked) {
@@ -25,7 +26,6 @@
   } else if (result_length == DataFrameSource::kError) {
     return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
   }
-  *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
   if (done) {
     *data_flags |= NGHTTP2_DATA_FLAG_EOF;
   }
diff --git a/http2/adapter/nghttp2_util.cc b/http2/adapter/nghttp2_util.cc
index d97c64a..812bbec 100644
--- a/http2/adapter/nghttp2_util.cc
+++ b/http2/adapter/nghttp2_util.cc
@@ -133,11 +133,13 @@
     ssize_t result = provider_.read_callback(
         nullptr /* session */, stream_id, nullptr /* buf */, max_length,
         &data_flags, &provider_.source, nullptr /* user_data */);
-    if (result < 0) {
-      return {-1, false};
+    if (result == NGHTTP2_ERR_DEFERRED) {
+      return {kBlocked, false};
+    } else if (result < 0) {
+      return {kError, false};
     } else if ((data_flags & NGHTTP2_DATA_FLAG_NO_COPY) == 0) {
       QUICHE_LOG(ERROR) << "Source did not use the zero-copy API!";
-      return {-1, false};
+      return {kError, false};
     } else {
       if (data_flags & NGHTTP2_DATA_FLAG_NO_END_STREAM) {
         send_fin_ = false;
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 330476c..8c10656 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -127,7 +127,7 @@
     const Http2StreamId stream_id = write_scheduler_.PopNextReadyStream();
     // TODO(birenroy): Add a return value to indicate write blockage, so streams
     // aren't woken unnecessarily.
-    WriteForStream(stream_id);
+    continue_writing = WriteForStream(stream_id);
   }
   if (continue_writing) {
     SendQueuedFrames();
@@ -152,12 +152,12 @@
   return true;
 }
 
-void OgHttp2Session::WriteForStream(Http2StreamId stream_id) {
+bool OgHttp2Session::WriteForStream(Http2StreamId stream_id) {
   auto it = stream_map_.find(stream_id);
   if (it == stream_map_.end()) {
     QUICHE_LOG(ERROR) << "Can't find stream " << stream_id
                       << " which is ready to write!";
-    return;
+    return true;
   }
   StreamState& state = it->second;
   if (state.outbound_body == nullptr) {
@@ -171,13 +171,22 @@
         MaybeCloseWithRstStream(stream_id, state);
       }
     }
-    return;
+    return true;
   }
+  bool source_can_produce = true;
   int32_t available_window =
       std::min(std::min(peer_window_, state.send_window), max_frame_payload_);
   while (available_window > 0 && state.outbound_body != nullptr) {
     auto [length, end_data] =
         state.outbound_body->SelectPayloadLength(available_window);
+    if (length == DataFrameSource::kBlocked) {
+      source_can_produce = false;
+      break;
+    } else if (length == DataFrameSource::kError) {
+      source_can_produce = false;
+      visitor_.OnCloseStream(stream_id, Http2ErrorCode::INTERNAL_ERROR);
+      break;
+    }
     const bool fin = end_data ? state.outbound_body->send_fin() : false;
     spdy::SpdyDataIR data(stream_id);
     data.set_fin(fin);
@@ -208,9 +217,11 @@
   }
   // If the stream still has data to send, it should be marked as ready in the
   // write scheduler.
-  if (state.send_window > 0 && state.outbound_body != nullptr) {
+  if (source_can_produce && state.send_window > 0 &&
+      state.outbound_body != nullptr) {
     write_scheduler_.MarkStreamReady(stream_id, false);
   }
+  return available_window > 0;
 }
 
 int32_t OgHttp2Session::SubmitRequest(absl::Span<const Header> headers,
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index 30bc9d8..c5b9efc 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -160,7 +160,10 @@
   // Sends queued frames, returning true if all frames were flushed.
   bool SendQueuedFrames();
 
-  void WriteForStream(Http2StreamId stream_id);
+  // Returns false if the connection is write-blocked (due to flow control or
+  // some other reason).
+  bool WriteForStream(Http2StreamId stream_id);
+
   void SendTrailers(Http2StreamId stream_id, spdy::SpdyHeaderBlock trailers);
 
   // Encapsulates the RST_STREAM NO_ERROR behavior described in RFC 7540
diff --git a/http2/adapter/oghttp2_session_test.cc b/http2/adapter/oghttp2_session_test.cc
index 185fd3a..636fda1 100644
--- a/http2/adapter/oghttp2_session_test.cc
+++ b/http2/adapter/oghttp2_session_test.cc
@@ -221,6 +221,41 @@
   EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::HEADERS}));
 }
 
+// This test exercises the case where the client request body source is read
+// blocked.
+TEST(OgHttp2SessionTest, ClientSubmitRequestWithReadBlock) {
+  DataSavingVisitor visitor;
+  OgHttp2Session session(
+      visitor, OgHttp2Session::Options{.perspective = Perspective::kClient});
+  EXPECT_FALSE(session.want_write());
+
+  const char* kSentinel1 = "arbitrary pointer 1";
+  TestDataFrameSource body1(visitor, "This is an example request body.");
+  body1.set_is_data_available(false);
+  int stream_id =
+      session.SubmitRequest(ToHeaders({{":method", "POST"},
+                                       {":scheme", "http"},
+                                       {":authority", "example.com"},
+                                       {":path", "/this/is/request/one"}}),
+                            &body1, const_cast<char*>(kSentinel1));
+  EXPECT_GT(stream_id, 0);
+  EXPECT_TRUE(session.want_write());
+  EXPECT_EQ(kSentinel1, session.GetStreamUserData(stream_id));
+  session.Send();
+  absl::string_view serialized = visitor.data();
+  EXPECT_THAT(serialized,
+              testing::StartsWith(spdy::kHttp2ConnectionHeaderPrefix));
+  serialized.remove_prefix(strlen(spdy::kHttp2ConnectionHeaderPrefix));
+  EXPECT_THAT(serialized,
+              EqualsFrames({SpdyFrameType::SETTINGS, SpdyFrameType::HEADERS}));
+  // No data frame, as body1 was read blocked.
+  visitor.Clear();
+  EXPECT_FALSE(session.want_write());
+
+  // Currently there is no way to indicate that the first stream is no longer
+  // read blocked.
+}
+
 TEST(OgHttp2SessionTest, ClientStartShutdown) {
   DataSavingVisitor visitor;
   OgHttp2Session session(
diff --git a/http2/adapter/test_utils.cc b/http2/adapter/test_utils.cc
index 3ccd63e..c6e98a2 100644
--- a/http2/adapter/test_utils.cc
+++ b/http2/adapter/test_utils.cc
@@ -36,13 +36,16 @@
 
 std::pair<ssize_t, bool> TestDataFrameSource::SelectPayloadLength(
     size_t max_length) {
+  if (!is_data_available_) {
+    return {kBlocked, false};
+  }
   // The stream is done if there's no more data, or if |max_length| is at least
   // as large as the remaining data.
   const bool end_data =
       current_fragment_.empty() || (payload_fragments_.size() == 1 &&
                                     max_length >= current_fragment_.size());
   const ssize_t length = std::min(max_length, current_fragment_.size());
-  return std::make_pair(length, end_data);
+  return {length, end_data};
 }
 
 void TestDataFrameSource::Send(absl::string_view frame_header,
diff --git a/http2/adapter/test_utils.h b/http2/adapter/test_utils.h
index 0ea7ec4..de001a1 100644
--- a/http2/adapter/test_utils.h
+++ b/http2/adapter/test_utils.h
@@ -50,11 +50,14 @@
   void Send(absl::string_view frame_header, size_t payload_length) override;
   bool send_fin() const override { return has_fin_; }
 
+  void set_is_data_available(bool value) { is_data_available_ = value; }
+
  private:
   Http2VisitorInterface& visitor_;
   std::vector<std::string> payload_fragments_;
   absl::string_view current_fragment_;
   const bool has_fin_;
+  bool is_data_available_ = true;
 };
 
 // A simple class that can easily be adapted to act as a nghttp2_data_source.
@@ -78,20 +81,27 @@
         .source = {.ptr = this},
         .read_callback = [](nghttp2_session*, int32_t, uint8_t*, size_t length,
                             uint32_t* data_flags, nghttp2_data_source* source,
-                            void*) {
+                            void*) -> ssize_t {
+          *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
           auto* s = static_cast<TestDataSource*>(source->ptr);
+          if (!s->is_data_available()) {
+            return NGHTTP2_ERR_DEFERRED;
+          }
           const ssize_t ret = s->SelectPayloadLength(length);
           if (ret < length) {
             *data_flags |= NGHTTP2_DATA_FLAG_EOF;
           }
-          *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY;
           return ret;
         }};
   }
 
+  bool is_data_available() const { return is_data_available_; }
+  void set_is_data_available(bool value) { is_data_available_ = value; }
+
  private:
   const std::string data_;
   absl::string_view remaining_ = data_;
+  bool is_data_available_ = true;
 };
 
 // These matchers check whether a string consists entirely of HTTP/2 frames of