Adds a return value to DataFrameSource::Send(), indicating whether the send operation would encounter a write block.

PiperOrigin-RevId: 378651262
diff --git a/http2/adapter/data_source.h b/http2/adapter/data_source.h
index 5658bfd..2509358 100644
--- a/http2/adapter/data_source.h
+++ b/http2/adapter/data_source.h
@@ -23,9 +23,9 @@
   virtual std::pair<ssize_t, bool> SelectPayloadLength(size_t max_length) = 0;
 
   // This method is called with a frame header and a payload length to send. The
-  // source should send or buffer the entire frame.
-  // TODO(birenroy): Consider adding a return value to indicate write blockage.
-  virtual void Send(absl::string_view frame_header, size_t payload_length) = 0;
+  // source should send or buffer the entire frame and return true, or return
+  // false without sending or buffering anything.
+  virtual bool Send(absl::string_view frame_header, size_t payload_length) = 0;
 
   // If true, the end of this data source indicates the end of the stream.
   // Otherwise, this data will be followed by trailers.
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index 65a0c7e..d930a26 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -34,7 +34,10 @@
                      void* user_data) {
   auto* visitor = static_cast<Http2VisitorInterface*>(user_data);
   // Send the frame header via the visitor.
-  visitor->OnReadyToSend(ToStringView(framehd, 9));
+  ssize_t result = visitor->OnReadyToSend(ToStringView(framehd, 9));
+  if (result == 0) {
+    return NGHTTP2_ERR_WOULDBLOCK;
+  }
   auto* test_source = static_cast<TestDataSource*>(source->ptr);
   absl::string_view payload = test_source->ReadNext(length);
   // Send the frame payload via the visitor.
@@ -355,6 +358,50 @@
   EXPECT_FALSE(adapter->session().want_write());
 }
 
+// This test verifies how nghttp2 behaves when a connection becomes
+// write-blocked.
+TEST(NgHttp2AdapterTest, ClientSubmitRequestWithDataProviderAndWriteBlock) {
+  DataSavingVisitor visitor;
+  auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
+
+  const absl::string_view kBody = "This is an example request body.";
+  // This test will use TestDataSource as the source of the body payload data.
+  TestDataSource body1{kBody};
+  // The TestDataSource is wrapped in the nghttp2_data_provider data type.
+  nghttp2_data_provider provider = body1.MakeDataProvider();
+  nghttp2_send_data_callback send_callback = &TestSendCallback;
+
+  // This call transforms it back into a DataFrameSource, which is compatible
+  // with the Http2Adapter API.
+  std::unique_ptr<DataFrameSource> frame_source =
+      MakeZeroCopyDataFrameSource(provider, &visitor, std::move(send_callback));
+  int stream_id =
+      adapter->SubmitRequest(ToHeaders({{":method", "POST"},
+                                        {":scheme", "http"},
+                                        {":authority", "example.com"},
+                                        {":path", "/this/is/request/one"}}),
+                             frame_source.get(), nullptr);
+  EXPECT_GT(stream_id, 0);
+  EXPECT_TRUE(adapter->session().want_write());
+
+  visitor.set_is_write_blocked(true);
+  adapter->Send();
+  EXPECT_THAT(visitor.data(), testing::IsEmpty());
+  EXPECT_TRUE(adapter->session().want_write());
+
+  visitor.set_is_write_blocked(false);
+  adapter->Send();
+
+  // Client preface does not appear to include the mandatory SETTINGS frame.
+  absl::string_view serialized = visitor.data();
+  EXPECT_THAT(serialized,
+              testing::StartsWith(spdy::kHttp2ConnectionHeaderPrefix));
+  serialized.remove_prefix(strlen(spdy::kHttp2ConnectionHeaderPrefix));
+  EXPECT_THAT(serialized, EqualsFrames({spdy::SpdyFrameType::HEADERS,
+                                        spdy::SpdyFrameType::DATA}));
+  EXPECT_FALSE(adapter->session().want_write());
+}
+
 TEST(NgHttp2AdapterTest, ServerConstruction) {
   testing::StrictMock<MockHttp2Visitor> visitor;
   auto adapter = NgHttp2Adapter::CreateServerAdapter(visitor);
diff --git a/http2/adapter/nghttp2_util.cc b/http2/adapter/nghttp2_util.cc
index 812bbec..f3743ca 100644
--- a/http2/adapter/nghttp2_util.cc
+++ b/http2/adapter/nghttp2_util.cc
@@ -149,10 +149,14 @@
     }
   }
 
