Adds a method to the Http2Adapter API to resume streams which were deferred due to being read-blocked.
PiperOrigin-RevId: 378697186
diff --git a/http2/adapter/http2_adapter.h b/http2/adapter/http2_adapter.h
index fc64c0b..2dd7d3f 100644
--- a/http2/adapter/http2_adapter.h
+++ b/http2/adapter/http2_adapter.h
@@ -111,6 +111,11 @@
// not been set.
virtual void* GetStreamUserData(Http2StreamId stream_id) = 0;
+ // Resumes a stream that was previously blocked (for example, due to
+ // DataFrameSource::SelectPayloadLength() returning kBlocked). Returns true if
+ // the stream was successfully resumed.
+ virtual bool ResumeStream(Http2StreamId stream_id) = 0;
+
protected:
// Subclasses should expose a public factory method for constructing and
// initializing (via Initialize()) adapter instances.
diff --git a/http2/adapter/nghttp2_adapter.cc b/http2/adapter/nghttp2_adapter.cc
index 31b215c..6ec2a14 100644
--- a/http2/adapter/nghttp2_adapter.cc
+++ b/http2/adapter/nghttp2_adapter.cc
@@ -167,6 +167,10 @@
return nghttp2_session_get_stream_user_data(session_->raw_ptr(), stream_id);
}
+bool NgHttp2Adapter::ResumeStream(Http2StreamId stream_id) {
+ return 0 == nghttp2_session_resume_data(session_->raw_ptr(), stream_id);
+}
+
NgHttp2Adapter::NgHttp2Adapter(Http2VisitorInterface& visitor,
Perspective perspective)
: Http2Adapter(visitor), visitor_(visitor), perspective_(perspective) {}
diff --git a/http2/adapter/nghttp2_adapter.h b/http2/adapter/nghttp2_adapter.h
index b8dfb3b..a3f32df 100644
--- a/http2/adapter/nghttp2_adapter.h
+++ b/http2/adapter/nghttp2_adapter.h
@@ -70,6 +70,8 @@
void SetStreamUserData(Http2StreamId stream_id, void* user_data) override;
void* GetStreamUserData(Http2StreamId stream_id) override;
+ bool ResumeStream(Http2StreamId stream_id) override;
+
// TODO(b/181586191): Temporary accessor until equivalent functionality is
// available in this adapter class.
NgHttp2Session& session() { return *session_; }
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index d930a26..070ed68 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -350,12 +350,16 @@
// Resume the deferred stream.
body1.set_is_data_available(true);
- nghttp2_session_resume_data(adapter->session().raw_ptr(), stream_id);
+ EXPECT_TRUE(adapter->ResumeStream(stream_id));
EXPECT_TRUE(adapter->session().want_write());
adapter->Send();
EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::DATA}));
EXPECT_FALSE(adapter->session().want_write());
+
+ // Stream data is done, so this stream cannot be resumed.
+ EXPECT_FALSE(adapter->ResumeStream(stream_id));
+ EXPECT_FALSE(adapter->session().want_write());
}
// This test verifies how nghttp2 behaves when a connection becomes
diff --git a/http2/adapter/oghttp2_adapter.cc b/http2/adapter/oghttp2_adapter.cc
index 1ba8d37..9f57aec 100644
--- a/http2/adapter/oghttp2_adapter.cc
+++ b/http2/adapter/oghttp2_adapter.cc
@@ -128,6 +128,10 @@
return session_->GetStreamUserData(stream_id);
}
+bool OgHttp2Adapter::ResumeStream(Http2StreamId stream_id) {
+ return session_->ResumeStream(stream_id);
+}
+
const Http2Session& OgHttp2Adapter::session() const {
return *session_;
}
diff --git a/http2/adapter/oghttp2_adapter.h b/http2/adapter/oghttp2_adapter.h
index 7ef07f6..bbdd2a4 100644
--- a/http2/adapter/oghttp2_adapter.h
+++ b/http2/adapter/oghttp2_adapter.h
@@ -52,6 +52,7 @@
void SetStreamUserData(Http2StreamId stream_id, void* user_data) override;
void* GetStreamUserData(Http2StreamId stream_id) override;
+ bool ResumeStream(Http2StreamId stream_id) override;
const Http2Session& session() const;
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index bb8405f..073bf1c 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -49,6 +49,16 @@
return nullptr;
}
+bool OgHttp2Session::ResumeStream(Http2StreamId stream_id) {
+ if (auto it = stream_map_.find(stream_id);
+ it->second.outbound_body == nullptr ||
+ !write_scheduler_.StreamRegistered(stream_id)) {
+ return false;
+ }
+ write_scheduler_.MarkStreamReady(stream_id, /*add_to_front=*/false);
+ return true;
+}
+
ssize_t OgHttp2Session::ProcessBytes(absl::string_view bytes) {
ssize_t preface_consumed = 0;
if (!remaining_preface_.empty()) {
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index c5b9efc..329d2a9 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -54,6 +54,9 @@
void SetStreamUserData(Http2StreamId stream_id, void* user_data);
void* GetStreamUserData(Http2StreamId stream_id);
+ // Resumes a stream that was previously blocked. Returns true on success.
+ bool ResumeStream(Http2StreamId stream_id);
+
// From Http2Session.
ssize_t ProcessBytes(absl::string_view bytes) override;
int Consume(Http2StreamId stream_id, size_t num_bytes) override;
diff --git a/http2/adapter/oghttp2_session_test.cc b/http2/adapter/oghttp2_session_test.cc
index 4b42188..ca0b0f6 100644
--- a/http2/adapter/oghttp2_session_test.cc
+++ b/http2/adapter/oghttp2_session_test.cc
@@ -252,8 +252,16 @@
visitor.Clear();
EXPECT_FALSE(session.want_write());
- // Currently there is no way to indicate that the first stream is no longer
- // read blocked.
+ body1.set_is_data_available(true);
+ EXPECT_TRUE(session.ResumeStream(stream_id));
+ EXPECT_TRUE(session.want_write());
+ session.Send();
+ EXPECT_THAT(visitor.data(), EqualsFrames({SpdyFrameType::DATA}));
+ EXPECT_FALSE(session.want_write());
+
+ // Stream data is done, so this stream cannot be resumed.
+ EXPECT_FALSE(session.ResumeStream(stream_id));
+ EXPECT_FALSE(session.want_write());
}
// This test exercises the case where the connection to the peer is write