Patch a fix from https://github.com/villainb-dg for the issue of leaking connection flow control window when a HTTP2 stream receives a DATA frame after being closed (reset, etc).

When a stream receives a DATA frame while it is already closed, the data is counted against the connection flow control window, but is never marked as consumed.
Fix: Mark the data as consumed when the received DATA frame is on a reset or invalid stream.

Reported QUICHE issue https://github.com/google/quiche/issues/91
Reported Envoy issue https://github.com/envoyproxy/envoy/issues/40085
Proposed external fix: https://github.com/google/quiche/pull/92

PiperOrigin-RevId: 781064590
diff --git a/quiche/http2/adapter/oghttp2_session.cc b/quiche/http2/adapter/oghttp2_session.cc
index c25b291..9b1afb2 100644
--- a/quiche/http2/adapter/oghttp2_session.cc
+++ b/quiche/http2/adapter/oghttp2_session.cc
@@ -1152,11 +1152,15 @@
 
 void OgHttp2Session::OnStreamFrameData(spdy::SpdyStreamId stream_id,
                                        const char* data, size_t len) {
-  // Count the data against flow control, even if the stream is unknown.
+  // Count the data against flow control, even if the stream is unknown, so that
+  // the connection flow control window is in sync with peer's.
   MarkDataBuffered(stream_id, len);
 
   auto iter = stream_map_.find(stream_id);
   if (iter == stream_map_.end()) {
+    // Mark the data consumed immediately as we are dropping them. This will
+    // allow the connection flow control window to shift.
+    Consume(stream_id, len);
     return;
   }
   // Validate against the content-length if it exists.
@@ -1171,6 +1175,9 @@
   if (streams_reset_.contains(stream_id)) {
     // If the stream was unknown due to a protocol error, the visitor was
     // informed in OnDataFrameHeader().
+    // Mark the data consumed immediately as we are dropping them. This will
+    // allow the connection flow control window to shift.
+    Consume(stream_id, len);
     return;
   }
 
diff --git a/quiche/http2/adapter/oghttp2_session_test.cc b/quiche/http2/adapter/oghttp2_session_test.cc
index 34a387b..4afcf1a 100644
--- a/quiche/http2/adapter/oghttp2_session_test.cc
+++ b/quiche/http2/adapter/oghttp2_session_test.cc
@@ -1217,6 +1217,175 @@
   EXPECT_EQ(result, frames.size());
 }
 