-  void Send(absl::string_view frame_header, size_t payload_length) override {
-    send_data_(nullptr /* session */, nullptr /* frame */,
-               ToUint8Ptr(frame_header.data()), payload_length,
-               &provider_.source, user_data_);
+  bool Send(absl::string_view frame_header, size_t payload_length) override {
+    const int result =
+        send_data_(nullptr /* session */, nullptr /* frame */,
+                   ToUint8Ptr(frame_header.data()), payload_length,
+                   &provider_.source, user_data_);
+    QUICHE_LOG_IF(ERROR, result < 0 && result != NGHTTP2_ERR_WOULDBLOCK)
+        << "Unexpected error code from send: " << result;
+    return result == 0;
   }
 
   bool send_fin() const override { return send_fin_; }
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 8c10656..bb8405f 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -138,15 +138,20 @@
   // Serialize and send frames in the queue.
   while (!frames_.empty()) {
     spdy::SpdySerializedFrame frame = framer_.SerializeFrame(*frames_.front());
-    frames_.pop_front();
     const ssize_t result = visitor_.OnReadyToSend(absl::string_view(frame));
     if (result < 0) {
       visitor_.OnConnectionError();
       return false;
-    }
-    if (result < frame.size()) {
-      serialized_prefix_.assign(frame.data() + result, frame.size() - result);
+    } else if (result == 0) {
+      // Write blocked.
       return false;
+    } else {
+      frames_.pop_front();
+      if (result < frame.size()) {
+        // The frame was partially written, so the rest must be buffered.
+        serialized_prefix_.assign(frame.data() + result, frame.size() - result);
+        return false;
+      }
     }
   }
   return true;
@@ -174,6 +179,7 @@
     return true;
   }
   bool source_can_produce = true;
+  bool connection_can_write = true;
   int32_t available_window =
       std::min(std::min(peer_window_, state.send_window), max_frame_payload_);
   while (available_window > 0 && state.outbound_body != nullptr) {
@@ -193,7 +199,12 @@
     data.SetDataShallow(length);
     spdy::SpdySerializedFrame header =
         spdy::SpdyFramer::SerializeDataFrameHeaderWithPaddingLengthField(data);
-    state.outbound_body->Send(absl::string_view(header), length);
+    const bool success =
+        state.outbound_body->Send(absl::string_view(header), length);
+    if (!success) {
+      connection_can_write = false;
+      break;
+    }
     peer_window_ -= length;
     state.send_window -= length;
     available_window =
@@ -221,7 +232,9 @@
       state.outbound_body != nullptr) {
     write_scheduler_.MarkStreamReady(stream_id, false);
   }
-  return available_window > 0;
+  // Streams can continue writing as long as the connection is not write-blocked
+  // and there is additional flow control quota available.
+  return connection_can_write && available_window > 0;
 }
 
 int32_t OgHttp2Session::SubmitRequest(absl::Span<const Header> headers,
diff --git a/http2/adapter/oghttp2_session_test.cc b/http2/adapter/oghttp2_session_test.cc
index 636fda1..4b42188 100644
--- a/http2/adapter/oghttp2_session_test.cc
+++ b/http2/adapter/oghttp2_session_test.cc
@@ -256,6 +256,43 @@
   // read blocked.
 }
 
