gfe-relnote: Call QuicAckListenerInterface::OnPacketAcked() before moving the stream to closed stream list.

All existing tests passed.

Protected by FLAGS_quic_reloadable_flag_quic_notify_ack_listener_earlier.

PiperOrigin-RevId: 700511871
diff --git a/quiche/common/quiche_feature_flags_list.h b/quiche/common/quiche_feature_flags_list.h
index 01104c7..91dc462 100755
--- a/quiche/common/quiche_feature_flags_list.h
+++ b/quiche/common/quiche_feature_flags_list.h
@@ -45,6 +45,7 @@
 QUICHE_FLAG(bool, quiche_reloadable_flag_quic_no_path_degrading_before_handshake_confirmed, true, true, "If true, an endpoint does not detect path degrading or blackholing until handshake gets confirmed.")
 QUICHE_FLAG(bool, quiche_reloadable_flag_quic_no_write_control_frame_upon_connection_close, false, true, "If trrue, early return before write control frame in OnCanWrite() if the connection is already closed.")
 QUICHE_FLAG(bool, quiche_reloadable_flag_quic_no_write_control_frame_upon_connection_close2, false, false, "If true, QuicSession will block outgoing control frames when the connection is closed.")
+QUICHE_FLAG(bool, quiche_reloadable_flag_quic_notify_ack_listener_earlier, false, false, "If true, call QuicAckListenerInterface::OnPacketAcked() before moving the stream to closed stream list.")
 QUICHE_FLAG(bool, quiche_reloadable_flag_quic_optimize_qpack_blocking_manager, false, false, "If true, optimize qpack_blocking_manager for CPU efficiency.")
 QUICHE_FLAG(bool, quiche_reloadable_flag_quic_pacing_remove_non_initial_burst, false, false, "If true, remove the non-initial burst in QUIC PacingSender.")
 QUICHE_FLAG(bool, quiche_reloadable_flag_quic_parse_cert_compression_algos_from_chlo, true, true, "If true, parse offered cert compression algorithms from received CHLOs.")
diff --git a/quiche/quic/core/http/quic_spdy_stream.cc b/quiche/quic/core/http/quic_spdy_stream.cc
index ee9be7b..ba0d33c 100644
--- a/quiche/quic/core/http/quic_spdy_stream.cc
+++ b/quiche/quic/core/http/quic_spdy_stream.cc
@@ -26,6 +26,7 @@
 #include "quiche/quic/core/qpack/qpack_decoder.h"
 #include "quiche/quic/core/qpack/qpack_encoder.h"
 #include "quiche/quic/core/quic_error_codes.h"
+#include "quiche/quic/core/quic_stream.h"
 #include "quiche/quic/core/quic_stream_priority.h"
 #include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/core/quic_utils.h"
@@ -1036,6 +1037,7 @@
   return true;
 }
 
+// TODO(danzh): Remove this override once the flag is deprecated.
 bool QuicSpdyStream::OnStreamFrameAcked(QuicStreamOffset offset,
                                         QuicByteCount data_length,
                                         bool fin_acked,
@@ -1046,17 +1048,38 @@
       offset, data_length, fin_acked, ack_delay_time, receive_timestamp,
       newly_acked_length);
 
-  const QuicByteCount newly_acked_header_length =
-      GetNumFrameHeadersInInterval(offset, data_length);
-  QUICHE_DCHECK_LE(newly_acked_header_length, *newly_acked_length);
-  unacked_frame_headers_offsets_.Difference(offset, offset + data_length);
-  if (ack_listener_ != nullptr && new_data_acked) {
-    ack_listener_->OnPacketAcked(
-        *newly_acked_length - newly_acked_header_length, ack_delay_time);
+  if (!notify_ack_listener_earlier()) {
+    const QuicByteCount newly_acked_header_length =
+        GetNumFrameHeadersInInterval(offset, data_length);
+    QUICHE_DCHECK_LE(newly_acked_header_length, *newly_acked_length);
+    unacked_frame_headers_offsets_.Difference(offset, offset + data_length);
+    if (ack_listener_ != nullptr && new_data_acked) {
+      ack_listener_->OnPacketAcked(
+          *newly_acked_length - newly_acked_header_length, ack_delay_time);
+    }
+  } else {
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_notify_ack_listener_earlier, 2, 3);
   }
   return new_data_acked;
 }
 
