Let client-side OgHttp2Session obey the peer's advertised SETTINGS_MAX_CONCURRENT_STREAMS. This CL allows client-side OgHttp2Session to save the peer's advertised SETTINGS_MAX_CONCURRENT_STREAMS value and use that value in SubmitRequest() to determine whether to create a stream immediately for the request or store the request for later. OgHttp2Session will create a stream immediately unless that would violate SETTINGS_MAX_CONCURRENT_STREAMS; in this case instead, the request is stored in a (newly added) PendingStreamState. Then when removing a stream from the map in CloseStream(), OgHttp2Session will check if pending streams can be created (compliant with SETTINGS_MAX_CONCURRENT_STREAMS). PiperOrigin-RevId: 398554048
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc index 28e3411..9877513 100644 --- a/http2/adapter/oghttp2_adapter_test.cc +++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -775,10 +775,40 @@ {":path", "/this/is/request/two"}}), nullptr, nullptr); - // A new stream is created and the session wants to write, despite - // MAX_CONCURRENT_STREAMS. - // TODO(diannahu): Respect the peer's MAX_CONCURRENT_STREAMS value. + // A new pending stream is created, but because of MAX_CONCURRENT_STREAMS, the + // session should not want to write it at the moment. EXPECT_GT(next_stream_id, stream_id); + EXPECT_FALSE(adapter->session().want_write()); + + const std::string stream_frames = + TestFrameSequence() + .Headers(stream_id, + {{":status", "200"}, + {"server", "my-fake-server"}, + {"date", "Tue, 6 Apr 2021 12:54:01 GMT"}}, + /*fin=*/false) + .Data(stream_id, "This is the response body.", /*fin=*/true) + .Serialize(); + + EXPECT_CALL(visitor, OnFrameHeader(stream_id, _, HEADERS, 4)); + EXPECT_CALL(visitor, OnBeginHeadersForStream(stream_id)); + EXPECT_CALL(visitor, OnHeaderForStream(stream_id, ":status", "200")); + EXPECT_CALL(visitor, + OnHeaderForStream(stream_id, "server", "my-fake-server")); + EXPECT_CALL(visitor, OnHeaderForStream(stream_id, "date", + "Tue, 6 Apr 2021 12:54:01 GMT")); + EXPECT_CALL(visitor, OnEndHeadersForStream(stream_id)); + EXPECT_CALL(visitor, OnFrameHeader(stream_id, 26, DATA, 0x1)); + EXPECT_CALL(visitor, OnBeginDataForStream(stream_id, 26)); + EXPECT_CALL(visitor, + OnDataForStream(stream_id, "This is the response body.")); + EXPECT_CALL(visitor, OnEndStream(stream_id)); + EXPECT_CALL(visitor, OnCloseStream(stream_id, Http2ErrorCode::NO_ERROR)); + + // The first stream should close, which should make the session want to write + // the next stream. + const int64_t stream_result = adapter->ProcessBytes(stream_frames); + EXPECT_EQ(stream_frames.size(), static_cast<size_t>(stream_result)); EXPECT_TRUE(adapter->session().want_write()); EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, next_stream_id, _, 0x5)); @@ -786,6 +816,7 @@ result = adapter->Send(); EXPECT_EQ(0, result); + EXPECT_THAT(visitor.data(), EqualsFrames({spdy::SpdyFrameType::HEADERS})); visitor.Clear(); EXPECT_FALSE(adapter->session().want_write());
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc index 0ec920e..e09ac21 100644 --- a/http2/adapter/oghttp2_session.cc +++ b/http2/adapter/oghttp2_session.cc
@@ -485,8 +485,15 @@ // TODO(birenroy): return an error for the incorrect perspective const Http2StreamId stream_id = next_stream_id_; next_stream_id_ += 2; - StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source), - user_data); + if (CanCreateStream()) { + StartRequest(stream_id, ToHeaderBlock(headers), std::move(data_source), + user_data); + } else { + // TODO(diannahu): There should probably be a limit to the number of allowed + // pending streams. + pending_streams_.push_back( + {stream_id, ToHeaderBlock(headers), std::move(data_source), user_data}); + } return stream_id; } @@ -654,6 +661,8 @@ peer_supports_metadata_ = (value != 0); } else if (id == MAX_FRAME_SIZE) { max_frame_payload_ = value; + } else if (id == MAX_CONCURRENT_STREAMS) { + max_outbound_concurrent_streams_ = value; } } @@ -896,6 +905,18 @@ Http2ErrorCode error_code) { visitor_.OnCloseStream(stream_id, error_code); stream_map_.erase(stream_id); + + if (!pending_streams_.empty() && CanCreateStream()) { + PendingStreamState& pending_stream = pending_streams_.front(); + StartRequest(pending_stream.stream_id, std::move(pending_stream.headers), + std::move(pending_stream.data_source), + pending_stream.user_data); + pending_streams_.pop_front(); + } +} + +bool OgHttp2Session::CanCreateStream() const { + return stream_map_.size() < max_outbound_concurrent_streams_; } } // namespace adapter
diff --git a/http2/adapter/oghttp2_session.h b/http2/adapter/oghttp2_session.h index cd9bbda..56f0621 100644 --- a/http2/adapter/oghttp2_session.h +++ b/http2/adapter/oghttp2_session.h
@@ -167,6 +167,7 @@ private: using MetadataSequence = std::vector<std::unique_ptr<MetadataSource>>; + struct QUICHE_EXPORT_PRIVATE StreamState { StreamState(int32_t stream_receive_window, WindowManager::WindowUpdateListener listener) @@ -183,6 +184,13 @@ }; using StreamStateMap = absl::flat_hash_map<Http2StreamId, StreamState>; + struct QUICHE_EXPORT_PRIVATE PendingStreamState { + Http2StreamId stream_id; + spdy::SpdyHeaderBlock headers; + std::unique_ptr<DataFrameSource> data_source; + void* user_data = nullptr; + }; + class QUICHE_EXPORT_PRIVATE PassthroughHeadersHandler : public spdy::SpdyHeadersHandlerInterface { public: @@ -246,6 +254,9 @@ // Closes the given `stream_id` with the given `error_code`. void CloseStream(Http2StreamId stream_id, Http2ErrorCode error_code); + // Returns true if the session can create a new stream. + bool CanCreateStream() const; + // Receives events when inbound frames are parsed. Http2VisitorInterface& visitor_; @@ -255,9 +266,14 @@ // Decodes inbound frames. http2::Http2DecoderAdapter decoder_; - // Maintains the state of all streams known to this session. + // Maintains the state of active streams known to this session. StreamStateMap stream_map_; + // Maintains the state of pending streams known to this session. A pending + // stream is kept in this list until it can be created while complying with + // `max_outbound_concurrent_streams_`. + std::list<PendingStreamState> pending_streams_; + // Maintains the queue of outbound frames, and any serialized bytes that have // not yet been consumed. std::list<std::unique_ptr<spdy::SpdyFrameIR>> frames_; @@ -288,6 +304,8 @@ // The initial flow control receive window size for any newly created streams. int32_t stream_receive_window_limit_ = kInitialFlowControlWindowSize; uint32_t max_frame_payload_ = 16384u; + // The spec encourages a value of at least 100 concurrent streams. + uint32_t max_outbound_concurrent_streams_ = 100u; Options options_; bool received_goaway_ = false; bool queued_preface_ = false;