Handle new/pending streams on receipt of a GOAWAY in oghttp2.
This CL is a follow-up to cl/439588951, which handled the receipt of a GOAWAY
for active streams. This CL handles the receipt of a GOAWAY for new/pending
streams with stream IDs above the received GOAWAY's last_stream_id by adding
those stream IDs to a new set in OgHttp2Session and delivering OnCloseStream()
events for those streams in the next write.
This choice was motivated by the nghttp2_submit_request() (and thus
SubmitRequest()) API, which requires returning a stream ID even after receipt
of a GOAWAY. Note that simply closing the pending streams on GOAWAY would be
insufficient, as subsequent calls to SubmitRequest() could result in new or
newly pending streams. Therefore, this CL instead decides to unify handling for
new/pending streams by catching them in the common StartNewRequest() method.
Because the OnCloseStream() is delivered on the next write attempt, this CL
also updates want_write() to account for these GOAWAY-rejected streams.
This change increases oghttp2 parity with nghttp2.
PiperOrigin-RevId: 440989445
diff --git a/quiche/http2/adapter/nghttp2_adapter_test.cc b/quiche/http2/adapter/nghttp2_adapter_test.cc
index 4df1a89..970d529 100644
--- a/quiche/http2/adapter/nghttp2_adapter_test.cc
+++ b/quiche/http2/adapter/nghttp2_adapter_test.cc
@@ -1702,6 +1702,116 @@
EXPECT_THAT(visitor.data(), testing::IsEmpty());
}
+TEST(NgHttp2AdapterTest, ClientReceivesGoAwayWithPendingStreams) {
+ DataSavingVisitor visitor;
+ auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
+
+ int result = adapter->Send();
+ EXPECT_EQ(0, result);
+ // Client preface does not appear to include the mandatory SETTINGS frame.
+ EXPECT_THAT(visitor.data(),
+ testing::StrEq(spdy::kHttp2ConnectionHeaderPrefix));
+ visitor.Clear();
+
+ testing::InSequence s;
+
+ const std::string initial_frames =
+ TestFrameSequence()
+ .ServerPreface({{.id = MAX_CONCURRENT_STREAMS, .value = 1}})
+ .Serialize();
+
+ // Server preface (SETTINGS with MAX_CONCURRENT_STREAMS)
+ EXPECT_CALL(visitor, OnFrameHeader(0, 6, SETTINGS, 0));
+ EXPECT_CALL(visitor, OnSettingsStart());
+ EXPECT_CALL(visitor, OnSetting);
+ EXPECT_CALL(visitor, OnSettingsEnd());
+
+ const int64_t initial_result = adapter->ProcessBytes(initial_frames);
+ EXPECT_EQ(initial_frames.size(), initial_result);
+
+ const std::vector<Header> headers1 =
+ ToHeaders({{":method", "GET"},
+ {":scheme", "http"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/one"}});
+
+ const int32_t stream_id1 = adapter->SubmitRequest(headers1, nullptr, nullptr);
+ ASSERT_GT(stream_id1, 0);
+
+ const std::vector<Header> headers2 =
+ ToHeaders({{":method", "GET"},
+ {":scheme", "http"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/two"}});
+
+ const int32_t stream_id2 = adapter->SubmitRequest(headers2, nullptr, nullptr);
+ ASSERT_GT(stream_id2, stream_id1);
+
+ // The second request should be pending because of
+ // SETTINGS_MAX_CONCURRENT_STREAMS.
+ EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+ EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+ EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id1, _, 0x5));
+ EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id1, _, 0x5, 0));
+
+ result = adapter->Send();
+ EXPECT_EQ(0, result);
+ EXPECT_THAT(visitor.data(),
+ EqualsFrames({SpdyFrameType::SETTINGS, SpdyFrameType::HEADERS}));
+ visitor.Clear();
+
+ // Let the client receive a GOAWAY and raise MAX_CONCURRENT_STREAMS. Even
+ // though the GOAWAY last_stream_id is higher than the pending request's
+ // stream ID, pending request should not be sent.
+ const std::string stream_frames =
+ TestFrameSequence()
+ .GoAway(kMaxStreamId, Http2ErrorCode::INTERNAL_ERROR, "indigestion")
+ .Settings({{MAX_CONCURRENT_STREAMS, 42u}})
+ .Serialize();
+
+ EXPECT_CALL(visitor, OnFrameHeader(0, _, GOAWAY, 0));
+ EXPECT_CALL(visitor, OnGoAway(kMaxStreamId, Http2ErrorCode::INTERNAL_ERROR,
+ "indigestion"));
+ EXPECT_CALL(visitor, OnFrameHeader(0, 6, SETTINGS, 0));
+ EXPECT_CALL(visitor, OnSettingsStart());
+ EXPECT_CALL(visitor, OnSetting(Http2Setting{MAX_CONCURRENT_STREAMS, 42u}));
+ EXPECT_CALL(visitor, OnSettingsEnd());
+
+ const int64_t stream_result = adapter->ProcessBytes(stream_frames);
+ EXPECT_EQ(stream_frames.size(), stream_result);
+
+ EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+ EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+
+ // Nghttp2 closes the pending stream on the next write attempt.
+ EXPECT_CALL(visitor, OnCloseStream(3, Http2ErrorCode::REFUSED_STREAM));
+
+ EXPECT_TRUE(adapter->want_write());
+ result = adapter->Send();
+ EXPECT_EQ(0, result);
+ EXPECT_THAT(visitor.data(), EqualsFrames({SpdyFrameType::SETTINGS}));
+ visitor.Clear();
+
+ // Requests submitted after receiving the GOAWAY should not be sent.
+ const std::vector<Header> headers3 =
+ ToHeaders({{":method", "GET"},
+ {":scheme", "http"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/three"}});
+
+ const int32_t stream_id3 = adapter->SubmitRequest(headers3, nullptr, nullptr);
+ ASSERT_GT(stream_id3, stream_id2);
+
+ // Nghttp2 closes the pending stream on the next write attempt.
+ EXPECT_CALL(visitor, OnCloseStream(5, Http2ErrorCode::REFUSED_STREAM));
+
+ EXPECT_TRUE(adapter->want_write());
+ result = adapter->Send();
+ EXPECT_EQ(0, result);
+ EXPECT_THAT(visitor.data(), testing::IsEmpty());
+ EXPECT_FALSE(adapter->want_write());
+}
+
TEST(NgHttp2AdapterTest, ClientFailsOnGoAway) {
DataSavingVisitor visitor;
auto adapter = NgHttp2Adapter::CreateClientAdapter(visitor);
diff --git a/quiche/http2/adapter/oghttp2_adapter_test.cc b/quiche/http2/adapter/oghttp2_adapter_test.cc
index 5c12d42..b279a76 100644
--- a/quiche/http2/adapter/oghttp2_adapter_test.cc
+++ b/quiche/http2/adapter/oghttp2_adapter_test.cc
@@ -2075,6 +2075,121 @@
EXPECT_THAT(visitor.data(), testing::IsEmpty());
}
+TEST(OgHttp2AdapterTest, ClientReceivesGoAwayWithPendingStreams) {
+ DataSavingVisitor visitor;
+ OgHttp2Adapter::Options options{.perspective = Perspective::kClient};
+ auto adapter = OgHttp2Adapter::Create(visitor, options);
+
+ testing::InSequence s;
+
+ EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+ EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+
+ int result = adapter->Send();
+ EXPECT_EQ(0, result);
+ absl::string_view data = visitor.data();
+ EXPECT_THAT(data, testing::StartsWith(spdy::kHttp2ConnectionHeaderPrefix));
+ data.remove_prefix(strlen(spdy::kHttp2ConnectionHeaderPrefix));
+ EXPECT_THAT(data, EqualsFrames({SpdyFrameType::SETTINGS}));
+ visitor.Clear();
+
+ const std::string initial_frames =
+ TestFrameSequence()
+ .ServerPreface({{.id = MAX_CONCURRENT_STREAMS, .value = 1}})
+ .Serialize();
+
+ // Server preface (SETTINGS with MAX_CONCURRENT_STREAMS)
+ EXPECT_CALL(visitor, OnFrameHeader(0, 6, SETTINGS, 0));
+ EXPECT_CALL(visitor, OnSettingsStart());
+ EXPECT_CALL(visitor, OnSetting);
+ EXPECT_CALL(visitor, OnSettingsEnd());
+
+ const int64_t initial_result = adapter->ProcessBytes(initial_frames);
+ EXPECT_EQ(initial_frames.size(), static_cast<size_t>(initial_result));
+
+ const std::vector<Header> headers1 =
+ ToHeaders({{":method", "GET"},
+ {":scheme", "http"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/one"}});
+
+ const int32_t stream_id1 = adapter->SubmitRequest(headers1, nullptr, nullptr);
+ ASSERT_GT(stream_id1, 0);
+
+ const std::vector<Header> headers2 =
+ ToHeaders({{":method", "GET"},
+ {":scheme", "http"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/two"}});
+
+ const int32_t stream_id2 = adapter->SubmitRequest(headers2, nullptr, nullptr);
+ ASSERT_GT(stream_id2, stream_id1);
+
+ // The second request should be pending because of
+ // SETTINGS_MAX_CONCURRENT_STREAMS.
+ EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+ EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+ EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id1, _, 0x5));
+ EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id1, _, 0x5, 0));
+
+ result = adapter->Send();
+ EXPECT_EQ(0, result);
+ EXPECT_THAT(visitor.data(),
+ EqualsFrames({SpdyFrameType::SETTINGS, SpdyFrameType::HEADERS}));
+ visitor.Clear();
+
+ // Let the client receive a GOAWAY and raise MAX_CONCURRENT_STREAMS. Even
+ // though the GOAWAY last_stream_id is higher than the pending request's
+ // stream ID, pending request should not be sent.
+ const std::string stream_frames =
+ TestFrameSequence()
+ .GoAway(kMaxStreamId, Http2ErrorCode::INTERNAL_ERROR, "indigestion")
+ .Settings({{MAX_CONCURRENT_STREAMS, 42u}})
+ .Serialize();
+
+ EXPECT_CALL(visitor, OnFrameHeader(0, _, GOAWAY, 0));
+ EXPECT_CALL(visitor,
+ OnGoAway(kMaxStreamId, Http2ErrorCode::INTERNAL_ERROR, ""));
+ EXPECT_CALL(visitor, OnFrameHeader(0, 6, SETTINGS, 0));
+ EXPECT_CALL(visitor, OnSettingsStart());
+ EXPECT_CALL(visitor, OnSetting(Http2Setting{MAX_CONCURRENT_STREAMS, 42u}));
+ EXPECT_CALL(visitor, OnSettingsEnd());
+
+ const int64_t stream_result = adapter->ProcessBytes(stream_frames);
+ EXPECT_EQ(stream_frames.size(), static_cast<size_t>(stream_result));
+
+ EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, 0, 0x1));
+ EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, 0, 0x1, 0));
+
+ // We close the pending stream on the next write attempt.
+ EXPECT_CALL(visitor, OnCloseStream(3, Http2ErrorCode::REFUSED_STREAM));
+
+ EXPECT_TRUE(adapter->want_write());
+ result = adapter->Send();
+ EXPECT_EQ(0, result);
+ EXPECT_THAT(visitor.data(), EqualsFrames({SpdyFrameType::SETTINGS}));
+ visitor.Clear();
+
+ // Requests submitted after receiving the GOAWAY should not be sent.
+ const std::vector<Header> headers3 =
+ ToHeaders({{":method", "GET"},
+ {":scheme", "http"},
+ {":authority", "example.com"},
+ {":path", "/this/is/request/three"}});
+
+ const int32_t stream_id3 = adapter->SubmitRequest(headers3, nullptr, nullptr);
+ ASSERT_GT(stream_id3, stream_id2);
+
+ // We close the pending stream on the next write attempt.
+ EXPECT_CALL(visitor, OnCloseStream(5, Http2ErrorCode::REFUSED_STREAM));
+
+ EXPECT_TRUE(adapter->want_write());
+ result = adapter->Send();
+ EXPECT_EQ(0, result);
+ EXPECT_THAT(visitor.data(), testing::IsEmpty());
+ EXPECT_FALSE(adapter->want_write());
+}
+
TEST(OgHttp2AdapterTest, ClientFailsOnGoAway) {
DataSavingVisitor visitor;
OgHttp2Adapter::Options options{.perspective = Perspective::kClient};
diff --git a/quiche/http2/adapter/oghttp2_session.cc b/quiche/http2/adapter/oghttp2_session.cc
index 655a9e2..eddfc99 100644
--- a/quiche/http2/adapter/oghttp2_session.cc
+++ b/quiche/http2/adapter/oghttp2_session.cc
@@ -592,6 +592,8 @@
!connection_metadata_.empty()) {
continue_writing = SendMetadata(0, connection_metadata_);
}
+ // Notify on new/pending streams closed due to GOAWAY receipt.
+ CloseGoAwayRejectedStreams();
// Wake streams for writes.
while (continue_writing == SendResult::SEND_OK && HasReadyStream()) {
const Http2StreamId stream_id = GetNextReadyStream();
@@ -1230,6 +1232,10 @@
break;
case MAX_CONCURRENT_STREAMS:
max_outbound_concurrent_streams_ = value;
+ if (!IsServerSession()) {
+ // We may now be able to start pending streams.
+ StartPendingStreams();
+ }
break;
case HEADER_TABLE_SIZE:
value = std::min(value, HpackCapacityBound(options_));
@@ -1702,6 +1708,12 @@
spdy::SpdyHeaderBlock headers,
std::unique_ptr<DataFrameSource> data_source,
void* user_data) {
+ if (received_goaway_) {
+ // Do not start new streams after receiving a GOAWAY.
+ goaway_rejected_streams_.insert(stream_id);
+ return;
+ }
+
auto iter = CreateStream(stream_id);
const bool end_stream = data_source == nullptr;
if (!end_stream) {
@@ -1790,6 +1802,18 @@
}
}
+void OgHttp2Session::CloseGoAwayRejectedStreams() {
+ for (Http2StreamId stream_id : goaway_rejected_streams_) {
+ const bool result =
+ visitor_.OnCloseStream(stream_id, Http2ErrorCode::REFUSED_STREAM);
+ if (!result) {
+ latched_error_ = true;
+ decoder_.StopProcessing();
+ }
+ }
+ goaway_rejected_streams_.clear();
+}
+
void OgHttp2Session::PrepareForImmediateGoAway() {
queued_immediate_goaway_ = true;
diff --git a/quiche/http2/adapter/oghttp2_session.h b/quiche/http2/adapter/oghttp2_session.h
index 43542c5..a451079 100644
--- a/quiche/http2/adapter/oghttp2_session.h
+++ b/quiche/http2/adapter/oghttp2_session.h
@@ -143,7 +143,8 @@
bool want_write() const override {
return !fatal_send_error_ &&
(!frames_.empty() || !buffered_data_.empty() ||
- !connection_metadata_.empty() || HasReadyStream());
+ !connection_metadata_.empty() || HasReadyStream() ||
+ !goaway_rejected_streams_.empty());
}
int GetRemoteWindowSize() const override { return connection_send_window_; }
@@ -404,6 +405,9 @@
void CloseStreamIfReady(uint8_t frame_type, uint32_t stream_id);
+ // Informs the visitor of rejected, non-active streams due to GOAWAY receipt.
+ void CloseGoAwayRejectedStreams();
+
// Updates internal state to prepare for sending an immediate GOAWAY.
void PrepareForImmediateGoAway();
@@ -488,6 +492,8 @@
absl::flat_hash_set<Http2StreamId> trailers_ready_;
// Includes streams that are currently ready to write metadata.
absl::flat_hash_set<Http2StreamId> metadata_ready_;
+ // Includes streams that will not be written due to receipt of GOAWAY.
+ absl::flat_hash_set<Http2StreamId> goaway_rejected_streams_;
MetadataSequence connection_metadata_;