+TEST(OgHttp2SessionTest, ResetStreamRaceWithIncomingData) {
+  TestVisitor visitor;
+  OgHttp2Session::Options options;
+  options.perspective = Perspective::kServer;
+  OgHttp2Session session(visitor, options);
+
+  const std::string frames = TestFrameSequence()
+                                 .ClientPreface()
+                                 .Headers(1,
+                                          {{":method", "POST"},
+                                           {":scheme", "https"},
+                                           {":authority", "example.com"},
+                                           {":path", "/"}},
+                                          /*fin=*/false)
+                                 .Data(1, "Request body", false)
+                                 .Serialize();
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+  // Stream 1
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 0x4));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":method", "POST"));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":scheme", "https"));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":authority", "example.com"));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":path", "/"));
+  EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, DATA, 0x0));
+  EXPECT_CALL(visitor, OnBeginDataForStream(1, _));
+  EXPECT_CALL(visitor, OnDataForStream(1, "Request body"))
+      .WillOnce(testing::InvokeWithoutArgs([&session]() {
+        session.Consume(1, 12);
+        return true;
+      }));
+
+  session.ProcessBytes(frames);
+
+  EXPECT_TRUE(session.want_write());
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x1, 0));
+  int result1 = session.Send();
+  EXPECT_EQ(0, result1);
+  absl::string_view serialized1 = visitor.data();
+  EXPECT_THAT(serialized1,
+              EqualsFrames({SpdyFrameType::SETTINGS, SpdyFrameType::SETTINGS}));
+  EXPECT_FALSE(session.want_write());
+
+  EXPECT_LT(session.GetReceiveWindowSize(), kInitialFlowControlWindowSize);
+
+  // Reset the stream and receive more data on this stream.
+  session.EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
+      1, spdy::ERROR_CODE_PROTOCOL_ERROR));
+  const std::string more_frames =
+      TestFrameSequence()
+          .Data(1, std::string(16 * 1024, 'x'), false)
+          .Data(1, std::string(16 * 1024, 'y'), false)
+          .Serialize();
+  // These bytes are counted against the connection flow control window but
+  // should be dropped right away and considerred as consumed.
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, DATA, _)).Times(0);
+  EXPECT_CALL(visitor, OnBeginDataForStream(1, _)).Times(0);
+  EXPECT_CALL(visitor, OnDataForStream(1, _)).Times(0);
+
+  session.ProcessBytes(more_frames);
+  EXPECT_TRUE(session.want_write());
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(RST_STREAM, 1, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(RST_STREAM, 1, _, 0x0, 1));
+  EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::HTTP2_NO_ERROR));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(WINDOW_UPDATE, 0, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(WINDOW_UPDATE, 0, _, 0x0, 0));
+  int result2 = session.Send();
+  EXPECT_EQ(0, result2);
+  absl::string_view serialized2 = visitor.data();
+  serialized2.remove_prefix(serialized1.size());
+  EXPECT_THAT(serialized2, EqualsFrames({SpdyFrameType::RST_STREAM,
+                                         SpdyFrameType::WINDOW_UPDATE}));
+  EXPECT_EQ(session.GetReceiveWindowSize(), kInitialFlowControlWindowSize);
+}
+
+TEST(OgHttp2SessionTest, ResetAndCloseStreamRaceWithIncomingData) {
+  TestVisitor visitor;
+  OgHttp2Session::Options options;
+  options.perspective = Perspective::kServer;
+  OgHttp2Session session(visitor, options);
+
+  const std::string frames = TestFrameSequence()
+                                 .ClientPreface()
+                                 .Headers(1,
+                                          {{":method", "POST"},
+                                           {":scheme", "https"},
+                                           {":authority", "example.com"},
+                                           {":path", "/"}},
+                                          /*fin=*/false)
+                                 .Data(1, "Request body", false)
+                                 .Serialize();
+  testing::InSequence s;
+
+  // Client preface (empty SETTINGS)
+  EXPECT_CALL(visitor, OnFrameHeader(0, 0, SETTINGS, 0));
+  EXPECT_CALL(visitor, OnSettingsStart());
+  EXPECT_CALL(visitor, OnSettingsEnd());
+  // Stream 1
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, HEADERS, 0x4));
+  EXPECT_CALL(visitor, OnBeginHeadersForStream(1));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":method", "POST"));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":scheme", "https"));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":authority", "example.com"));
+  EXPECT_CALL(visitor, OnHeaderForStream(1, ":path", "/"));
+  EXPECT_CALL(visitor, OnEndHeadersForStream(1));
+
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, DATA, 0x0));
+  EXPECT_CALL(visitor, OnBeginDataForStream(1, _));
+  EXPECT_CALL(visitor, OnDataForStream(1, "Request body"))
+      .WillOnce(testing::InvokeWithoutArgs([&session]() {
+        session.Consume(1, 12);
+        return true;
+      }));
+
+  session.ProcessBytes(frames);
+
+  EXPECT_TRUE(session.want_write());
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x0, 0));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(SETTINGS, 0, _, 0x1));
+  EXPECT_CALL(visitor, OnFrameSent(SETTINGS, 0, _, 0x1, 0));
+  int result1 = session.Send();
+  EXPECT_EQ(0, result1);
+  absl::string_view serialized1 = visitor.data();
+  EXPECT_THAT(serialized1,
+              EqualsFrames({SpdyFrameType::SETTINGS, SpdyFrameType::SETTINGS}));
+  EXPECT_FALSE(session.want_write());
+
+  EXPECT_LT(session.GetReceiveWindowSize(), kInitialFlowControlWindowSize);
+
+  // Reset the stream and receive more data on this stream.
+  session.EnqueueFrame(std::make_unique<spdy::SpdyRstStreamIR>(
+      1, spdy::ERROR_CODE_PROTOCOL_ERROR));
+  EXPECT_CALL(visitor, OnBeforeFrameSent(RST_STREAM, 1, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(RST_STREAM, 1, _, 0x0, 1));
+  EXPECT_CALL(visitor, OnCloseStream(1, Http2ErrorCode::HTTP2_NO_ERROR));
+  EXPECT_EQ(0, session.Send());
+
+  const std::string more_frames =
+      TestFrameSequence()
+          .Data(1, std::string(16 * 1024, 'x'), false)
+          .Data(1, std::string(16 * 1024, 'y'), false)
+          .Serialize();
+  // These bytes are counted against the connection flow control window but
+  // should be dropped right away and considered as consumed.
+  EXPECT_CALL(visitor, OnFrameHeader(1, _, DATA, _)).Times(2);
+  EXPECT_CALL(visitor, OnBeginDataForStream(1, _)).Times(0);
+  EXPECT_CALL(visitor, OnDataForStream(1, _)).Times(0);
+
+  session.ProcessBytes(more_frames);
+  EXPECT_TRUE(session.want_write());
+
+  EXPECT_CALL(visitor, OnBeforeFrameSent(WINDOW_UPDATE, 0, _, 0x0));
+  EXPECT_CALL(visitor, OnFrameSent(WINDOW_UPDATE, 0, _, 0x0, 0));
+  EXPECT_EQ(0, session.Send());
+  EXPECT_EQ(session.GetReceiveWindowSize(), kInitialFlowControlWindowSize);
+}
+
 }  // namespace test
 }  // namespace adapter
 }  // namespace http2