Add upper bound flag FLAGS_quic_time_wait_list_max_pending_packets (default to 1K) for pending_queued_packets_ in time wait list.

Protected by FLAGS_quic_reloadable_flag_quic_add_upperbound_for_queued_packets.

PiperOrigin-RevId: 391175110
diff --git a/quic/core/quic_flags_list.h b/quic/core/quic_flags_list.h
index 1af71b6..0eedd30 100644
--- a/quic/core/quic_flags_list.h
+++ b/quic/core/quic_flags_list.h
@@ -97,6 +97,8 @@
 QUIC_FLAG(FLAGS_quic_reloadable_flag_quic_require_handshake_confirmation, false)
 // If true, reset per packet state before processing undecryptable packets.
 QUIC_FLAG(FLAGS_quic_reloadable_flag_quic_reset_per_packet_state_for_undecryptable_packets, true)
+// If true, respect FLAGS_quic_time_wait_list_max_pending_packets as the upper bound of queued packets in time wait list.
+QUIC_FLAG(FLAGS_quic_reloadable_flag_quic_add_upperbound_for_queued_packets, true)
 // If true, send PATH_RESPONSE upon receiving PATH_CHALLENGE regardless of perspective. --gfe2_reloadable_flag_quic_start_peer_migration_earlier has to be true before turn on this flag.
 QUIC_FLAG(FLAGS_quic_reloadable_flag_quic_send_path_response2, true)
 // If true, set burst token to 2 in cwnd bootstrapping experiment.
diff --git a/quic/core/quic_protocol_flags_list.h b/quic/core/quic_protocol_flags_list.h
index 56f8359..62c1712 100644
--- a/quic/core/quic_protocol_flags_list.h
+++ b/quic/core/quic_protocol_flags_list.h
@@ -44,6 +44,12 @@
                    "Time period for which a given connection_id should live in "
                    "the time-wait state.")
 
+// This number is relatively conservative. For example, there are at most 1K
+// queued stateless resets, which consume 1K * 21B = 21KB.
+QUIC_PROTOCOL_FLAG(
+    uint64_t, quic_time_wait_list_max_pending_packets, 1024,
+    "Upper limit of pending packets in time wait list when writer is blocked.")
+
 QUIC_PROTOCOL_FLAG(double,
                    quic_bbr_cwnd_gain,
                    2.0f,
diff --git a/quic/core/quic_time_wait_list_manager.cc b/quic/core/quic_time_wait_list_manager.cc
index 12f5ef9..69b85fa 100644
--- a/quic/core/quic_time_wait_list_manager.cc
+++ b/quic/core/quic_time_wait_list_manager.cc
@@ -392,6 +392,13 @@
     QUIC_LOG(ERROR) << "Tried to send or queue a null packet";
     return true;
   }
+  if (GetQuicReloadableFlag(quic_add_upperbound_for_queued_packets) &&
+      pending_packets_queue_.size() >=
+          GetQuicFlag(FLAGS_quic_time_wait_list_max_pending_packets)) {
+    // There are too many pending packets.
+    QUIC_CODE_COUNT(quic_too_many_pending_packets_in_time_wait);
+    return true;
+  }
   if (WriteToWire(packet.get())) {
     // Allow the packet to be deleted upon leaving this function.
     return true;
diff --git a/quic/core/quic_time_wait_list_manager_test.cc b/quic/core/quic_time_wait_list_manager_test.cc
index f47bdf2..49e8b27 100644
--- a/quic/core/quic_time_wait_list_manager_test.cc
+++ b/quic/core/quic_time_wait_list_manager_test.cc
@@ -768,6 +768,37 @@
                                                  nullptr, nullptr);
 }
 
+TEST_F(QuicTimeWaitListManagerTest, TooManyPendingPackets) {
+  SetQuicFlag(FLAGS_quic_time_wait_list_max_pending_packets, 5);
+  const size_t kNumOfUnProcessablePackets = 2048;
+  EXPECT_CALL(visitor_, OnWriteBlocked(&time_wait_list_manager_))
+      .Times(testing::AnyNumber());
+  // Write block for the next packets.
+  EXPECT_CALL(writer_,
+              WritePacket(_, _, self_address_.host(), peer_address_, _))
+      .With(Args<0, 1>(PublicResetPacketEq(TestConnectionId(1))))
+      .WillOnce(DoAll(Assign(&writer_is_blocked_, true),
+                      Return(WriteResult(WRITE_STATUS_BLOCKED, EAGAIN))));
+  for (size_t i = 0; i < kNumOfUnProcessablePackets; ++i) {
+    time_wait_list_manager_.SendPublicReset(
+        self_address_, peer_address_, TestConnectionId(1),
+        /*ietf_quic=*/true,
+        /*received_packet_length=*/
+        QuicFramer::GetMinStatelessResetPacketLength() + 1,
+        /*packet_context=*/nullptr);
+  }
+  if (GetQuicReloadableFlag(quic_add_upperbound_for_queued_packets)) {
+    // Verify pending packet queue size is limited.
+    EXPECT_EQ(5u, QuicTimeWaitListManagerPeer::PendingPacketsQueueSize(
+                      &time_wait_list_manager_));
+  } else {
+    // The pending packet queue grows unbounded.
+    EXPECT_EQ(kNumOfUnProcessablePackets,
+              QuicTimeWaitListManagerPeer::PendingPacketsQueueSize(
+                  &time_wait_list_manager_));
+  }
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic
diff --git a/quic/test_tools/quic_time_wait_list_manager_peer.cc b/quic/test_tools/quic_time_wait_list_manager_peer.cc
index 8f614e8..cbcd36e 100644
--- a/quic/test_tools/quic_time_wait_list_manager_peer.cc
+++ b/quic/test_tools/quic_time_wait_list_manager_peer.cc
@@ -36,5 +36,11 @@
   return manager->SendOrQueuePacket(std::move(packet), packet_context);
 }
 
+// static
+size_t QuicTimeWaitListManagerPeer::PendingPacketsQueueSize(
+    QuicTimeWaitListManager* manager) {
+  return manager->pending_packets_queue_.size();
+}
+
 }  // namespace test
 }  // namespace quic
diff --git a/quic/test_tools/quic_time_wait_list_manager_peer.h b/quic/test_tools/quic_time_wait_list_manager_peer.h
index 4e0a4a7..a314e03 100644
--- a/quic/test_tools/quic_time_wait_list_manager_peer.h
+++ b/quic/test_tools/quic_time_wait_list_manager_peer.h
@@ -26,6 +26,8 @@
       QuicTimeWaitListManager* manager,
       std::unique_ptr<QuicTimeWaitListManager::QueuedPacket> packet,
       const QuicPerPacketContext* packet_context);
+
+  static size_t PendingPacketsQueueSize(QuicTimeWaitListManager* manager);
 };
 
 }  // namespace test