+void QuicSpdyStream::OnNewDataAcked(QuicStreamOffset offset,
+                                    QuicByteCount data_length,
+                                    QuicByteCount newly_acked_length,
+                                    QuicTime receive_timestamp,
+                                    QuicTime::Delta ack_delay_time) {
+  QuicStream::OnNewDataAcked(offset, data_length, newly_acked_length,
+                             receive_timestamp, ack_delay_time);
+  const QuicByteCount newly_acked_header_length =
+      GetNumFrameHeadersInInterval(offset, data_length);
+  QUICHE_DCHECK_LE(newly_acked_header_length, newly_acked_length);
+  unacked_frame_headers_offsets_.Difference(offset, offset + data_length);
+  if (ack_listener_ != nullptr) {
+    ack_listener_->OnPacketAcked(newly_acked_length - newly_acked_header_length,
+                                 ack_delay_time);
+  }
+}
+
 void QuicSpdyStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
                                                 QuicByteCount data_length,
                                                 bool fin_retransmitted) {
diff --git a/quiche/quic/core/http/quic_spdy_stream.h b/quiche/quic/core/http/quic_spdy_stream.h
index fb6dc2c..21bf3e9 100644
--- a/quiche/quic/core/http/quic_spdy_stream.h
+++ b/quiche/quic/core/http/quic_spdy_stream.h
@@ -406,6 +406,12 @@
 
   void CloseReadSide() override;
 
+  // Called when any new data is acked.
+  void OnNewDataAcked(QuicStreamOffset offset, QuicByteCount data_length,
+                      QuicByteCount newly_acked_length,
+                      QuicTime receive_timestamp,
+                      QuicTime::Delta ack_delay_time) override;
+
  private:
   friend class test::QuicSpdyStreamPeer;
   friend class test::QuicStreamPeer;
diff --git a/quiche/quic/core/http/quic_spdy_stream_test.cc b/quiche/quic/core/http/quic_spdy_stream_test.cc
index 05ca93e..7512c01 100644
--- a/quiche/quic/core/http/quic_spdy_stream_test.cc
+++ b/quiche/quic/core/http/quic_spdy_stream_test.cc
@@ -1902,6 +1902,66 @@
   EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
 }
 
