Allow NgHttp2Adapter to remove `sources_` map entries via CallbackVisitor. This CL introduces NgHttp2Adapter::RemoveStream() and CallbackVisitor::set_stream_close_listener() that can be used together to remove entries from the per-stream `sources_` map when a stream is closed. Namely, CallbackVisitor::set_stream_close_listener() can be called with a lambda that invokes NgHttp2Adapter::RemoveStream() in codec_impl.cc. This CL has a positive effect on memory profiles in loadtests for addressing go/envoy-upstream/issues/19761. PiperOrigin-RevId: 437827219
diff --git a/http2/adapter/callback_visitor.cc b/http2/adapter/callback_visitor.cc index fa52930..2a00eed 100644 --- a/http2/adapter/callback_visitor.cc +++ b/http2/adapter/callback_visitor.cc
@@ -263,6 +263,9 @@ nullptr, stream_id, static_cast<uint32_t>(error_code), user_data_); } stream_map_.erase(stream_id); + if (stream_close_listener_) { + stream_close_listener_(stream_id); + } } void CallbackVisitor::OnPriorityForStream(Http2StreamId /*stream_id*/,
diff --git a/http2/adapter/callback_visitor.h b/http2/adapter/callback_visitor.h index d38e66f..a64c8e8 100644 --- a/http2/adapter/callback_visitor.h +++ b/http2/adapter/callback_visitor.h
@@ -3,6 +3,7 @@ #include <cstdint> #include <memory> +#include <utility> #include <vector> #include "absl/container/flat_hash_map.h" @@ -18,6 +19,9 @@ // data" pointer, and invokes the callbacks according to HTTP/2 events received. class QUICHE_EXPORT_PRIVATE CallbackVisitor : public Http2VisitorInterface { public: + // Called when the visitor receives a close event for `stream_id`. + using StreamCloseListener = std::function<void(Http2StreamId stream_id)>; + explicit CallbackVisitor(Perspective perspective, const nghttp2_session_callbacks& callbacks, void* user_data); @@ -71,6 +75,10 @@ size_t stream_map_size() const { return stream_map_.size(); } + void set_stream_close_listener(StreamCloseListener stream_close_listener) { + stream_close_listener_ = std::move(stream_close_listener); + } + private: struct QUICHE_EXPORT_PRIVATE StreamInfo { bool before_sent_headers = false; @@ -89,6 +97,8 @@ StreamInfoMap stream_map_; + StreamCloseListener stream_close_listener_; + Perspective perspective_; nghttp2_session_callbacks_unique_ptr callbacks_; void* user_data_;
diff --git a/http2/adapter/callback_visitor_test.cc b/http2/adapter/callback_visitor_test.cc index 8f83da5..fee6fc4 100644 --- a/http2/adapter/callback_visitor_test.cc +++ b/http2/adapter/callback_visitor_test.cc
@@ -1,5 +1,7 @@ #include "http2/adapter/callback_visitor.h" +#include "absl/container/flat_hash_map.h" +#include "http2/adapter/http2_protocol.h" #include "http2/adapter/mock_nghttp2_callbacks.h" #include "http2/adapter/nghttp2_test_utils.h" #include "http2/adapter/test_utils.h" @@ -11,6 +13,9 @@ namespace { using testing::_; +using testing::IsEmpty; +using testing::Pair; +using testing::UnorderedElementsAre; enum FrameType { DATA, @@ -78,6 +83,11 @@ testing::StrictMock<MockNghttp2Callbacks> callbacks; CallbackVisitor visitor(Perspective::kClient, *MockNghttp2Callbacks::GetCallbacks(), &callbacks); + absl::flat_hash_map<Http2StreamId, int> stream_close_counts; + visitor.set_stream_close_listener( + [&stream_close_counts](Http2StreamId stream_id) { + ++stream_close_counts[stream_id]; + }); testing::InSequence seq; @@ -131,12 +141,15 @@ EXPECT_CALL(callbacks, OnFrameRecv(IsHeaders(1, _, NGHTTP2_HCAT_HEADERS))); visitor.OnEndHeadersForStream(1); + EXPECT_THAT(stream_close_counts, IsEmpty()); + // RST_STREAM on stream 3 EXPECT_CALL(callbacks, OnBeginFrame(HasFrameHeader(3, RST_STREAM, 0))); visitor.OnFrameHeader(3, 4, RST_STREAM, 0); // No change in stream map size. EXPECT_EQ(visitor.stream_map_size(), 1); + EXPECT_THAT(stream_close_counts, IsEmpty()); EXPECT_CALL(callbacks, OnFrameRecv(IsRstStream(3, NGHTTP2_INTERNAL_ERROR))); visitor.OnRstStream(3, Http2ErrorCode::INTERNAL_ERROR); @@ -144,6 +157,8 @@ EXPECT_CALL(callbacks, OnStreamClose(3, NGHTTP2_INTERNAL_ERROR)); visitor.OnCloseStream(3, Http2ErrorCode::INTERNAL_ERROR); + EXPECT_THAT(stream_close_counts, UnorderedElementsAre(Pair(3, 1))); + // More stream close events EXPECT_CALL(callbacks, OnBeginFrame(HasFrameHeader(1, DATA, NGHTTP2_FLAG_END_STREAM))); @@ -156,8 +171,10 @@ EXPECT_CALL(callbacks, OnStreamClose(1, NGHTTP2_NO_ERROR)); visitor.OnCloseStream(1, Http2ErrorCode::HTTP2_NO_ERROR); - // Stream map is empty again. + // Stream map is empty again after both streams were closed. EXPECT_EQ(visitor.stream_map_size(), 0); + EXPECT_THAT(stream_close_counts, + UnorderedElementsAre(Pair(3, 1), Pair(1, 1))); EXPECT_CALL(callbacks, OnBeginFrame(HasFrameHeader(5, RST_STREAM, _))); visitor.OnFrameHeader(5, 4, RST_STREAM, 0); @@ -169,6 +186,8 @@ visitor.OnCloseStream(5, Http2ErrorCode::REFUSED_STREAM); EXPECT_EQ(visitor.stream_map_size(), 0); + EXPECT_THAT(stream_close_counts, + UnorderedElementsAre(Pair(3, 1), Pair(1, 1), Pair(5, 1))); } TEST(ClientCallbackVisitorUnitTest, HeadersWithContinuation) {
diff --git a/http2/adapter/nghttp2_adapter.cc b/http2/adapter/nghttp2_adapter.cc index a6ba0b2..c8dbc74 100644 --- a/http2/adapter/nghttp2_adapter.cc +++ b/http2/adapter/nghttp2_adapter.cc
@@ -220,7 +220,6 @@ int32_t stream_id = nghttp2_submit_request(session_->raw_ptr(), nullptr, nvs.data(), nvs.size(), provider.get(), stream_user_data); - // TODO(birenroy): clean up data source on stream close sources_.emplace(stream_id, std::move(data_source)); QUICHE_VLOG(1) << "Submitted request with " << nvs.size() << " request headers and user data " << stream_user_data @@ -235,7 +234,6 @@ std::unique_ptr<nghttp2_data_provider> provider = MakeDataProvider(data_source.get()); - // TODO(birenroy): clean up data source on stream close sources_.emplace(stream_id, std::move(data_source)); int result = nghttp2_submit_response(session_->raw_ptr(), stream_id, @@ -269,6 +267,10 @@ return 0 == nghttp2_session_resume_data(session_->raw_ptr(), stream_id); } +void NgHttp2Adapter::RemoveStream(Http2StreamId stream_id) { + sources_.erase(stream_id); +} + NgHttp2Adapter::NgHttp2Adapter(Http2VisitorInterface& visitor, Perspective perspective, const nghttp2_option* options)
diff --git a/http2/adapter/nghttp2_adapter.h b/http2/adapter/nghttp2_adapter.h index 9078278..257f01a 100644 --- a/http2/adapter/nghttp2_adapter.h +++ b/http2/adapter/nghttp2_adapter.h
@@ -88,6 +88,12 @@ bool ResumeStream(Http2StreamId stream_id) override; + // Removes references to the `stream_id` from this adapter. + void RemoveStream(Http2StreamId stream_id); + + // Accessor for testing. + size_t sources_size() const { return sources_.size(); } + private: NgHttp2Adapter(Http2VisitorInterface& visitor, Perspective perspective, const nghttp2_option* options);
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc index a90e7ed..d238fd2 100644 --- a/http2/adapter/nghttp2_adapter_test.cc +++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -143,6 +143,8 @@ adapter->SetStreamUserData(stream_id2, const_cast<char*>(kSentinel2)); adapter->SetStreamUserData(stream_id3, nullptr); + EXPECT_EQ(adapter->sources_size(), 3); + EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id1, _, 0x5)); EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id1, _, 0x5, 0)); EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id2, _, 0x5)); @@ -205,7 +207,11 @@ EXPECT_CALL(visitor, OnDataForStream(1, "This is the response body.")); EXPECT_CALL(visitor, OnFrameHeader(3, 4, RST_STREAM, 0)); EXPECT_CALL(visitor, OnRstStream(3, Http2ErrorCode::INTERNAL_ERROR)); - EXPECT_CALL(visitor, OnCloseStream(3, Http2ErrorCode::INTERNAL_ERROR)); + EXPECT_CALL(visitor, OnCloseStream(3, Http2ErrorCode::INTERNAL_ERROR)) + .WillOnce( + [&adapter](Http2StreamId stream_id, Http2ErrorCode /*error_code*/) { + adapter->RemoveStream(stream_id); + }); EXPECT_CALL(visitor, OnFrameHeader(0, 19, GOAWAY, 0)); EXPECT_CALL(visitor, OnGoAway(5, Http2ErrorCode::ENHANCE_YOUR_CALM, "calm down!!")); @@ -221,6 +227,9 @@ EXPECT_EQ(kInitialFlowControlWindowSize, adapter->GetStreamReceiveWindowSize(stream_id3)); + // One stream was closed. + EXPECT_EQ(adapter->sources_size(), 2); + // Connection window should be the same as the first stream. EXPECT_EQ(adapter->GetReceiveWindowSize(), adapter->GetStreamReceiveWindowSize(stream_id1)); @@ -241,10 +250,18 @@ EXPECT_CALL(visitor, OnFrameHeader(1, 0, DATA, 1)); EXPECT_CALL(visitor, OnBeginDataForStream(1, 0)); EXPECT_CALL(visitor, OnEndStream(1)); - EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::HTTP2_NO_ERROR)); + EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::HTTP2_NO_ERROR)) + .WillOnce( + [&adapter](Http2StreamId stream_id, Http2ErrorCode /*error_code*/) { + adapter->RemoveStream(stream_id); + }); EXPECT_CALL(visitor, OnFrameHeader(5, 4, RST_STREAM, 0)); EXPECT_CALL(visitor, OnRstStream(5, Http2ErrorCode::REFUSED_STREAM)); - EXPECT_CALL(visitor, OnCloseStream(5, Http2ErrorCode::REFUSED_STREAM)); + EXPECT_CALL(visitor, OnCloseStream(5, Http2ErrorCode::REFUSED_STREAM)) + .WillOnce( + [&adapter](Http2StreamId stream_id, Http2ErrorCode /*error_code*/) { + adapter->RemoveStream(stream_id); + }); adapter->ProcessBytes(TestFrameSequence() .Data(1, "", true) .RstStream(5, Http2ErrorCode::REFUSED_STREAM) @@ -256,6 +273,7 @@ // After receiving END_STREAM for 1 and RST_STREAM for 5, the session no // longer expects reads. EXPECT_FALSE(adapter->want_read()); + EXPECT_EQ(adapter->sources_size(), 0); // Client will not have anything else to write. EXPECT_FALSE(adapter->want_write());