gfe-relnote: Call QuicAckListenerInterface::OnPacketAcked() before moving the stream to closed stream list. All existing tests passed. Protected by FLAGS_quic_reloadable_flag_quic_notify_ack_listener_earlier. PiperOrigin-RevId: 700511871
diff --git a/quiche/common/quiche_feature_flags_list.h b/quiche/common/quiche_feature_flags_list.h index 01104c7..91dc462 100755 --- a/quiche/common/quiche_feature_flags_list.h +++ b/quiche/common/quiche_feature_flags_list.h
@@ -45,6 +45,7 @@ QUICHE_FLAG(bool, quiche_reloadable_flag_quic_no_path_degrading_before_handshake_confirmed, true, true, "If true, an endpoint does not detect path degrading or blackholing until handshake gets confirmed.") QUICHE_FLAG(bool, quiche_reloadable_flag_quic_no_write_control_frame_upon_connection_close, false, true, "If trrue, early return before write control frame in OnCanWrite() if the connection is already closed.") QUICHE_FLAG(bool, quiche_reloadable_flag_quic_no_write_control_frame_upon_connection_close2, false, false, "If true, QuicSession will block outgoing control frames when the connection is closed.") +QUICHE_FLAG(bool, quiche_reloadable_flag_quic_notify_ack_listener_earlier, false, false, "If true, call QuicAckListenerInterface::OnPacketAcked() before moving the stream to closed stream list.") QUICHE_FLAG(bool, quiche_reloadable_flag_quic_optimize_qpack_blocking_manager, false, false, "If true, optimize qpack_blocking_manager for CPU efficiency.") QUICHE_FLAG(bool, quiche_reloadable_flag_quic_pacing_remove_non_initial_burst, false, false, "If true, remove the non-initial burst in QUIC PacingSender.") QUICHE_FLAG(bool, quiche_reloadable_flag_quic_parse_cert_compression_algos_from_chlo, true, true, "If true, parse offered cert compression algorithms from received CHLOs.")
diff --git a/quiche/quic/core/http/quic_spdy_stream.cc b/quiche/quic/core/http/quic_spdy_stream.cc index ee9be7b..ba0d33c 100644 --- a/quiche/quic/core/http/quic_spdy_stream.cc +++ b/quiche/quic/core/http/quic_spdy_stream.cc
@@ -26,6 +26,7 @@ #include "quiche/quic/core/qpack/qpack_decoder.h" #include "quiche/quic/core/qpack/qpack_encoder.h" #include "quiche/quic/core/quic_error_codes.h" +#include "quiche/quic/core/quic_stream.h" #include "quiche/quic/core/quic_stream_priority.h" #include "quiche/quic/core/quic_types.h" #include "quiche/quic/core/quic_utils.h" @@ -1036,6 +1037,7 @@ return true; } +// TODO(danzh): Remove this override once the flag is deprecated. bool QuicSpdyStream::OnStreamFrameAcked(QuicStreamOffset offset, QuicByteCount data_length, bool fin_acked, @@ -1046,17 +1048,38 @@ offset, data_length, fin_acked, ack_delay_time, receive_timestamp, newly_acked_length); - const QuicByteCount newly_acked_header_length = - GetNumFrameHeadersInInterval(offset, data_length); - QUICHE_DCHECK_LE(newly_acked_header_length, *newly_acked_length); - unacked_frame_headers_offsets_.Difference(offset, offset + data_length); - if (ack_listener_ != nullptr && new_data_acked) { - ack_listener_->OnPacketAcked( - *newly_acked_length - newly_acked_header_length, ack_delay_time); + if (!notify_ack_listener_earlier()) { + const QuicByteCount newly_acked_header_length = + GetNumFrameHeadersInInterval(offset, data_length); + QUICHE_DCHECK_LE(newly_acked_header_length, *newly_acked_length); + unacked_frame_headers_offsets_.Difference(offset, offset + data_length); + if (ack_listener_ != nullptr && new_data_acked) { + ack_listener_->OnPacketAcked( + *newly_acked_length - newly_acked_header_length, ack_delay_time); + } + } else { + QUIC_RELOADABLE_FLAG_COUNT_N(quic_notify_ack_listener_earlier, 2, 3); } return new_data_acked; } +void QuicSpdyStream::OnNewDataAcked(QuicStreamOffset offset, + QuicByteCount data_length, + QuicByteCount newly_acked_length, + QuicTime receive_timestamp, + QuicTime::Delta ack_delay_time) { + QuicStream::OnNewDataAcked(offset, data_length, newly_acked_length, + receive_timestamp, ack_delay_time); + const QuicByteCount newly_acked_header_length = + GetNumFrameHeadersInInterval(offset, data_length); + QUICHE_DCHECK_LE(newly_acked_header_length, newly_acked_length); + unacked_frame_headers_offsets_.Difference(offset, offset + data_length); + if (ack_listener_ != nullptr) { + ack_listener_->OnPacketAcked(newly_acked_length - newly_acked_header_length, + ack_delay_time); + } +} + void QuicSpdyStream::OnStreamFrameRetransmitted(QuicStreamOffset offset, QuicByteCount data_length, bool fin_retransmitted) {
diff --git a/quiche/quic/core/http/quic_spdy_stream.h b/quiche/quic/core/http/quic_spdy_stream.h index fb6dc2c..21bf3e9 100644 --- a/quiche/quic/core/http/quic_spdy_stream.h +++ b/quiche/quic/core/http/quic_spdy_stream.h
@@ -406,6 +406,12 @@ void CloseReadSide() override; + // Called when any new data is acked. + void OnNewDataAcked(QuicStreamOffset offset, QuicByteCount data_length, + QuicByteCount newly_acked_length, + QuicTime receive_timestamp, + QuicTime::Delta ack_delay_time) override; + private: friend class test::QuicSpdyStreamPeer; friend class test::QuicStreamPeer;
diff --git a/quiche/quic/core/http/quic_spdy_stream_test.cc b/quiche/quic/core/http/quic_spdy_stream_test.cc index 05ca93e..7512c01 100644 --- a/quiche/quic/core/http/quic_spdy_stream_test.cc +++ b/quiche/quic/core/http/quic_spdy_stream_test.cc
@@ -1902,6 +1902,66 @@ EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size()); } +TEST_P(QuicSpdyStreamTest, OnPacketAckedBeforeStreamDestroy) { + Initialize(kShouldProcessData); + quiche::QuicheReferenceCountedPointer<MockAckListener> mock_ack_listener( + new StrictMock<MockAckListener>); + stream_->set_ack_listener(mock_ack_listener); + EXPECT_CALL(*session_, WritevData(_, _, _, _, _, _)).Times(AtLeast(1)); + // Stream is not waiting for acks initially. + EXPECT_FALSE(stream_->IsWaitingForAcks()); + EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size()); + // Receive and consume initial headers with FIN set. + QuicHeaderList headers = ProcessHeaders(true, headers_); + stream_->ConsumeHeaderList(); + stream_->OnFinRead(); + EXPECT_TRUE(stream_->read_side_closed()); + + // Send kData1. + stream_->WriteOrBufferData("FooAndBar", false, nullptr); + EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size()); + EXPECT_TRUE(stream_->IsWaitingForAcks()); + EXPECT_CALL(*mock_ack_listener, OnPacketAcked(9, _)); + QuicByteCount newly_acked_length = 0; + EXPECT_TRUE(stream_->OnStreamFrameAcked(0, 9, false, QuicTime::Delta::Zero(), + QuicTime::Zero(), + &newly_acked_length)); + // Stream is not waiting for acks as all sent data is acked. + EXPECT_FALSE(stream_->IsWaitingForAcks()); + EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size()); + + // Send kData2. + stream_->WriteOrBufferData("FooAndBar", true, nullptr); + EXPECT_TRUE(stream_->IsWaitingForAcks()); + EXPECT_EQ(1u, QuicStreamPeer::SendBuffer(stream_).size()); + EXPECT_TRUE(stream_->IsZombie()); + + // kData2 is acked. + EXPECT_CALL(*mock_ack_listener, OnPacketAcked(9, _)); + EXPECT_TRUE(stream_->OnStreamFrameAcked(9, 9, false, QuicTime::Delta::Zero(), + QuicTime::Zero(), + &newly_acked_length)); + // Stream is waiting for acks as FIN is not acked. + EXPECT_TRUE(stream_->IsWaitingForAcks()); + EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size()); + + // FIN is acked. + EXPECT_CALL(*mock_ack_listener, OnPacketAcked(0, _)) + .WillOnce(InvokeWithoutArgs([&]() { + if (GetQuicReloadableFlag(quic_notify_ack_listener_earlier)) { + // Stream is not added to closed stream list yet. + EXPECT_NE(session_->GetActiveStream(stream_->id()), nullptr); + } else { + EXPECT_EQ(session_->GetActiveStream(stream_->id()), nullptr); + } + })); + EXPECT_TRUE(stream_->OnStreamFrameAcked(18, 0, true, QuicTime::Delta::Zero(), + QuicTime::Zero(), + &newly_acked_length)); + EXPECT_FALSE(stream_->IsWaitingForAcks()); + EXPECT_EQ(0u, QuicStreamPeer::SendBuffer(stream_).size()); +} + TEST_P(QuicSpdyStreamTest, StreamDataGetAckedMultipleTimes) { Initialize(kShouldProcessData); quiche::QuicheReferenceCountedPointer<MockAckListener> mock_ack_listener(
diff --git a/quiche/quic/core/quic_stream.cc b/quiche/quic/core/quic_stream.cc index f42faa1..dbce695 100644 --- a/quiche/quic/core/quic_stream.cc +++ b/quiche/quic/core/quic_stream.cc
@@ -1132,8 +1132,8 @@ bool QuicStream::OnStreamFrameAcked(QuicStreamOffset offset, QuicByteCount data_length, bool fin_acked, - QuicTime::Delta /*ack_delay_time*/, - QuicTime /*receive_timestamp*/, + QuicTime::Delta ack_delay_time, + QuicTime receive_timestamp, QuicByteCount* newly_acked_length) { QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Acking " << "[" << offset << ", " << offset + data_length << "]" @@ -1160,12 +1160,25 @@ OnWriteSideInDataRecvdState(); write_side_data_recvd_state_notified_ = true; } + if (notify_ack_listener_earlier_ && new_data_acked) { + QUIC_RELOADABLE_FLAG_COUNT_N(quic_notify_ack_listener_earlier, 1, 3); + OnNewDataAcked(offset, data_length, *newly_acked_length, receive_timestamp, + ack_delay_time); + } if (!IsWaitingForAcks() && read_side_closed_ && write_side_closed_) { session_->MaybeCloseZombieStream(id_); } return new_data_acked; } +void QuicStream::OnNewDataAcked(QuicStreamOffset /*offset*/, + QuicByteCount /*data_length*/, + QuicByteCount /*newly_acked_length*/, + QuicTime /*receive_timestamp*/, + QuicTime::Delta /*ack_delay_time*/) { + QUICHE_DCHECK(notify_ack_listener_earlier_); +} + void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset, QuicByteCount data_length, bool fin_retransmitted) {
diff --git a/quiche/quic/core/quic_stream.h b/quiche/quic/core/quic_stream.h index b2b5e9c..4da8b7f 100644 --- a/quiche/quic/core/quic_stream.h +++ b/quiche/quic/core/quic_stream.h
@@ -484,6 +484,13 @@ // Does not send a FIN. May cause the stream to be closed. virtual void CloseWriteSide(); + // Called when any new data is acked. + virtual void OnNewDataAcked(QuicStreamOffset offset, + QuicByteCount data_length, + QuicByteCount newly_acked_length, + QuicTime receive_timestamp, + QuicTime::Delta ack_delay_time); + void set_rst_received(bool rst_received) { rst_received_ = rst_received; } void set_stream_error(QuicResetStreamError error) { stream_error_ = error; } @@ -510,6 +517,10 @@ std::optional<QuicByteCount> GetSendWindow() const; std::optional<QuicByteCount> GetReceiveWindow() const; + bool notify_ack_listener_earlier() const { + return notify_ack_listener_earlier_; + } + private: friend class test::QuicStreamPeer; friend class QuicStreamUtils; @@ -643,6 +654,9 @@ std::optional<QuicResetStreamAtFrame> buffered_reset_stream_at_; Perspective perspective_; + + const bool notify_ack_listener_earlier_ = + GetQuicReloadableFlag(quic_notify_ack_listener_earlier); }; } // namespace quic