Allow NgHttp2Adapter to remove `sources_` map entries via CallbackVisitor.

This CL introduces NgHttp2Adapter::RemoveStream() and
CallbackVisitor::set_stream_close_listener() that can be used together to
remove entries from the per-stream `sources_` map when a stream is closed.
Namely, CallbackVisitor::set_stream_close_listener() can be called with a
lambda that invokes NgHttp2Adapter::RemoveStream() in codec_impl.cc.

This CL has a positive effect on memory profiles in loadtests for addressing
go/envoy-upstream/issues/19761.

PiperOrigin-RevId: 437827219
diff --git a/http2/adapter/callback_visitor.cc b/http2/adapter/callback_visitor.cc
index fa52930..2a00eed 100644
--- a/http2/adapter/callback_visitor.cc
+++ b/http2/adapter/callback_visitor.cc
@@ -263,6 +263,9 @@
         nullptr, stream_id, static_cast<uint32_t>(error_code), user_data_);
   }
   stream_map_.erase(stream_id);
+  if (stream_close_listener_) {
+    stream_close_listener_(stream_id);
+  }
 }
 
 void CallbackVisitor::OnPriorityForStream(Http2StreamId /*stream_id*/,
diff --git a/http2/adapter/callback_visitor.h b/http2/adapter/callback_visitor.h
index d38e66f..a64c8e8 100644
--- a/http2/adapter/callback_visitor.h
+++ b/http2/adapter/callback_visitor.h
@@ -3,6 +3,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <utility>
 #include <vector>
 
 #include "absl/container/flat_hash_map.h"
@@ -18,6 +19,9 @@
 // data" pointer, and invokes the callbacks according to HTTP/2 events received.
 class QUICHE_EXPORT_PRIVATE CallbackVisitor : public Http2VisitorInterface {
  public:
+  // Called when the visitor receives a close event for `stream_id`.
+  using StreamCloseListener = std::function<void(Http2StreamId stream_id)>;
+
   explicit CallbackVisitor(Perspective perspective,
                            const nghttp2_session_callbacks& callbacks,
                            void* user_data);
@@ -71,6 +75,10 @@
 
   size_t stream_map_size() const { return stream_map_.size(); }
 
+  void set_stream_close_listener(StreamCloseListener stream_close_listener) {
+    stream_close_listener_ = std::move(stream_close_listener);
+  }
+
  private:
   struct QUICHE_EXPORT_PRIVATE StreamInfo {
     bool before_sent_headers = false;
@@ -89,6 +97,8 @@
 
   StreamInfoMap stream_map_;
 
+  StreamCloseListener stream_close_listener_;
+
   Perspective perspective_;
   nghttp2_session_callbacks_unique_ptr callbacks_;
   void* user_data_;
diff --git a/http2/adapter/callback_visitor_test.cc b/http2/adapter/callback_visitor_test.cc
index 8f83da5..fee6fc4 100644
--- a/http2/adapter/callback_visitor_test.cc
+++ b/http2/adapter/callback_visitor_test.cc
@@ -1,5 +1,7 @@
 #include "http2/adapter/callback_visitor.h"
 
+#include "absl/container/flat_hash_map.h"
+#include "http2/adapter/http2_protocol.h"
 #include "http2/adapter/mock_nghttp2_callbacks.h"
 #include "http2/adapter/nghttp2_test_utils.h"
 #include "http2/adapter/test_utils.h"
@@ -11,6 +13,9 @@
 namespace {
 
 using testing::_;
+using testing::IsEmpty;
+using testing::Pair;
+using testing::UnorderedElementsAre;
 
 enum FrameType {
   DATA,
@@ -78,6 +83,11 @@
   testing::StrictMock<MockNghttp2Callbacks> callbacks;
   CallbackVisitor visitor(Perspective::kClient,
                           *MockNghttp2Callbacks::GetCallbacks(), &callbacks);
+  absl::flat_hash_map<Http2StreamId, int> stream_close_counts;
+  visitor.set_stream_close_listener(
+      [&stream_close_counts](Http2StreamId stream_id) {
+        ++stream_close_counts[stream_id];
+      });
 
   testing::InSequence seq;
 
@@ -131,12 +141,15 @@
   EXPECT_CALL(callbacks, OnFrameRecv(IsHeaders(1, _, NGHTTP2_HCAT_HEADERS)));
   visitor.OnEndHeadersForStream(1);
 
+  EXPECT_THAT(stream_close_counts, IsEmpty());
+
   // RST_STREAM on stream 3
   EXPECT_CALL(callbacks, OnBeginFrame(HasFrameHeader(3, RST_STREAM, 0)));
   visitor.OnFrameHeader(3, 4, RST_STREAM, 0);
 
   // No change in stream map size.
   EXPECT_EQ(visitor.stream_map_size(), 1);
+  EXPECT_THAT(stream_close_counts, IsEmpty());
 
   EXPECT_CALL(callbacks, OnFrameRecv(IsRstStream(3, NGHTTP2_INTERNAL_ERROR)));
   visitor.OnRstStream(3, Http2ErrorCode::INTERNAL_ERROR);
@@ -144,6 +157,8 @@
   EXPECT_CALL(callbacks, OnStreamClose(3, NGHTTP2_INTERNAL_ERROR));
   visitor.OnCloseStream(3, Http2ErrorCode::INTERNAL_ERROR);
 
+  EXPECT_THAT(stream_close_counts, UnorderedElementsAre(Pair(3, 1)));
+
   // More stream close events
   EXPECT_CALL(callbacks,
               OnBeginFrame(HasFrameHeader(1, DATA, NGHTTP2_FLAG_END_STREAM)));
@@ -156,8 +171,10 @@
   EXPECT_CALL(callbacks, OnStreamClose(1, NGHTTP2_NO_ERROR));
   visitor.OnCloseStream(1, Http2ErrorCode::HTTP2_NO_ERROR);
 
-  // Stream map is empty again.
+  // Stream map is empty again after both streams were closed.
   EXPECT_EQ(visitor.stream_map_size(), 0);
+  EXPECT_THAT(stream_close_counts,
+              UnorderedElementsAre(Pair(3, 1), Pair(1, 1)));
 
   EXPECT_CALL(callbacks, OnBeginFrame(HasFrameHeader(5, RST_STREAM, _)));
   visitor.OnFrameHeader(5, 4, RST_STREAM, 0);
@@ -169,6 +186,8 @@
   visitor.OnCloseStream(5, Http2ErrorCode::REFUSED_STREAM);
 
   EXPECT_EQ(visitor.stream_map_size(), 0);
+  EXPECT_THAT(stream_close_counts,
+              UnorderedElementsAre(Pair(3, 1), Pair(1, 1), Pair(5, 1)));
 }
 
 TEST(ClientCallbackVisitorUnitTest, HeadersWithContinuation) {
diff --git a/http2/adapter/nghttp2_adapter.cc b/http2/adapter/nghttp2_adapter.cc
index a6ba0b2..c8dbc74 100644
--- a/http2/adapter/nghttp2_adapter.cc
+++ b/http2/adapter/nghttp2_adapter.cc
@@ -220,7 +220,6 @@
   int32_t stream_id =
       nghttp2_submit_request(session_->raw_ptr(), nullptr, nvs.data(),
                              nvs.size(), provider.get(), stream_user_data);
-  // TODO(birenroy): clean up data source on stream close
   sources_.emplace(stream_id, std::move(data_source));
   QUICHE_VLOG(1) << "Submitted request with " << nvs.size()
                  << " request headers and user data " << stream_user_data
@@ -235,7 +234,6 @@
   std::unique_ptr<nghttp2_data_provider> provider =
       MakeDataProvider(data_source.get());
 
-  // TODO(birenroy): clean up data source on stream close
   sources_.emplace(stream_id, std::move(data_source));
 
   int result = nghttp2_submit_response(session_->raw_ptr(), stream_id,
@@ -269,6 +267,10 @@
   return 0 == nghttp2_session_resume_data(session_->raw_ptr(), stream_id);
 }
 
+void NgHttp2Adapter::RemoveStream(Http2StreamId stream_id) {
+  sources_.erase(stream_id);
+}
+
 NgHttp2Adapter::NgHttp2Adapter(Http2VisitorInterface& visitor,
                                Perspective perspective,
                                const nghttp2_option* options)
diff --git a/http2/adapter/nghttp2_adapter.h b/http2/adapter/nghttp2_adapter.h
index 9078278..257f01a 100644
--- a/http2/adapter/nghttp2_adapter.h
+++ b/http2/adapter/nghttp2_adapter.h
@@ -88,6 +88,12 @@
 
   bool ResumeStream(Http2StreamId stream_id) override;
 
+  // Removes references to the `stream_id` from this adapter.
+  void RemoveStream(Http2StreamId stream_id);
+
+  // Accessor for testing.
+  size_t sources_size() const { return sources_.size(); }
+
  private:
   NgHttp2Adapter(Http2VisitorInterface& visitor, Perspective perspective,
                  const nghttp2_option* options);
diff --git a/http2/adapter/nghttp2_adapter_test.cc b/http2/adapter/nghttp2_adapter_test.cc
index a90e7ed..d238fd2 100644
--- a/http2/adapter/nghttp2_adapter_test.cc
+++ b/http2/adapter/nghttp2_adapter_test.cc
@@ -143,6 +143,8 @@
   adapter->SetStreamUserData(stream_id2, const_cast<char*>(kSentinel2));
   adapter->SetStreamUserData(stream_id3, nullptr);
 
+  EXPECT_EQ(adapter->sources_size(), 3);
+
   EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id1, _, 0x5));
   EXPECT_CALL(visitor, OnFrameSent(HEADERS, stream_id1, _, 0x5, 0));
   EXPECT_CALL(visitor, OnBeforeFrameSent(HEADERS, stream_id2, _, 0x5));
@@ -205,7 +207,11 @@
   EXPECT_CALL(visitor, OnDataForStream(1, "This is the response body."));
   EXPECT_CALL(visitor, OnFrameHeader(3, 4, RST_STREAM, 0));
   EXPECT_CALL(visitor, OnRstStream(3, Http2ErrorCode::INTERNAL_ERROR));
-  EXPECT_CALL(visitor, OnCloseStream(3, Http2ErrorCode::INTERNAL_ERROR));
+  EXPECT_CALL(visitor, OnCloseStream(3, Http2ErrorCode::INTERNAL_ERROR))
+      .WillOnce(
+          [&adapter](Http2StreamId stream_id, Http2ErrorCode /*error_code*/) {
+            adapter->RemoveStream(stream_id);
+          });
   EXPECT_CALL(visitor, OnFrameHeader(0, 19, GOAWAY, 0));
   EXPECT_CALL(visitor,
               OnGoAway(5, Http2ErrorCode::ENHANCE_YOUR_CALM, "calm down!!"));
@@ -221,6 +227,9 @@
   EXPECT_EQ(kInitialFlowControlWindowSize,
             adapter->GetStreamReceiveWindowSize(stream_id3));
 
+  // One stream was closed.
+  EXPECT_EQ(adapter->sources_size(), 2);
+
   // Connection window should be the same as the first stream.
   EXPECT_EQ(adapter->GetReceiveWindowSize(),
             adapter->GetStreamReceiveWindowSize(stream_id1));
@@ -241,10 +250,18 @@
   EXPECT_CALL(visitor, OnFrameHeader(1, 0, DATA, 1));
   EXPECT_CALL(visitor, OnBeginDataForStream(1, 0));
   EXPECT_CALL(visitor, OnEndStream(1));
-  EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::HTTP2_NO_ERROR));
+  EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::HTTP2_NO_ERROR))
+      .WillOnce(
+          [&adapter](Http2StreamId stream_id, Http2ErrorCode /*error_code*/) {
+            adapter->RemoveStream(stream_id);
+          });
   EXPECT_CALL(visitor, OnFrameHeader(5, 4, RST_STREAM, 0));
   EXPECT_CALL(visitor, OnRstStream(5, Http2ErrorCode::REFUSED_STREAM));
-  EXPECT_CALL(visitor, OnCloseStream(5, Http2ErrorCode::REFUSED_STREAM));
+  EXPECT_CALL(visitor, OnCloseStream(5, Http2ErrorCode::REFUSED_STREAM))
+      .WillOnce(
+          [&adapter](Http2StreamId stream_id, Http2ErrorCode /*error_code*/) {
+            adapter->RemoveStream(stream_id);
+          });
   adapter->ProcessBytes(TestFrameSequence()
                             .Data(1, "", true)
                             .RstStream(5, Http2ErrorCode::REFUSED_STREAM)
@@ -256,6 +273,7 @@
   // After receiving END_STREAM for 1 and RST_STREAM for 5, the session no
   // longer expects reads.
   EXPECT_FALSE(adapter->want_read());
+  EXPECT_EQ(adapter->sources_size(), 0);
 
   // Client will not have anything else to write.
   EXPECT_FALSE(adapter->want_write());