gfe-relnote: Call QuicAckListenerInterface::OnPacketAcked() before moving the stream to closed stream list.
All existing tests passed.
Protected by FLAGS_quic_reloadable_flag_quic_notify_ack_listener_earlier.
PiperOrigin-RevId: 700511871
diff --git a/quiche/common/quiche_feature_flags_list.h b/quiche/common/quiche_feature_flags_list.h
index 01104c7..91dc462 100755
--- a/quiche/common/quiche_feature_flags_list.h
+++ b/quiche/common/quiche_feature_flags_list.h
@@ -45,6 +45,7 @@
QUICHE_FLAG(bool, quiche_reloadable_flag_quic_no_path_degrading_before_handshake_confirmed, true, true, "If true, an endpoint does not detect path degrading or blackholing until handshake gets confirmed.")
QUICHE_FLAG(bool, quiche_reloadable_flag_quic_no_write_control_frame_upon_connection_close, false, true, "If trrue, early return before write control frame in OnCanWrite() if the connection is already closed.")
QUICHE_FLAG(bool, quiche_reloadable_flag_quic_no_write_control_frame_upon_connection_close2, false, false, "If true, QuicSession will block outgoing control frames when the connection is closed.")
+QUICHE_FLAG(bool, quiche_reloadable_flag_quic_notify_ack_listener_earlier, false, false, "If true, call QuicAckListenerInterface::OnPacketAcked() before moving the stream to closed stream list.")
QUICHE_FLAG(bool, quiche_reloadable_flag_quic_optimize_qpack_blocking_manager, false, false, "If true, optimize qpack_blocking_manager for CPU efficiency.")
QUICHE_FLAG(bool, quiche_reloadable_flag_quic_pacing_remove_non_initial_burst, false, false, "If true, remove the non-initial burst in QUIC PacingSender.")
QUICHE_FLAG(bool, quiche_reloadable_flag_quic_parse_cert_compression_algos_from_chlo, true, true, "If true, parse offered cert compression algorithms from received CHLOs.")
diff --git a/quiche/quic/core/http/quic_spdy_stream.cc b/quiche/quic/core/http/quic_spdy_stream.cc
index ee9be7b..ba0d33c 100644
--- a/quiche/quic/core/http/quic_spdy_stream.cc
+++ b/quiche/quic/core/http/quic_spdy_stream.cc
@@ -26,6 +26,7 @@
#include "quiche/quic/core/qpack/qpack_decoder.h"
#include "quiche/quic/core/qpack/qpack_encoder.h"
#include "quiche/quic/core/quic_error_codes.h"
+#include "quiche/quic/core/quic_stream.h"
#include "quiche/quic/core/quic_stream_priority.h"
#include "quiche/quic/core/quic_types.h"
#include "quiche/quic/core/quic_utils.h"
@@ -1036,6 +1037,7 @@
return true;
}
+// TODO(danzh): Remove this override once the flag is deprecated.
bool QuicSpdyStream::OnStreamFrameAcked(QuicStreamOffset offset,
QuicByteCount data_length,
bool fin_acked,
@@ -1046,17 +1048,38 @@
offset, data_length, fin_acked, ack_delay_time, receive_timestamp,
newly_acked_length);
- const QuicByteCount newly_acked_header_length =
- GetNumFrameHeadersInInterval(offset, data_length);
- QUICHE_DCHECK_LE(newly_acked_header_length, *newly_acked_length);
- unacked_frame_headers_offsets_.Difference(offset, offset + data_length);
- if (ack_listener_ != nullptr && new_data_acked) {
- ack_listener_->OnPacketAcked(
- *newly_acked_length - newly_acked_header_length, ack_delay_time);
+ if (!notify_ack_listener_earlier()) {
+ const QuicByteCount newly_acked_header_length =
+ GetNumFrameHeadersInInterval(offset, data_length);
+ QUICHE_DCHECK_LE(newly_acked_header_length, *newly_acked_length);
+ unacked_frame_headers_offsets_.Difference(offset, offset + data_length);
+ if (ack_listener_ != nullptr && new_data_acked) {
+ ack_listener_->OnPacketAcked(
+ *newly_acked_length - newly_acked_header_length, ack_delay_time);
+ }
+ } else {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_notify_ack_listener_earlier, 2, 3);
}
return new_data_acked;
}
+void QuicSpdyStream::OnNewDataAcked(QuicStreamOffset offset,
+ QuicByteCount data_length,
+ QuicByteCount newly_acked_length,
+ QuicTime receive_timestamp,
+ QuicTime::Delta ack_delay_time) {
+ QuicStream::OnNewDataAcked(offset, data_length, newly_acked_length,
+ receive_timestamp, ack_delay_time);
+ const QuicByteCount newly_acked_header_length =
+ GetNumFrameHeadersInInterval(offset, data_length);
+ QUICHE_DCHECK_LE(newly_acked_header_length, newly_acked_length);
+ unacked_frame_headers_offsets_.Difference(offset, offset + data_length);
+ if (ack_listener_ != nullptr) {
+ ack_listener_->OnPacketAcked(newly_acked_length - newly_acked_header_length,
+ ack_delay_time);
+ }
+}
+
void QuicSpdyStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
QuicByteCount data_length,
bool fin_retransmitted) {
diff --git a/quiche/quic/core/http/quic_spdy_stream.h b/quiche/quic/core/http/quic_spdy_stream.h
index fb6dc2c..21bf3e9 100644
--- a/quiche/quic/core/http/quic_spdy_stream.h
+++ b/quiche/quic/core/http/quic_spdy_stream.h
@@ -406,6 +406,12 @@
void CloseReadSide() override;
+ // Called when any new data is acked.
+ void OnNewDataAcked(QuicStreamOffset offset, QuicByteCount data_length,
+ QuicByteCount newly_acked_length,
+ QuicTime receive_timestamp,
+ QuicTime::Delta ack_delay_time) override;
+
private:
friend class test::QuicSpdyStreamPeer;
friend class test::QuicStreamPeer;
diff --git a/quiche/quic/core/http/quic_spdy_stream_test.cc b/quiche/quic/core/http/quic_spdy_stream_test.cc
index 05ca93e..7512c01 100644
--- a/quiche/quic/core/http/quic_spdy_stream_test.cc
+++ b/quiche/quic/core/http/quic_spdy_stream_test.cc
@@ -1902,6 +1902,66 @@
EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
}
+TEST_P(QuicSpdyStreamTest, OnPacketAckedBeforeStreamDestroy) {
+ Initialize(kShouldProcessData);
+ quiche::QuicheReferenceCountedPointer<MockAckListener> mock_ack_listener(
+ new StrictMock<MockAckListener>);
+ stream_->set_ack_listener(mock_ack_listener);
+ EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(AtLeast(1));
+ // Stream is not waiting for acks initially.
+ EXPECT_FALSE(stream_->IsWaitingForAcks());
+ EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
+ // Receive and consume initial headers with FIN set.
+ QuicHeaderList headers = ProcessHeaders(true, headers_);
+ stream_->ConsumeHeaderList();
+ stream_->OnFinRead();
+ EXPECT_TRUE(stream_->read_side_closed());
+
+ // Send kData1.
+ stream_->WriteOrBufferData("FooAndBar", false, nullptr);
+ EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
+ EXPECT_TRUE(stream_->IsWaitingForAcks());
+ EXPECT_CALL(*mock_ack_listener, OnPacketAcked(9, _));
+ QuicByteCount newly_acked_length = 0;
+ EXPECT_TRUE(stream_->OnStreamFrameAcked(0, 9, false, QuicTime::Delta::Zero(),
+ QuicTime::Zero(),
+ &newly_acked_length));
+ // Stream is not waiting for acks as all sent data is acked.
+ EXPECT_FALSE(stream_->IsWaitingForAcks());
+ EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
+
+ // Send kData2.
+ stream_->WriteOrBufferData("FooAndBar", true, nullptr);
+ EXPECT_TRUE(stream_->IsWaitingForAcks());
+ EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size());
+ EXPECT_TRUE(stream_->IsZombie());
+
+ // kData2 is acked.
+ EXPECT_CALL(*mock_ack_listener, OnPacketAcked(9, _));
+ EXPECT_TRUE(stream_->OnStreamFrameAcked(9, 9, false, QuicTime::Delta::Zero(),
+ QuicTime::Zero(),
+ &newly_acked_length));
+ // Stream is waiting for acks as FIN is not acked.
+ EXPECT_TRUE(stream_->IsWaitingForAcks());
+ EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
+
+ // FIN is acked.
+ EXPECT_CALL(*mock_ack_listener, OnPacketAcked(0, _))
+ .WillOnce(InvokeWithoutArgs([&]() {
+ if (GetQuicReloadableFlag(quic_notify_ack_listener_earlier)) {
+ // Stream is not added to closed stream list yet.
+ EXPECT_NE(session_->GetActiveStream(stream_->id()), nullptr);
+ } else {
+ EXPECT_EQ(session_->GetActiveStream(stream_->id()), nullptr);
+ }
+ }));
+ EXPECT_TRUE(stream_->OnStreamFrameAcked(18, 0, true, QuicTime::Delta::Zero(),
+ QuicTime::Zero(),
+ &newly_acked_length));
+ EXPECT_FALSE(stream_->IsWaitingForAcks());
+ EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size());
+}
+
TEST_P(QuicSpdyStreamTest, StreamDataGetAckedMultipleTimes) {
Initialize(kShouldProcessData);
quiche::QuicheReferenceCountedPointer<MockAckListener> mock_ack_listener(
diff --git a/quiche/quic/core/quic_stream.cc b/quiche/quic/core/quic_stream.cc
index f42faa1..dbce695 100644
--- a/quiche/quic/core/quic_stream.cc
+++ b/quiche/quic/core/quic_stream.cc
@@ -1132,8 +1132,8 @@
bool QuicStream::OnStreamFrameAcked(QuicStreamOffset offset,
QuicByteCount data_length, bool fin_acked,
- QuicTime::Delta /*ack_delay_time*/,
- QuicTime /*receive_timestamp*/,
+ QuicTime::Delta ack_delay_time,
+ QuicTime receive_timestamp,
QuicByteCount* newly_acked_length) {
QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Acking "
<< "[" << offset << ", " << offset + data_length << "]"
@@ -1160,12 +1160,25 @@
OnWriteSideInDataRecvdState();
write_side_data_recvd_state_notified_ = true;
}
+ if (notify_ack_listener_earlier_ && new_data_acked) {
+ QUIC_RELOADABLE_FLAG_COUNT_N(quic_notify_ack_listener_earlier, 1, 3);
+ OnNewDataAcked(offset, data_length, *newly_acked_length, receive_timestamp,
+ ack_delay_time);
+ }
if (!IsWaitingForAcks() && read_side_closed_ && write_side_closed_) {
session_->MaybeCloseZombieStream(id_);
}
return new_data_acked;
}
+void QuicStream::OnNewDataAcked(QuicStreamOffset /*offset*/,
+ QuicByteCount /*data_length*/,
+ QuicByteCount /*newly_acked_length*/,
+ QuicTime /*receive_timestamp*/,
+ QuicTime::Delta /*ack_delay_time*/) {
+ QUICHE_DCHECK(notify_ack_listener_earlier_);
+}
+
void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
QuicByteCount data_length,
bool fin_retransmitted) {
diff --git a/quiche/quic/core/quic_stream.h b/quiche/quic/core/quic_stream.h
index b2b5e9c..4da8b7f 100644
--- a/quiche/quic/core/quic_stream.h
+++ b/quiche/quic/core/quic_stream.h
@@ -484,6 +484,13 @@
// Does not send a FIN. May cause the stream to be closed.
virtual void CloseWriteSide();
+ // Called when any new data is acked.
+ virtual void OnNewDataAcked(QuicStreamOffset offset,
+ QuicByteCount data_length,
+ QuicByteCount newly_acked_length,
+ QuicTime receive_timestamp,
+ QuicTime::Delta ack_delay_time);
+
void set_rst_received(bool rst_received) { rst_received_ = rst_received; }
void set_stream_error(QuicResetStreamError error) { stream_error_ = error; }
@@ -510,6 +517,10 @@
std::optional<QuicByteCount> GetSendWindow() const;
std::optional<QuicByteCount> GetReceiveWindow() const;
+ bool notify_ack_listener_earlier() const {
+ return notify_ack_listener_earlier_;
+ }
+
private:
friend class test::QuicStreamPeer;
friend class QuicStreamUtils;
@@ -643,6 +654,9 @@
std::optional<QuicResetStreamAtFrame> buffered_reset_stream_at_;
Perspective perspective_;
+
+ const bool notify_ack_listener_earlier_ =
+ GetQuicReloadableFlag(quic_notify_ack_listener_earlier);
};
} // namespace quic