+// This test exercises the case where the connection to the peer is write
+// blocked.
+TEST(OgHttp2SessionTest, ClientSubmitRequestWithWriteBlock) {
+  DataSavingVisitor visitor;
+  OgHttp2Session session(
+      visitor, OgHttp2Session::Options{.perspective = Perspective::kClient});
+  EXPECT_FALSE(session.want_write());
+
+  const char* kSentinel1 = "arbitrary pointer 1";
+  TestDataFrameSource body1(visitor, "This is an example request body.");
+  int stream_id =
+      session.SubmitRequest(ToHeaders({{":method", "POST"},
+                                       {":scheme", "http"},
+                                       {":authority", "example.com"},
+                                       {":path", "/this/is/request/one"}}),
+                            &body1, const_cast<char*>(kSentinel1));
+  EXPECT_GT(stream_id, 0);
+  EXPECT_TRUE(session.want_write());
+  EXPECT_EQ(kSentinel1, session.GetStreamUserData(stream_id));
+  visitor.set_is_write_blocked(true);
+  session.Send();
+
+  EXPECT_THAT(visitor.data(), testing::IsEmpty());
+  EXPECT_TRUE(session.want_write());
+  visitor.set_is_write_blocked(false);
+  session.Send();
+
+  absl::string_view serialized = visitor.data();
+  EXPECT_THAT(serialized,
+              testing::StartsWith(spdy::kHttp2ConnectionHeaderPrefix));
+  serialized.remove_prefix(strlen(spdy::kHttp2ConnectionHeaderPrefix));
+  EXPECT_THAT(serialized,
+              EqualsFrames({SpdyFrameType::SETTINGS, SpdyFrameType::HEADERS,
+                            SpdyFrameType::DATA}));
+  EXPECT_FALSE(session.want_write());
+}
+
 TEST(OgHttp2SessionTest, ClientStartShutdown) {
   DataSavingVisitor visitor;
   OgHttp2Session session(
diff --git a/http2/adapter/test_utils.cc b/http2/adapter/test_utils.cc
index c6e98a2..e0dfec6 100644
--- a/http2/adapter/test_utils.cc
+++ b/http2/adapter/test_utils.cc
@@ -48,7 +48,7 @@
   return {length, end_data};
 }
 
-void TestDataFrameSource::Send(absl::string_view frame_header,
+bool TestDataFrameSource::Send(absl::string_view frame_header,
                                size_t payload_length) {
   QUICHE_LOG_IF(DFATAL, payload_length > current_fragment_.size())
       << "payload_length: " << payload_length
@@ -56,13 +56,23 @@
   const std::string concatenated =
       absl::StrCat(frame_header, current_fragment_.substr(0, payload_length));
   const ssize_t result = visitor_.OnReadyToSend(concatenated);
-  if (result < concatenated.size()) {
-    QUICHE_LOG(ERROR)
+  if (result < 0) {
+    // Write encountered error.
+    visitor_.OnConnectionError();
+    current_fragment_ = {};
+    payload_fragments_.clear();
+    return false;
+  } else if (result == 0) {
+    // Write blocked.
+    return false;
+  } else if (result < concatenated.size()) {
+    // Probably need to handle this better within this test class.
+    QUICHE_LOG(DFATAL)
         << "DATA frame not fully flushed. Connection will be corrupt!";
     visitor_.OnConnectionError();
     current_fragment_ = {};
     payload_fragments_.clear();
-    return;
+    return false;
   }
   current_fragment_.remove_prefix(payload_length);
   if (current_fragment_.empty()) {
@@ -71,6 +81,7 @@
       current_fragment_ = payload_fragments_.front();
     }
   }
+  return true;
 }
 
 namespace {
diff --git a/http2/adapter/test_utils.h b/http2/adapter/test_utils.h
index de001a1..1d58669 100644
--- a/http2/adapter/test_utils.h
+++ b/http2/adapter/test_utils.h
@@ -19,6 +19,9 @@
 class DataSavingVisitor : public testing::StrictMock<MockHttp2Visitor> {
  public:
   ssize_t OnReadyToSend(absl::string_view data) override {
+    if (is_write_blocked_) {
+      return 0;
+    }
     const size_t to_accept = std::min(send_limit_, data.size());
     absl::StrAppend(&data_, data.substr(0, to_accept));
     return to_accept;
@@ -29,9 +32,13 @@
 
   void set_send_limit(size_t limit) { send_limit_ = limit; }
 
+  bool is_write_blocked() const { return is_write_blocked_; }
+  void set_is_write_blocked(bool value) { is_write_blocked_ = value; }
+
  private:
   std::string data_;
   size_t send_limit_ = std::numeric_limits<size_t>::max();
+  bool is_write_blocked_ = false;
 };
 
 // A test DataFrameSource that can be initialized with a single string payload,
@@ -47,7 +54,7 @@
                       bool has_fin = true);
 
   std::pair<ssize_t, bool> SelectPayloadLength(size_t max_length) override;
-  void Send(absl::string_view frame_header, size_t payload_length) override;
+  bool Send(absl::string_view frame_header, size_t payload_length) override;
   bool send_fin() const override { return has_fin_; }
 
   void set_is_data_available(bool value) { is_data_available_ = value; }