+TEST_P(QuicSpdyStreamTest, OnPacketAckedBeforeStreamDestroy) {
+  Initialize(kShouldProcessData);
+  quiche::QuicheReferenceCountedPointer<MockAckListener> mock_ack_listener(
+      new StrictMock<MockAckListener>);
+  stream_->set_ack_listener(mock_ack_listener);
+  EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(AtLeast(1));
+  // Stream is not waiting for acks initially.
+  EXPECT_FALSE(stream_->IsWaitingForAcks());
+  EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
+  // Receive and consume initial headers with FIN set.
+  QuicHeaderList headers = ProcessHeaders(true, headers_);
+  stream_->ConsumeHeaderList();
+  stream_->OnFinRead();
+  EXPECT_TRUE(stream_->read_side_closed());
+
+  // Send kData1.
+  stream_->WriteOrBufferData("FooAndBar", false, nullptr);
+  EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
+  EXPECT_TRUE(stream_->IsWaitingForAcks());
+  EXPECT_CALL(*mock_ack_listener, OnPacketAcked(9, _));
+  QuicByteCount newly_acked_length = 0;
+  EXPECT_TRUE(stream_->OnStreamFrameAcked(0, 9, false, QuicTime::Delta::Zero(),
+                                          QuicTime::Zero(),
+                                          &newly_acked_length));
+  // Stream is not waiting for acks as all sent data is acked.
+  EXPECT_FALSE(stream_->IsWaitingForAcks());
+  EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
+
+  // Send kData2.
+  stream_->WriteOrBufferData("FooAndBar", true, nullptr);
+  EXPECT_TRUE(stream_->IsWaitingForAcks());
+  EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
+  EXPECT_TRUE(stream_->IsZombie());
+
+  // kData2 is acked.
+  EXPECT_CALL(*mock_ack_listener, OnPacketAcked(9, _));
+  EXPECT_TRUE(stream_->OnStreamFrameAcked(9, 9, false, QuicTime::Delta::Zero(),
+                                          QuicTime::Zero(),
+                                          &newly_acked_length));
+  // Stream is waiting for acks as FIN is not acked.
+  EXPECT_TRUE(stream_->IsWaitingForAcks());
+  EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
+
+  // FIN is acked.
+  EXPECT_CALL(*mock_ack_listener, OnPacketAcked(0, _))
+      .WillOnce(InvokeWithoutArgs([&]() {
+        if (GetQuicReloadableFlag(quic_notify_ack_listener_earlier)) {
+          // Stream is not added to closed stream list yet.
+          EXPECT_NE(session_->GetActiveStream(stream_->id()), nullptr);
+        } else {
+          EXPECT_EQ(session_->GetActiveStream(stream_->id()), nullptr);
+        }
+      }));
+  EXPECT_TRUE(stream_->OnStreamFrameAcked(18, 0, true, QuicTime::Delta::Zero(),
+                                          QuicTime::Zero(),
+                                          &newly_acked_length));
+  EXPECT_FALSE(stream_->IsWaitingForAcks());
+  EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
+}
+
 TEST_P(QuicSpdyStreamTest, StreamDataGetAckedMultipleTimes) {
   Initialize(kShouldProcessData);
   quiche::QuicheReferenceCountedPointer<MockAckListener> mock_ack_listener(
diff --git a/quiche/quic/core/quic_stream.cc b/quiche/quic/core/quic_stream.cc
index f42faa1..dbce695 100644
--- a/quiche/quic/core/quic_stream.cc
+++ b/quiche/quic/core/quic_stream.cc
@@ -1132,8 +1132,8 @@
 
 bool QuicStream::OnStreamFrameAcked(QuicStreamOffset offset,
                                     QuicByteCount data_length, bool fin_acked,
-                                    QuicTime::Delta /*ack_delay_time*/,
-                                    QuicTime /*receive_timestamp*/,
+                                    QuicTime::Delta ack_delay_time,
+                                    QuicTime receive_timestamp,
                                     QuicByteCount* newly_acked_length) {
   QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Acking "
                 << "[" << offset << ", " << offset + data_length << "]"
@@ -1160,12 +1160,25 @@
     OnWriteSideInDataRecvdState();
     write_side_data_recvd_state_notified_ = true;
   }
+  if (notify_ack_listener_earlier_ && new_data_acked) {
+    QUIC_RELOADABLE_FLAG_COUNT_N(quic_notify_ack_listener_earlier, 1, 3);
+    OnNewDataAcked(offset, data_length, *newly_acked_length, receive_timestamp,
+                   ack_delay_time);
+  }
   if (!IsWaitingForAcks() && read_side_closed_ && write_side_closed_) {
     session_->MaybeCloseZombieStream(id_);
   }
   return new_data_acked;
 }
 
+void QuicStream::OnNewDataAcked(QuicStreamOffset /*offset*/,
+                                QuicByteCount /*data_length*/,
+                                QuicByteCount /*newly_acked_length*/,
+                                QuicTime /*receive_timestamp*/,
+                                QuicTime::Delta /*ack_delay_time*/) {
+  QUICHE_DCHECK(notify_ack_listener_earlier_);
+}
+
 void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
                                             QuicByteCount data_length,
                                             bool fin_retransmitted) {
diff --git a/quiche/quic/core/quic_stream.h b/quiche/quic/core/quic_stream.h
index b2b5e9c..4da8b7f 100644
--- a/quiche/quic/core/quic_stream.h
+++ b/quiche/quic/core/quic_stream.h
@@ -484,6 +484,13 @@
   // Does not send a FIN.  May cause the stream to be closed.
   virtual void CloseWriteSide();
 
+  // Called when any new data is acked.
+  virtual void OnNewDataAcked(QuicStreamOffset offset,
+                              QuicByteCount data_length,
+                              QuicByteCount newly_acked_length,
+                              QuicTime receive_timestamp,
+                              QuicTime::Delta ack_delay_time);
+
   void set_rst_received(bool rst_received) { rst_received_ = rst_received; }
   void set_stream_error(QuicResetStreamError error) { stream_error_ = error; }
 
@@ -510,6 +517,10 @@
   std::optional<QuicByteCount> GetSendWindow() const;
   std::optional<QuicByteCount> GetReceiveWindow() const;
 
+  bool notify_ack_listener_earlier() const {
+    return notify_ack_listener_earlier_;
+  }
+
  private:
   friend class test::QuicStreamPeer;
   friend class QuicStreamUtils;
@@ -643,6 +654,9 @@
   std::optional<QuicResetStreamAtFrame> buffered_reset_stream_at_;
 
   Perspective perspective_;
+
+  const bool notify_ack_listener_earlier_ =
+      GetQuicReloadableFlag(quic_notify_ack_listener_earlier);
 };
 
 }  // namespace quic