Let OgHttp2Session avoid visitor callbacks for data on unknown streams.

This CL performs a stream map check in OgHttp2Session::OnDataFrameHeader() and
OgHttp2Session::OnStreamFrameData(). If the stream is not in the map, then
OgHttp2Session will avoid invoking visitor_.OnBeginDataForStream() and
visitor_.OnDataForStream(), respectively. This behavior is in line with
Http2Dispatcher behavior [1, 2] and brings oghttp2 more in line with nghttp2.
Similarly, this CL also moves visitor_.OnEndStream() into an already-existing
stream map check. This behavior also allows
Http2FloodMitigationTest.UpstreamRstStreamStormOnDownstreamCloseRegressionTest
to pass, as hypothesized in http://b/201799377#comment8:

blaze test //third_party/envoy/src/test/integration:http2_flood_integration_test  --test_filter=*/Http2FloodMitigationTest.UpstreamRstStreamStormOnDownstreamCloseRegressionTest/* --test_arg=--vmodule=oghttp2_session=2 --test_arg="--" --test_arg="-l trace" --test_env="ENVOY_NGHTTP2_TRACE="

Before: http://sponge2/3ea792e7-d946-413e-83f3-d20e1f92741f
After: http://sponge2/f43bb7d8-4374-46a4-8804-3e2be1719442

[1] http://google3/gfe/gfe2/http2/http2_dispatcher.cc;l=974-987;rcl=406920946
[2] http://google3/gfe/gfe2/http2/http2_dispatcher.cc;l=1024-1032;rcl=406920946

PiperOrigin-RevId: 410129890
diff --git a/http2/adapter/oghttp2_adapter_test.cc b/http2/adapter/oghttp2_adapter_test.cc
index f5a8cb9..efe0327 100644
--- a/http2/adapter/oghttp2_adapter_test.cc
+++ b/http2/adapter/oghttp2_adapter_test.cc
@@ -1251,16 +1251,9 @@
           .Serialize();
 
   // The visitor gets notified about the HEADERS frame and DATA frame for the
-  // closed stream with further processing on the DATA frame.
+  // closed stream with no further processing on either frame.
   EXPECT_CALL(visitor, OnFrameHeader(stream_id, _, HEADERS, 0x4));
   EXPECT_CALL(visitor, OnFrameHeader(stream_id, _, DATA, 0x1));
-  EXPECT_CALL(visitor, OnBeginDataForStream(stream_id, _));
-  EXPECT_CALL(visitor,
-              OnDataForStream(stream_id, "This is the response body."));
-  EXPECT_CALL(visitor, OnEndStream(stream_id));
-  // This final OnCloseStream() is avoided because of a stream map check in
-  // OnEndStream().
-  EXPECT_CALL(visitor, OnCloseStream(stream_id, _)).Times(0);
 
   const int64_t response_result = adapter->ProcessBytes(response_frames);
   EXPECT_EQ(response_frames.size(), static_cast<size_t>(response_result));
diff --git a/http2/adapter/oghttp2_session.cc b/http2/adapter/oghttp2_session.cc
index a48dc47..c46f338 100644
--- a/http2/adapter/oghttp2_session.cc
+++ b/http2/adapter/oghttp2_session.cc
@@ -746,12 +746,17 @@
 
 void OgHttp2Session::OnDataFrameHeader(spdy::SpdyStreamId stream_id,
                                        size_t length, bool /*fin*/) {
-  if (static_cast<Http2StreamId>(stream_id) > highest_processed_stream_id_) {
-    // Receiving DATA before HEADERS is a connection error.
-    LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
-                        ConnectionError::kWrongFrameSequence);
+  if (!stream_map_.contains(stream_id)) {
+    // The stream does not exist; it could be an error or a benign close, e.g.,
+    // getting data for a stream this connection recently closed.
+    if (static_cast<Http2StreamId>(stream_id) > highest_processed_stream_id_) {
+      // Receiving DATA before HEADERS is a connection error.
+      LatchErrorAndNotify(Http2ErrorCode::PROTOCOL_ERROR,
+                          ConnectionError::kWrongFrameSequence);
+    }
     return;
   }
+
   const bool result = visitor_.OnBeginDataForStream(stream_id, length);
   if (!result) {
     decoder_.StopProcessing();
@@ -764,8 +769,8 @@
   // Count the data against flow control, even if the stream is unknown.
   MarkDataBuffered(stream_id, len);
 
-  if (static_cast<Http2StreamId>(stream_id) > highest_processed_stream_id_) {
-    // Receiving DATA before HEADERS is a connection error; the visitor was
+  if (!stream_map_.contains(stream_id)) {
+    // If the stream was unknown due to a protocol error, the visitor was
     // informed in OnDataFrameHeader().
     return;
   }
@@ -781,8 +786,8 @@
   auto iter = stream_map_.find(stream_id);
   if (iter != stream_map_.end()) {
     iter->second.half_closed_remote = true;
+    visitor_.OnEndStream(stream_id);
   }
-  visitor_.OnEndStream(stream_id);
   if (iter != stream_map_.end() && iter->second.half_closed_local &&
       options_.perspective == Perspective::kClient) {
     // From the client's perspective, the stream can be closed if it's already