Adds a method to the Http2Adapter API to resume streams which were deferred due to being read-blocked.

PiperOrigin-RevId: 378697186
diff --git a/http2/adapter/http2_adapter.h b/http2/adapter/http2_adapter.h
index fc64c0b..2dd7d3f 100644
--- a/http2/adapter/http2_adapter.h
+++ b/http2/adapter/http2_adapter.h
@@ -111,6 +111,11 @@
   // not been set.
   virtual void* GetStreamUserData(Http2StreamId stream_id) = 0;
 
+  // Resumes a stream that was previously blocked (for example, due to
+  // DataFrameSource::SelectPayloadLength() returning kBlocked). Returns true if
+  // the stream was successfully resumed.
+  virtual bool ResumeStream(Http2StreamId stream_id) = 0;
+
  protected:
   // Subclasses should expose a public factory method for constructing and
   // initializing (via Initialize()) adapter instances.
diff --git a/http2/adapter/nghttp2_adapter.cc b/http2/adapter/nghttp2_adapter.cc
index 31b215c..6ec2a14 100644
--- a/http2/adapter/nghttp2_adapter.cc
+++ b/http2/adapter/nghttp2_adapter.cc
@@ -167,6 +167,10 @@
   return nghttp2_session_get_stream_user_data(session_->raw_ptr(), stream_id);
 }
 
+bool NgHttp2Adapter::ResumeStream(Http2StreamId stream_id) {
+  return 0 == nghttp2_session_resume_data(session_->raw_ptr(), stream_id);
+}
+
 NgHttp2Adapter::NgHttp2Adapter(Http2VisitorInterface& visitor,
                                Perspective perspective)
     : Http2Adapter(visitor), visitor_(visitor), perspective_(perspective) {}
diff --git a/http2/adapter/nghttp2_adapter.h b/http2/adapter/nghttp2_adapter.h
index b8dfb3b..a3f32df 100644
--- a/http2/adapter/nghttp2_adapter.h
+++ b/http2/adapter/nghttp2_adapter.h
@@ -70,6 +70,8 @@
   void SetStreamUserData(Http2StreamId stream_id, void* user_data) override;
   void* GetStreamUserData(Http2StreamId stream_id) override;
 
+  bool ResumeStream(Http2StreamId stream_id) override;
+
   // TODO(b/181586191): Temporary accessor until equivalent functionality is
   // available in this adapter class.
   NgHttp2Session& session() { return *session_; }
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index d930a26..070ed68 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -350,12 +350,16 @@
 
   // Resume the deferred stream.
   body1.set_is_data_available(true);
-  nghttp2_session_resume_data(adapter->session().raw_ptr(), stream_id);
+  EXPECT_TRUE(adapter->ResumeStream(stream_id));
   EXPECT_TRUE(adapter->session().want_write());
 
   adapter->Send();
   EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::DATA}));
   EXPECT_FALSE(adapter->session().want_write());
+
+  // Stream data is done, so this stream cannot be resumed.
+  EXPECT_FALSE(adapter->ResumeStream(stream_id));
+  EXPECT_FALSE(adapter->session().want_write());
 }
 
 // This test verifies how nghttp2 behaves when a connection becomes
diff --git a/http2/adapter/oghttp2_adapter.cc b/http2/adapter/oghttp2_adapter.cc
index 1ba8d37..9f57aec 100644
--- a/http2/adapter/oghttp2_adapter.cc
+++ b/http2/adapter/oghttp2_adapter.cc
@@ -128,6 +128,10 @@
   return session_->GetStreamUserData(stream_id);
 }
 
+bool OgHttp2Adapter::ResumeStream(Http2StreamId stream_id) {
+  return session_->ResumeStream(stream_id);
+}
+
 const Http2Session& OgHttp2Adapter::session() const {
   return *session_;
 }
diff --git a/http2/adapter/oghttp2_adapter.h b/http2/adapter/oghttp2_adapter.h
index 7ef07f6..bbdd2a4 100644
--- a/http2/adapter/oghttp2_adapter.h
+++ b/http2/adapter/oghttp2_adapter.h
@@ -52,6 +52,7 @@
 
   void SetStreamUserData(Http2StreamId stream_id, void* user_data) override;
   void* GetStreamUserData(Http2StreamId stream_id) override;
+  bool ResumeStream(Http2StreamId stream_id) override;
 
   const Http2Session& session() const;
 
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index bb8405f..073bf1c 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -49,6 +49,16 @@
   return nullptr;
 }
 
+bool OgHttp2Session::ResumeStream(Http2StreamId stream_id) {
+  if (auto it = stream_map_.find(stream_id);
+      it->second.outbound_body == nullptr ||
+      !write_scheduler_.StreamRegistered(stream_id)) {
+    return false;
+  }
+  write_scheduler_.MarkStreamReady(stream_id, /*add_to_front=*/false);
+  return true;
+}
+
 ssize_t OgHttp2Session::ProcessBytes(absl::string_view bytes) {
   ssize_t preface_consumed = 0;
   if (!remaining_preface_.empty()) {
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index c5b9efc..329d2a9 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -54,6 +54,9 @@
   void SetStreamUserData(Http2StreamId stream_id, void* user_data);
   void* GetStreamUserData(Http2StreamId stream_id);
 
+  // Resumes a stream that was previously blocked. Returns true on success.
+  bool ResumeStream(Http2StreamId stream_id);
+
   // From Http2Session.
   ssize_t ProcessBytes(absl::string_view bytes) override;
   int Consume(Http2StreamId stream_id, size_t num_bytes) override;
diff --git a/http2/adapter/oghttp2_session_test.cc b/http2/adapter/oghttp2_session_test.cc
index 4b42188..ca0b0f6 100644
--- a/http2/adapter/oghttp2_session_test.cc
+++ b/http2/adapter/oghttp2_session_test.cc
@@ -252,8 +252,16 @@
   visitor.Clear();
   EXPECT_FALSE(session.want_write());
 
-  // Currently there is no way to indicate that the first stream is no longer
-  // read blocked.
+  body1.set_is_data_available(true);
+  EXPECT_TRUE(session.ResumeStream(stream_id));
+  EXPECT_TRUE(session.want_write());
+  session.Send();
+  EXPECT_THAT(visitor.data(), EqualsFrames({SpdyFrameType::DATA}));
+  EXPECT_FALSE(session.want_write());
+
+  // Stream data is done, so this stream cannot be resumed.
+  EXPECT_FALSE(session.ResumeStream(stream_id));
+  EXPECT_FALSE(session.want_write());
 }
 
 // This test exercises the case where the connection to the peer is write