Let client-side OgHttp2Session obey the peer's advertised SETTINGS_MAX_CONCURRENT_STREAMS.
This CL allows client-side OgHttp2Session to save the peer's advertised
SETTINGS_MAX_CONCURRENT_STREAMS value and use that value in SubmitRequest() to
determine whether to create a stream immediately for the request or store the
request for later. OgHttp2Session will create a stream immediately unless that
would violate SETTINGS_MAX_CONCURRENT_STREAMS; in this case instead, the
request is stored in a (newly added) PendingStreamState. Then when removing a
stream from the map in CloseStream(), OgHttp2Session will check if pending
streams can be created (compliant with SETTINGS_MAX_CONCURRENT_STREAMS).
PiperOrigin-RevId: 398554048
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index 28e3411..9877513 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -775,10 +775,40 @@
{":path", "/this/is/request/two"}}),
nullptr, nullptr);
- // A new stream is created and the session wants to write, despite
- // MAX_CONCURRENT_STREAMS.
- // TODO(diannahu): Respect the peer's MAX_CONCURRENT_STREAMS value.
+ // A new pending stream is created, but because of MAX_CONCURRENT_STREAMS, the
+ // session should not want to write it at the moment.
EXPECT_GT(next_stream_id, stream_id);
+ EXPECT_FALSE(adapter->session().want_write());
+
+ const std::string stream_frames =
+ TestFrameSequence()
+ .Headers(stream_id,
+ {{":status", "200"},
+ {"server", "my-fake-server"},
+ {"date", "Tue, 6 Apr 2021 12:54:01 GMT"}},
+ /*fin=*/false)
+ .Data(stream_id, "This is the response body.", /*fin=*/true)
+ .Serialize();
+
+ EXPECT_CALL(visitor, OnFrameHeader(stream_id, _, HEADERS, 4));
+ EXPECT_CALL(visitor, OnBeginHeadersForStream(stream_id));
+ EXPECT_CALL(visitor, OnHeaderForStream(stream_id, ":status", "200"));
+ EXPECT_CALL(visitor,
+ OnHeaderForStream(stream_id, "server", "my-fake-server"));
+ EXPECT_CALL(visitor, OnHeaderForStream(stream_id, "date",
+ "Tue, 6 Apr 2021 12:54:01 GMT"));
+ EXPECT_CALL(visitor, OnEndHeadersForStream(stream_id));
+ EXPECT_CALL(visitor, OnFrameHeader(stream_id, 26, DATA, 0x1));
+ EXPECT_CALL(visitor, OnBeginDataForStream(stream_id, 26));
+ EXPECT_CALL(visitor,
+ OnDataForStream(stream_id, "This is the response body."));
+ EXPECT_CALL(visitor, OnEndStream(stream_id));
+ EXPECT_CALL(visitor, OnCloseStream(stream_id, Http2ErrorCode::NO_ERROR));
+
+ // The first stream should close, which should make the session want to write
+ // the next stream.
+ const int64_t stream_result = adapter->ProcessBytes(stream_frames);
+ EXPECT_EQ(stream_frames.size(), static_cast<size_t>(stream_result));
EXPECT_TRUE(adapter->session().want_write());
EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, next_stream_id, _, 0x5));
@@ -786,6 +816,7 @@
result = adapter->Send();
EXPECT_EQ(0, result);
+
EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::HEADERS}));
visitor.Clear();
EXPECT_FALSE(adapter->session().want_write());
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index 0ec920e..e09ac21 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -485,8 +485,15 @@
// TODO(birenroy): return an error for the incorrect perspective
const Http2StreamId stream_id = next_stream_id_;
next_stream_id_ += 2;
- StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source),
- user_data);
+ if (CanCreateStream()) {
+ StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source),
+ user_data);
+ } else {
+ // TODO(diannahu): There should probably be a limit to the number of allowed
+ // pending streams.
+ pending_streams_.push_back(
+ {stream_id, ToHeaderBlock(headers), std::move(data_source), user_data});
+ }
return stream_id;
}
@@ -654,6 +661,8 @@
peer_supports_metadata_ = (value != 0);
} else if (id == MAX_FRAME_SIZE) {
max_frame_payload_ = value;
+ } else if (id == MAX_CONCURRENT_STREAMS) {
+ max_outbound_concurrent_streams_ = value;
}
}
@@ -896,6 +905,18 @@
Http2ErrorCode error_code) {
visitor_.OnCloseStream(stream_id, error_code);
stream_map_.erase(stream_id);
+
+ if (!pending_streams_.empty() && CanCreateStream()) {
+ PendingStreamState& pending_stream = pending_streams_.front();
+ StartRequest(pending_stream.stream_id, std::move(pending_stream.headers),
+ std::move(pending_stream.data_source),
+ pending_stream.user_data);
+ pending_streams_.pop_front();
+ }
+}
+
+bool OgHttp2Session::CanCreateStream() const {
+ return stream_map_.size() < max_outbound_concurrent_streams_;
}
} // namespace adapter
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h
index cd9bbda..56f0621 100644
--- a/http2/adapter/oghttp2_session.h
+++ b/http2/adapter/oghttp2_session.h
@@ -167,6 +167,7 @@
private:
using MetadataSequence = std::vector<std::unique_ptr<MetadataSource>>;
+
struct QUICHE_EXPORT_PRIVATE StreamState {
StreamState(int32_t stream_receive_window,
WindowManager::WindowUpdateListener listener)
@@ -183,6 +184,13 @@
};
using StreamStateMap = absl::flat_hash_map<Http2StreamId, StreamState>;
+ struct QUICHE_EXPORT_PRIVATE PendingStreamState {
+ Http2StreamId stream_id;
+ spdy::SpdyHeaderBlock headers;
+ std::unique_ptr<DataFrameSource> data_source;
+ void* user_data = nullptr;
+ };
+
class QUICHE_EXPORT_PRIVATE PassthroughHeadersHandler
: public spdy::SpdyHeadersHandlerInterface {
public:
@@ -246,6 +254,9 @@
// Closes the given `stream_id` with the given `error_code`.
void CloseStream(Http2StreamId stream_id, Http2ErrorCode error_code);
+ // Returns true if the session can create a new stream.
+ bool CanCreateStream() const;
+
// Receives events when inbound frames are parsed.
Http2VisitorInterface& visitor_;
@@ -255,9 +266,14 @@
// Decodes inbound frames.
http2::Http2DecoderAdapter decoder_;
- // Maintains the state of all streams known to this session.
+ // Maintains the state of active streams known to this session.
StreamStateMap stream_map_;
+ // Maintains the state of pending streams known to this session. A pending
+ // stream is kept in this list until it can be created while complying with
+ // `max_outbound_concurrent_streams_`.
+ std::list<PendingStreamState> pending_streams_;
+
// Maintains the queue of outbound frames, and any serialized bytes that have
// not yet been consumed.
std::list<std::unique_ptr<spdy::SpdyFrameIR>> frames_;
@@ -288,6 +304,8 @@
// The initial flow control receive window size for any newly created streams.
int32_t stream_receive_window_limit_ = kInitialFlowControlWindowSize;
uint32_t max_frame_payload_ = 16384u;
+ // The spec encourages a value of at least 100 concurrent streams.
+ uint32_t max_outbound_concurrent_streams_ = 100u;
Options options_;
bool received_goaway_ = false;
bool queued_preface_ = false;