Allow NgHttp2Adapter to remove `sources_` map entries via CallbackVisitor.
This CL introduces NgHttp2Adapter::RemoveStream() and
CallbackVisitor::set_stream_close_listener() that can be used together to
remove entries from the per-stream `sources_` map when a stream is closed.
Namely, CallbackVisitor::set_stream_close_listener() can be called with a
lambda that invokes NgHttp2Adapter::RemoveStream() in codec_impl.cc.
This CL has a positive effect on memory profiles in loadtests for addressing
go/envoy-upstream/issues/19761.
PiperOrigin-RevId: 437827219
diff --git a/http2/adapter/callback_visitor.cc b/http2/adapter/callback_visitor.cc
index fa52930..2a00eed 100644
--- a/http2/adapter/callback_visitor.cc
+++ b/http2/adapter/callback_visitor.cc
@@ -263,6 +263,9 @@
nullptr, stream_id, static_cast<uint32_t>(error_code), user_data_);
}
stream_map_.erase(stream_id);
+ if (stream_close_listener_) {
+ stream_close_listener_(stream_id);
+ }
}
void CallbackVisitor::OnPriorityForStream(Http2StreamId /*stream_id*/,
diff --git a/http2/adapter/callback_visitor.h b/http2/adapter/callback_visitor.h
index d38e66f..a64c8e8 100644
--- a/http2/adapter/callback_visitor.h
+++ b/http2/adapter/callback_visitor.h
@@ -3,6 +3,7 @@
#include <cstdint>
#include <memory>
+#include <utility>
#include <vector>
#include "absl/container/flat_hash_map.h"
@@ -18,6 +19,9 @@
// data" pointer, and invokes the callbacks according to HTTP/2 events received.
class QUICHE_EXPORT_PRIVATE CallbackVisitor : public Http2VisitorInterface {
public:
+ // Called when the visitor receives a close event for `stream_id`.
+ using StreamCloseListener = std::function<void(Http2StreamId stream_id)>;
+
explicit CallbackVisitor(Perspective perspective,
const nghttp2_session_callbacks& callbacks,
void* user_data);
@@ -71,6 +75,10 @@
size_t stream_map_size() const { return stream_map_.size(); }
+ void set_stream_close_listener(StreamCloseListener stream_close_listener) {
+ stream_close_listener_ = std::move(stream_close_listener);
+ }
+
private:
struct QUICHE_EXPORT_PRIVATE StreamInfo {
bool before_sent_headers = false;
@@ -89,6 +97,8 @@
StreamInfoMap stream_map_;
+ StreamCloseListener stream_close_listener_;
+
Perspective perspective_;
nghttp2_session_callbacks_unique_ptr callbacks_;
void* user_data_;
diff --git a/http2/adapter/callback_visitor_test.cc b/http2/adapter/callback_visitor_test.cc
index 8f83da5..fee6fc4 100644
--- a/http2/adapter/callback_visitor_test.cc
+++ b/http2/adapter/callback_visitor_test.cc
@@ -1,5 +1,7 @@
#include "http2/adapter/callback_visitor.h"
+#include "absl/container/flat_hash_map.h"
+#include "http2/adapter/http2_protocol.h"
#include "http2/adapter/mock_nghttp2_callbacks.h"
#include "http2/adapter/nghttp2_test_utils.h"
#include "http2/adapter/test_utils.h"
@@ -11,6 +13,9 @@
namespace {
using testing::_;
+using testing::IsEmpty;
+using testing::Pair;
+using testing::UnorderedElementsAre;
enum FrameType {
DATA,
@@ -78,6 +83,11 @@
testing::StrictMock<MockNghttp2Callbacks> callbacks;
CallbackVisitor visitor(Perspective::kClient,
*MockNghttp2Callbacks::GetCallbacks(), &callbacks);
+ absl::flat_hash_map<Http2StreamId, int> stream_close_counts;
+ visitor.set_stream_close_listener(
+ [&stream_close_counts](Http2StreamId stream_id) {
+ ++stream_close_counts[stream_id];
+ });
testing::InSequence seq;
@@ -131,12 +141,15 @@
EXPECT_CALL(callbacks, OnFrameRecv(IsHeaders(1, _, NGHTTP2_HCAT_HEADERS)));
visitor.OnEndHeadersForStream(1);
+ EXPECT_THAT(stream_close_counts, IsEmpty());
+
// RST_STREAM on stream 3
EXPECT_CALL(callbacks, OnBeginFrame(HasFrameHeader(3, RST_STREAM, 0)));
visitor.OnFrameHeader(3, 4, RST_STREAM, 0);
// No change in stream map size.
EXPECT_EQ(visitor.stream_map_size(), 1);
+ EXPECT_THAT(stream_close_counts, IsEmpty());
EXPECT_CALL(callbacks, OnFrameRecv(IsRstStream(3, NGHTTP2_INTERNAL_ERROR)));
visitor.OnRstStream(3, Http2ErrorCode::INTERNAL_ERROR);
@@ -144,6 +157,8 @@
EXPECT_CALL(callbacks, OnStreamClose(3, NGHTTP2_INTERNAL_ERROR));
visitor.OnCloseStream(3, Http2ErrorCode::INTERNAL_ERROR);
+ EXPECT_THAT(stream_close_counts, UnorderedElementsAre(Pair(3, 1)));
+
// More stream close events
EXPECT_CALL(callbacks,
OnBeginFrame(HasFrameHeader(1, DATA, NGHTTP2_FLAG_END_STREAM)));
@@ -156,8 +171,10 @@
EXPECT_CALL(callbacks, OnStreamClose(1, NGHTTP2_NO_ERROR));
visitor.OnCloseStream(1, Http2ErrorCode::HTTP2_NO_ERROR);
- // Stream map is empty again.
+ // Stream map is empty again after both streams were closed.
EXPECT_EQ(visitor.stream_map_size(), 0);
+ EXPECT_THAT(stream_close_counts,
+ UnorderedElementsAre(Pair(3, 1), Pair(1, 1)));
EXPECT_CALL(callbacks, OnBeginFrame(HasFrameHeader(5, RST_STREAM, _)));
visitor.OnFrameHeader(5, 4, RST_STREAM, 0);
@@ -169,6 +186,8 @@
visitor.OnCloseStream(5, Http2ErrorCode::REFUSED_STREAM);
EXPECT_EQ(visitor.stream_map_size(), 0);
+ EXPECT_THAT(stream_close_counts,
+ UnorderedElementsAre(Pair(3, 1), Pair(1, 1), Pair(5, 1)));
}
TEST(ClientCallbackVisitorUnitTest, HeadersWithContinuation) {
diff --git a/http2/adapter/nghttp2_adapter.cc b/http2/adapter/nghttp2_adapter.cc
index a6ba0b2..c8dbc74 100644
--- a/http2/adapter/nghttp2_adapter.cc
+++ b/http2/adapter/nghttp2_adapter.cc
@@ -220,7 +220,6 @@
int32_t stream_id =
nghttp2_submit_request(session_->raw_ptr(), nullptr, nvs.data(),
nvs.size(), provider.get(), stream_user_data);
- // TODO(birenroy): clean up data source on stream close
sources_.emplace(stream_id, std::move(data_source));
QUICHE_VLOG(1) << "Submitted request with " << nvs.size()
<< " request headers and user data " << stream_user_data
@@ -235,7 +234,6 @@
std::unique_ptr<nghttp2_data_provider> provider =
MakeDataProvider(data_source.get());
- // TODO(birenroy): clean up data source on stream close
sources_.emplace(stream_id, std::move(data_source));
int result = nghttp2_submit_response(session_->raw_ptr(), stream_id,
@@ -269,6 +267,10 @@
return 0 == nghttp2_session_resume_data(session_->raw_ptr(), stream_id);
}
+void NgHttp2Adapter::RemoveStream(Http2StreamId stream_id) {
+ sources_.erase(stream_id);
+}
+
NgHttp2Adapter::NgHttp2Adapter(Http2VisitorInterface& visitor,
Perspective perspective,
const nghttp2_option* options)
diff --git a/http2/adapter/nghttp2_adapter.h b/http2/adapter/nghttp2_adapter.h
index 9078278..257f01a 100644
--- a/http2/adapter/nghttp2_adapter.h
+++ b/http2/adapter/nghttp2_adapter.h
@@ -88,6 +88,12 @@
bool ResumeStream(Http2StreamId stream_id) override;
+ // Removes references to the `stream_id` from this adapter.
+ void RemoveStream(Http2StreamId stream_id);
+
+ // Accessor for testing.
+ size_t sources_size() const { return sources_.size(); }
+
private:
NgHttp2Adapter(Http2VisitorInterface& visitor, Perspective perspective,
const nghttp2_option* options);
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index a90e7ed..d238fd2 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -143,6 +143,8 @@
adapter->SetStreamUserData(stream_id2, const_cast<char*>(kSentinel2));
adapter->SetStreamUserData(stream_id3, nullptr);
+ EXPECT_EQ(adapter->sources_size(), 3);
+
EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id1, _, 0x5));
EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id1, _, 0x5, 0));
EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id2, _, 0x5));
@@ -205,7 +207,11 @@
EXPECT_CALL(visitor, OnDataForStream(1, "This is the response body."));
EXPECT_CALL(visitor, OnFrameHeader(3, 4, RST_STREAM, 0));
EXPECT_CALL(visitor, OnRstStream(3, Http2ErrorCode::INTERNAL_ERROR));
- EXPECT_CALL(visitor, OnCloseStream(3, Http2ErrorCode::INTERNAL_ERROR));
+ EXPECT_CALL(visitor, OnCloseStream(3, Http2ErrorCode::INTERNAL_ERROR))
+ .WillOnce(
+ [&adapter](Http2StreamId stream_id, Http2ErrorCode /*error_code*/) {
+ adapter->RemoveStream(stream_id);
+ });
EXPECT_CALL(visitor, OnFrameHeader(0, 19, GOAWAY, 0));
EXPECT_CALL(visitor,
OnGoAway(5, Http2ErrorCode::ENHANCE_YOUR_CALM, "calm down!!"));
@@ -221,6 +227,9 @@
EXPECT_EQ(kInitialFlowControlWindowSize,
adapter->GetStreamReceiveWindowSize(stream_id3));
+ // One stream was closed.
+ EXPECT_EQ(adapter->sources_size(), 2);
+
// Connection window should be the same as the first stream.
EXPECT_EQ(adapter->GetReceiveWindowSize(),
adapter->GetStreamReceiveWindowSize(stream_id1));
@@ -241,10 +250,18 @@
EXPECT_CALL(visitor, OnFrameHeader(1, 0, DATA, 1));
EXPECT_CALL(visitor, OnBeginDataForStream(1, 0));
EXPECT_CALL(visitor, OnEndStream(1));
- EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::HTTP2_NO_ERROR));
+ EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::HTTP2_NO_ERROR))
+ .WillOnce(
+ [&adapter](Http2StreamId stream_id, Http2ErrorCode /*error_code*/) {
+ adapter->RemoveStream(stream_id);
+ });
EXPECT_CALL(visitor, OnFrameHeader(5, 4, RST_STREAM, 0));
EXPECT_CALL(visitor, OnRstStream(5, Http2ErrorCode::REFUSED_STREAM));
- EXPECT_CALL(visitor, OnCloseStream(5, Http2ErrorCode::REFUSED_STREAM));
+ EXPECT_CALL(visitor, OnCloseStream(5, Http2ErrorCode::REFUSED_STREAM))
+ .WillOnce(
+ [&adapter](Http2StreamId stream_id, Http2ErrorCode /*error_code*/) {
+ adapter->RemoveStream(stream_id);
+ });
adapter->ProcessBytes(TestFrameSequence()
.Data(1, "", true)
.RstStream(5, Http2ErrorCode::REFUSED_STREAM)
@@ -256,6 +273,7 @@
// After receiving END_STREAM for 1 and RST_STREAM for 5, the session no
// longer expects reads.
EXPECT_FALSE(adapter->want_read());
+ EXPECT_EQ(adapter->sources_size(), 0);
// Client will not have anything else to write.
EXPECT_FALSE(adapter->want_write());