Instead of immediate data flush, always try to bundle QPACK decoder data opportunistically. This change is based on Bence's cl/553795731. Protected by FLAGS_quic_restart_flag_quic_opport_bundle_qpack_decoder_data. PiperOrigin-RevId: 557313748
diff --git a/quiche/quic/core/http/end_to_end_test.cc b/quiche/quic/core/http/end_to_end_test.cc index 8f0e186..9644650 100644 --- a/quiche/quic/core/http/end_to_end_test.cc +++ b/quiche/quic/core/http/end_to_end_test.cc
@@ -2856,9 +2856,15 @@ client_connection->GetStats().packets_sent; if (version_.UsesHttp3()) { - // Make sure 2 packets were sent, one for QPACK instructions, another for - // RESET_STREAM and STOP_SENDING. - EXPECT_EQ(packets_sent_before + 2, packets_sent_now); + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + // QPACK decoder instructions and RESET_STREAM and STOP_SENDING frames are + // sent in a single packet. + EXPECT_EQ(packets_sent_before + 1, packets_sent_now); + } else { + // Make sure 2 packets were sent, one for QPACK instructions, another for + // RESET_STREAM and STOP_SENDING. + EXPECT_EQ(packets_sent_before + 2, packets_sent_now); + } } // WaitForEvents waits 50ms and returns true if there are outstanding @@ -3144,7 +3150,11 @@ // received by the server. QuicConnection* server_connection = GetServerConnection(); EXPECT_FALSE(server_connection->HasPendingPathValidation()); - EXPECT_EQ(3u, server_connection->GetStats().num_validated_peer_migration); + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + EXPECT_EQ(4u, server_connection->GetStats().num_validated_peer_migration); + } else { + EXPECT_EQ(3u, server_connection->GetStats().num_validated_peer_migration); + } EXPECT_EQ(server_cid3, server_connection->connection_id()); EXPECT_EQ(client_cid3, server_connection->client_connection_id()); EXPECT_TRUE(QuicConnectionPeer::GetServerConnectionIdOnAlternativePath(
diff --git a/quiche/quic/core/http/quic_spdy_session.cc b/quiche/quic/core/http/quic_spdy_session.cc index ee6d6d1..0e92248 100644 --- a/quiche/quic/core/http/quic_spdy_session.cc +++ b/quiche/quic/core/http/quic_spdy_session.cc
@@ -24,6 +24,7 @@ #include "quiche/quic/core/http/quic_spdy_stream.h" #include "quiche/quic/core/http/web_transport_http3.h" #include "quiche/quic/core/quic_error_codes.h" +#include "quiche/quic/core/quic_session.h" #include "quiche/quic/core/quic_types.h" #include "quiche/quic/core/quic_utils.h" #include "quiche/quic/core/quic_versions.h" @@ -857,6 +858,16 @@ send_control_stream_->MaybeSendSettingsFrame(); } +bool QuicSpdySession::CheckStreamWriteBlocked(QuicStream* stream) const { + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data) && + qpack_decoder_send_stream_ != nullptr && + stream->id() == qpack_decoder_send_stream_->id()) { + // Decoder data is always bundled opportunistically. + return true; + } + return QuicSession::CheckStreamWriteBlocked(stream); +} + QpackEncoder* QuicSpdySession::qpack_encoder() { QUICHE_DCHECK(VersionUsesHttp3(transport_version())); @@ -1637,6 +1648,12 @@ last_sent_http3_goaway_id_ = stream_id; } +void QuicSpdySession::MaybeBundleOpportunistically() { + if (qpack_decoder_ != nullptr) { + qpack_decoder_->FlushDecoderStream(); + } +} + void QuicSpdySession::OnCanCreateNewOutgoingStream(bool unidirectional) { if (unidirectional && VersionUsesHttp3(transport_version())) { MaybeInitializeHttp3UnidirectionalStreams();
diff --git a/quiche/quic/core/http/quic_spdy_session.h b/quiche/quic/core/http/quic_spdy_session.h index 98e4aba..ddd0e5f 100644 --- a/quiche/quic/core/http/quic_spdy_session.h +++ b/quiche/quic/core/http/quic_spdy_session.h
@@ -541,8 +541,9 @@ // Initializes HTTP/3 unidirectional streams if not yet initialzed. virtual void MaybeInitializeHttp3UnidirectionalStreams(); - // QuicConnectionVisitorInterface method. + // QuicConnectionVisitorInterface methods. void BeforeConnectionCloseSent() override; + void MaybeBundleOpportunistically() override; // Called whenever a datagram is dequeued or dropped from datagram_queue(). virtual void OnDatagramProcessed(absl::optional<MessageStatus> status); @@ -557,6 +558,10 @@ // available. void SendInitialData(); + // Override to skip checking for qpack_decoder_send_stream_ given decoder data + // is always bundled opportunistically. + bool CheckStreamWriteBlocked(QuicStream* stream) const override; + private: friend class test::QuicSpdySessionPeer;
diff --git a/quiche/quic/core/http/quic_spdy_session_test.cc b/quiche/quic/core/http/quic_spdy_session_test.cc index 749ac81..0ccb217 100644 --- a/quiche/quic/core/http/quic_spdy_session_test.cc +++ b/quiche/quic/core/http/quic_spdy_session_test.cc
@@ -1279,7 +1279,8 @@ // In HTTP/3, Qpack stream will send data on stream reset and cause packet to // be flushed. - if (VersionUsesHttp3(transport_version())) { + if (VersionUsesHttp3(transport_version()) && + !GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { EXPECT_CALL(*writer_, WritePacket(_, _, _, _, _, _)) .WillOnce(Return(WriteResult(WRITE_STATUS_OK, 0))); } @@ -1514,7 +1515,7 @@ // the STOP_SENDING, so set up the EXPECT there. EXPECT_CALL(*connection_, OnStreamReset(stream->id(), _)); EXPECT_CALL(*connection_, SendControlFrame(_)); - } else { + } else if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { EXPECT_CALL(*writer_, WritePacket(_, _, _, _, _, _)) .WillOnce(Return(WriteResult(WRITE_STATUS_OK, 0))); }
diff --git a/quiche/quic/core/http/quic_spdy_stream_test.cc b/quiche/quic/core/http/quic_spdy_stream_test.cc index c9e4704..5dd7e89 100644 --- a/quiche/quic/core/http/quic_spdy_stream_test.cc +++ b/quiche/quic/core/http/quic_spdy_stream_test.cc
@@ -586,12 +586,14 @@ stream_->id(), QuicResetStreamError::FromInternal(QUIC_HEADERS_TOO_LARGE), 0)); - auto qpack_decoder_stream = - QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get()); - // Stream type and stream cancellation. - EXPECT_CALL(*session_, - WritevData(qpack_decoder_stream->id(), _, _, NO_FIN, _, _)) - .Times(2); + if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + auto qpack_decoder_stream = + QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get()); + // Stream type and stream cancellation. + EXPECT_CALL(*session_, + WritevData(qpack_decoder_stream->id(), _, _, NO_FIN, _, _)) + .Times(2); + } stream_->OnStreamFrame(frame); EXPECT_THAT(stream_->stream_error(), IsStreamError(QUIC_HEADERS_TOO_LARGE)); @@ -2250,14 +2252,16 @@ std::string headers = HeadersFrame(encoded_headers); EXPECT_CALL(debug_visitor, OnHeadersFrameReceived(stream_->id(), encoded_headers.length())); - // Decoder stream type. - EXPECT_CALL(*session_, - WritevData(decoder_send_stream->id(), /* write_length = */ 1, - /* offset = */ 0, _, _, _)); - // Header acknowledgement. - EXPECT_CALL(*session_, - WritevData(decoder_send_stream->id(), /* write_length = */ 1, - /* offset = */ 1, _, _, _)); + if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + // Decoder stream type. + EXPECT_CALL(*session_, + WritevData(decoder_send_stream->id(), /* write_length = */ 1, + /* offset = */ 0, _, _, _)); + // Header acknowledgement. + EXPECT_CALL(*session_, + WritevData(decoder_send_stream->id(), /* write_length = */ 1, + /* offset = */ 1, _, _, _)); + } EXPECT_CALL(debug_visitor, OnHeadersDecoded(stream_->id(), _)); stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 0, headers)); @@ -2285,7 +2289,10 @@ EXPECT_CALL(debug_visitor, OnHeadersFrameReceived(stream_->id(), encoded_trailers.length())); // Header acknowledgement. - EXPECT_CALL(*session_, WritevData(decoder_send_stream->id(), _, _, _, _, _)); + if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + EXPECT_CALL(*session_, + WritevData(decoder_send_stream->id(), _, _, _, _, _)); + } EXPECT_CALL(debug_visitor, OnHeadersDecoded(stream_->id(), _)); stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, /* offset = */ headers.length() + data.length(), @@ -2324,14 +2331,16 @@ auto decoder_send_stream = QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get()); - // Decoder stream type. - EXPECT_CALL(*session_, - WritevData(decoder_send_stream->id(), /* write_length = */ 1, - /* offset = */ 0, _, _, _)); - // Header acknowledgement. - EXPECT_CALL(*session_, - WritevData(decoder_send_stream->id(), /* write_length = */ 1, - /* offset = */ 1, _, _, _)); + if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + // Decoder stream type. + EXPECT_CALL(*session_, + WritevData(decoder_send_stream->id(), /* write_length = */ 1, + /* offset = */ 0, _, _, _)); + // Header acknowledgement. + EXPECT_CALL(*session_, + WritevData(decoder_send_stream->id(), /* write_length = */ 1, + /* offset = */ 1, _, _, _)); + } EXPECT_CALL(debug_visitor, OnHeadersDecoded(stream_->id(), _)); // Deliver dynamic table entry to decoder. session_->qpack_decoder()->OnInsertWithoutNameReference("foo", "bar"); @@ -2361,8 +2370,11 @@ // Decoding is blocked because dynamic table entry has not been received yet. EXPECT_FALSE(stream_->trailers_decompressed()); - // Header acknowledgement. - EXPECT_CALL(*session_, WritevData(decoder_send_stream->id(), _, _, _, _, _)); + if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + // Header acknowledgement. + EXPECT_CALL(*session_, + WritevData(decoder_send_stream->id(), _, _, _, _, _)); + } EXPECT_CALL(debug_visitor, OnHeadersDecoded(stream_->id(), _)); // Deliver second dynamic table entry to decoder. session_->qpack_decoder()->OnInsertWithoutNameReference("trailing", "foobar"); @@ -2453,14 +2465,16 @@ auto decoder_send_stream = QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get()); - // Decoder stream type. - EXPECT_CALL(*session_, - WritevData(decoder_send_stream->id(), /* write_length = */ 1, - /* offset = */ 0, _, _, _)); - // Header acknowledgement. - EXPECT_CALL(*session_, - WritevData(decoder_send_stream->id(), /* write_length = */ 1, - /* offset = */ 1, _, _, _)); + if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + // Decoder stream type. + EXPECT_CALL(*session_, + WritevData(decoder_send_stream->id(), /* write_length = */ 1, + /* offset = */ 0, _, _, _)); + // Header acknowledgement. + EXPECT_CALL(*session_, + WritevData(decoder_send_stream->id(), /* write_length = */ 1, + /* offset = */ 1, _, _, _)); + } // Deliver dynamic table entry to decoder. session_->qpack_decoder()->OnInsertWithoutNameReference("foo", "bar"); EXPECT_TRUE(stream_->headers_decompressed()); @@ -2522,15 +2536,17 @@ // Decoding is blocked because dynamic table entry has not been received yet. EXPECT_FALSE(stream_->headers_decompressed()); - // Decoder stream type and stream cancellation instruction. - auto decoder_send_stream = - QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get()); - EXPECT_CALL(*session_, - WritevData(decoder_send_stream->id(), /* write_length = */ 1, - /* offset = */ 0, _, _, _)); - EXPECT_CALL(*session_, - WritevData(decoder_send_stream->id(), /* write_length = */ 1, - /* offset = */ 1, _, _, _)); + if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + // Decoder stream type and stream cancellation instruction. + auto decoder_send_stream = + QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get()); + EXPECT_CALL(*session_, + WritevData(decoder_send_stream->id(), /* write_length = */ 1, + /* offset = */ 0, _, _, _)); + EXPECT_CALL(*session_, + WritevData(decoder_send_stream->id(), /* write_length = */ 1, + /* offset = */ 1, _, _, _)); + } // Reset stream by this endpoint, for example, due to stream cancellation. EXPECT_CALL(*session_, MaybeSendStopSendingFrame( @@ -2570,15 +2586,17 @@ // Decoding is blocked because dynamic table entry has not been received yet. EXPECT_FALSE(stream_->headers_decompressed()); - // Decoder stream type and stream cancellation instruction. - auto decoder_send_stream = - QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get()); - EXPECT_CALL(*session_, - WritevData(decoder_send_stream->id(), /* write_length = */ 1, - /* offset = */ 0, _, _, _)); - EXPECT_CALL(*session_, - WritevData(decoder_send_stream->id(), /* write_length = */ 1, - /* offset = */ 1, _, _, _)); + if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + // Decoder stream type and stream cancellation instruction. + auto decoder_send_stream = + QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get()); + EXPECT_CALL(*session_, + WritevData(decoder_send_stream->id(), /* write_length = */ 1, + /* offset = */ 0, _, _, _)); + EXPECT_CALL(*session_, + WritevData(decoder_send_stream->id(), /* write_length = */ 1, + /* offset = */ 1, _, _, _)); + } // OnStreamReset() is called when RESET_STREAM frame is received from peer. // This aborts header decompression. @@ -2961,14 +2979,16 @@ auto qpack_decoder_stream = QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get()); - // Stream type. - EXPECT_CALL(*session_, - WritevData(qpack_decoder_stream->id(), /* write_length = */ 1, - /* offset = */ 0, _, _, _)); - // Stream cancellation. - EXPECT_CALL(*session_, - WritevData(qpack_decoder_stream->id(), /* write_length = */ 1, - /* offset = */ 1, _, _, _)); + if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + // Stream type. + EXPECT_CALL(*session_, + WritevData(qpack_decoder_stream->id(), /* write_length = */ 1, + /* offset = */ 0, _, _, _)); + // Stream cancellation. + EXPECT_CALL(*session_, + WritevData(qpack_decoder_stream->id(), /* write_length = */ 1, + /* offset = */ 1, _, _, _)); + } EXPECT_CALL(*session_, MaybeSendStopSendingFrame( stream_->id(), QuicResetStreamError::FromInternal( QUIC_STREAM_CANCELLED))); @@ -2992,14 +3012,16 @@ auto qpack_decoder_stream = QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get()); - // Stream type. - EXPECT_CALL(*session_, - WritevData(qpack_decoder_stream->id(), /* write_length = */ 1, - /* offset = */ 0, _, _, _)); - // Stream cancellation. - EXPECT_CALL(*session_, - WritevData(qpack_decoder_stream->id(), /* write_length = */ 1, - /* offset = */ 1, _, _, _)); + if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + // Stream type. + EXPECT_CALL(*session_, + WritevData(qpack_decoder_stream->id(), /* write_length = */ 1, + /* offset = */ 0, _, _, _)); + // Stream cancellation. + EXPECT_CALL(*session_, + WritevData(qpack_decoder_stream->id(), /* write_length = */ 1, + /* offset = */ 1, _, _, _)); + } stream_->OnStreamReset(QuicRstStreamFrame( kInvalidControlFrameId, stream_->id(), QUIC_STREAM_CANCELLED, 0)); @@ -3382,13 +3404,15 @@ QuicStreamFrame frame(stream_->id(), /* fin = */ false, 0, data_frame); stream_->OnStreamFrame(frame); - // As a result of resetting the stream, stream type and stream cancellation - // are sent on the QPACK decoder stream. - auto qpack_decoder_stream = - QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get()); - EXPECT_CALL(*session_, - WritevData(qpack_decoder_stream->id(), _, _, NO_FIN, _, _)) - .Times(2); + if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + // As a result of resetting the stream, stream type and stream cancellation + // are sent on the QPACK decoder stream. + auto qpack_decoder_stream = + QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get()); + EXPECT_CALL(*session_, + WritevData(qpack_decoder_stream->id(), _, _, NO_FIN, _, _)) + .Times(2); + } stream_->OnStreamReset(QuicRstStreamFrame( kInvalidControlFrameId, stream_->id(), QUIC_STREAM_NO_ERROR, 0));
diff --git a/quiche/quic/core/qpack/qpack_decoded_headers_accumulator_test.cc b/quiche/quic/core/qpack/qpack_decoded_headers_accumulator_test.cc index e4847c1..86a8564 100644 --- a/quiche/quic/core/qpack/qpack_decoded_headers_accumulator_test.cc +++ b/quiche/quic/core/qpack/qpack_decoded_headers_accumulator_test.cc
@@ -175,6 +175,9 @@ EXPECT_CALL(visitor_, OnHeadersDecoded(_, true)); qpack_decoder_.OnInsertWithoutNameReference("foo", "bar"); + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + qpack_decoder_.FlushDecoderStream(); + } } TEST_F(QpackDecodedHeadersAccumulatorTest, BlockedDecoding) { @@ -198,6 +201,9 @@ EXPECT_EQ(strlen("foo") + strlen("bar"), header_list.uncompressed_header_bytes()); EXPECT_EQ(encoded_data.size(), header_list.compressed_header_bytes()); + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + qpack_decoder_.FlushDecoderStream(); + } } TEST_F(QpackDecodedHeadersAccumulatorTest, @@ -221,6 +227,9 @@ accumulator_.EndHeaderBlock(); EXPECT_THAT(header_list, ElementsAre(Pair("foo", "bar"), Pair("foo", "bar"))); + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + qpack_decoder_.FlushDecoderStream(); + } } // Regression test for https://crbug.com/1024263.
diff --git a/quiche/quic/core/qpack/qpack_decoder.cc b/quiche/quic/core/qpack/qpack_decoder.cc index 0569850..7a798ee 100644 --- a/quiche/quic/core/qpack/qpack_decoder.cc +++ b/quiche/quic/core/qpack/qpack_decoder.cc
@@ -31,7 +31,9 @@ void QpackDecoder::OnStreamReset(QuicStreamId stream_id) { if (header_table_.maximum_dynamic_table_capacity() > 0) { decoder_stream_sender_.SendStreamCancellation(stream_id); - decoder_stream_sender_.Flush(); + if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + decoder_stream_sender_.Flush(); + } } } @@ -66,7 +68,9 @@ known_received_count_ = header_table_.inserted_entry_count(); } - decoder_stream_sender_.Flush(); + if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + decoder_stream_sender_.Flush(); + } } void QpackDecoder::OnInsertWithNameReference(bool is_static, @@ -167,4 +171,6 @@ &header_table_, handler); } +void QpackDecoder::FlushDecoderStream() { decoder_stream_sender_.Flush(); } + } // namespace quic
diff --git a/quiche/quic/core/qpack/qpack_decoder.h b/quiche/quic/core/qpack/qpack_decoder.h index 9474378..1d12068 100644 --- a/quiche/quic/core/qpack/qpack_decoder.h +++ b/quiche/quic/core/qpack/qpack_decoder.h
@@ -106,6 +106,9 @@ return header_table_.dynamic_table_entry_referenced(); } + // Flush buffered data on the decoder stream. + void FlushDecoderStream(); + private: EncoderStreamErrorDelegate* const encoder_stream_error_delegate_; QpackEncoderStreamReceiver encoder_stream_receiver_;
diff --git a/quiche/quic/core/qpack/qpack_decoder_stream_sender.cc b/quiche/quic/core/qpack/qpack_decoder_stream_sender.cc index cc48587..879f21b 100644 --- a/quiche/quic/core/qpack/qpack_decoder_stream_sender.cc +++ b/quiche/quic/core/qpack/qpack_decoder_stream_sender.cc
@@ -7,9 +7,11 @@ #include <cstddef> #include <limits> #include <string> +#include <utility> #include "absl/strings/string_view.h" #include "quiche/quic/core/qpack/qpack_instructions.h" +#include "quiche/quic/platform/api/quic_flag_utils.h" #include "quiche/quic/platform/api/quic_logging.h" namespace quic { @@ -33,10 +35,18 @@ } void QpackDecoderStreamSender::Flush() { - if (buffer_.empty()) { + if (buffer_.empty() || delegate_ == nullptr) { return; } - + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + QUIC_RESTART_FLAG_COUNT_N(quic_opport_bundle_qpack_decoder_data, 3, 3); + // Swap buffer_ before calling WriteStreamData, which might result in a + // reentrant call to `Flush()`. + std::string copy; + std::swap(copy, buffer_); + delegate_->WriteStreamData(copy); + return; + } delegate_->WriteStreamData(buffer_); buffer_.clear(); }
diff --git a/quiche/quic/core/qpack/qpack_decoder_test.cc b/quiche/quic/core/qpack/qpack_decoder_test.cc index 5cfd960..4415871 100644 --- a/quiche/quic/core/qpack/qpack_decoder_test.cc +++ b/quiche/quic/core/qpack/qpack_decoder_test.cc
@@ -63,6 +63,8 @@ return qpack_decoder_.CreateProgressiveDecoder(stream_id, &handler_); } + void FlushDecoderStream() { qpack_decoder_.FlushDecoderStream(); } + // Set up |progressive_decoder_|. void StartDecoding() { progressive_decoder_ = CreateProgressiveDecoder(/* stream_id = */ 1); @@ -367,10 +369,17 @@ .InSequence(s); EXPECT_CALL(handler_, OnHeaderDecoded(Eq("foo"), Eq("ZZZ"))).InSequence(s); EXPECT_CALL(handler_, OnHeaderDecoded(Eq(":method"), Eq("ZZ"))).InSequence(s); - EXPECT_CALL(decoder_stream_sender_delegate_, - WriteStreamData(Eq(kHeaderAcknowledgement))) - .InSequence(s); + if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + EXPECT_CALL(decoder_stream_sender_delegate_, + WriteStreamData(Eq(kHeaderAcknowledgement))) + .InSequence(s); + } EXPECT_CALL(handler_, OnDecodingCompleted()).InSequence(s); + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + EXPECT_CALL(decoder_stream_sender_delegate_, + WriteStreamData(Eq(kHeaderAcknowledgement))) + .InSequence(s); + } DecodeHeaderBlock(absl::HexStringToBytes( "0500" // Required Insert Count 4 and Delta Base 0. @@ -381,6 +390,9 @@ "80" // Dynamic table entry with relative index 0, absolute index 3. "41025a5a")); // Name of entry 1 (relative index) from dynamic table, // with value "ZZ". + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + FlushDecoderStream(); + } EXPECT_CALL(handler_, OnHeaderDecoded(Eq("foo"), Eq("bar"))).InSequence(s); EXPECT_CALL(handler_, OnHeaderDecoded(Eq("foo"), Eq("ZZZ"))).InSequence(s); @@ -388,10 +400,17 @@ .InSequence(s); EXPECT_CALL(handler_, OnHeaderDecoded(Eq("foo"), Eq("ZZZ"))).InSequence(s); EXPECT_CALL(handler_, OnHeaderDecoded(Eq(":method"), Eq("ZZ"))).InSequence(s); - EXPECT_CALL(decoder_stream_sender_delegate_, - WriteStreamData(Eq(kHeaderAcknowledgement))) - .InSequence(s); + if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + EXPECT_CALL(decoder_stream_sender_delegate_, + WriteStreamData(Eq(kHeaderAcknowledgement))) + .InSequence(s); + } EXPECT_CALL(handler_, OnDecodingCompleted()).InSequence(s); + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + EXPECT_CALL(decoder_stream_sender_delegate_, + WriteStreamData(Eq(kHeaderAcknowledgement))) + .InSequence(s); + } DecodeHeaderBlock(absl::HexStringToBytes( "0502" // Required Insert Count 4 and Delta Base 2. @@ -402,6 +421,9 @@ "82" // Dynamic table entry with relative index 2, absolute index 3. "43025a5a")); // Name of entry 3 (relative index) from dynamic table, // with value "ZZ". + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + FlushDecoderStream(); + } EXPECT_CALL(handler_, OnHeaderDecoded(Eq("foo"), Eq("bar"))).InSequence(s); EXPECT_CALL(handler_, OnHeaderDecoded(Eq("foo"), Eq("ZZZ"))).InSequence(s); @@ -409,10 +431,17 @@ .InSequence(s); EXPECT_CALL(handler_, OnHeaderDecoded(Eq("foo"), Eq("ZZZ"))).InSequence(s); EXPECT_CALL(handler_, OnHeaderDecoded(Eq(":method"), Eq("ZZ"))).InSequence(s); - EXPECT_CALL(decoder_stream_sender_delegate_, - WriteStreamData(Eq(kHeaderAcknowledgement))) - .InSequence(s); + if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + EXPECT_CALL(decoder_stream_sender_delegate_, + WriteStreamData(Eq(kHeaderAcknowledgement))) + .InSequence(s); + } EXPECT_CALL(handler_, OnDecodingCompleted()).InSequence(s); + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + EXPECT_CALL(decoder_stream_sender_delegate_, + WriteStreamData(Eq(kHeaderAcknowledgement))) + .InSequence(s); + } DecodeHeaderBlock(absl::HexStringToBytes( "0582" // Required Insert Count 4 and Delta Base 2 with sign bit set. @@ -423,6 +452,9 @@ "12" // Dynamic table entry with post-base index 2, absolute index 3. "01025a5a")); // Name of entry 1 (post-base index) from dynamic table, // with value "ZZ". + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + FlushDecoderStream(); + } } TEST_P(QpackDecoderTest, DecreasingDynamicTableCapacityEvictsEntries) { @@ -453,6 +485,9 @@ "0200" // Required Insert Count 1 and Delta Base 0. // Base is 1 + 0 = 1. "80")); // Dynamic table entry with relative index 0, absolute index 0. + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + FlushDecoderStream(); + } } TEST_P(QpackDecoderTest, EncoderStreamErrorEntryTooLarge) { @@ -683,6 +718,9 @@ "0a00" // Encoded Required Insert Count 10, Required Insert Count 201, // Delta Base 0, Base 201. "80")); // Emit dynamic table entry with relative index 0. + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + FlushDecoderStream(); + } } TEST_P(QpackDecoderTest, NonZeroRequiredInsertCountButNoDynamicEntries) { @@ -842,6 +880,9 @@ DecodeEncoderStreamData(absl::HexStringToBytes("3fe107")); // Add literal entry with name "foo" and value "bar". DecodeEncoderStreamData(absl::HexStringToBytes("6294e703626172")); + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + FlushDecoderStream(); + } } TEST_P(QpackDecoderTest, BlockedDecodingUnblockedBeforeEndOfHeaderBlock) { @@ -879,6 +920,9 @@ EXPECT_CALL(decoder_stream_sender_delegate_, WriteStreamData(Eq(kHeaderAcknowledgement))); EndDecoding(); + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + FlushDecoderStream(); + } } // Regression test for https://crbug.com/1024263. @@ -932,6 +976,9 @@ // Add literal entry with name "foo" and value "bar". // Insert Count is now 6, reaching Required Insert Count of the header block. DecodeEncoderStreamData(absl::HexStringToBytes("6294e70362617a")); + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + FlushDecoderStream(); + } } TEST_P(QpackDecoderTest, TooManyBlockedStreams) { @@ -972,6 +1019,9 @@ "0200" // Required Insert Count 1 and Delta Base 0. // Base is 1 + 0 = 1. "80")); // Dynamic table entry with relative index 0, absolute index 0. + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + FlushDecoderStream(); + } } } // namespace
diff --git a/quiche/quic/core/quic_connection.cc b/quiche/quic/core/quic_connection.cc index 11d59d1..857cf47 100644 --- a/quiche/quic/core/quic_connection.cc +++ b/quiche/quic/core/quic_connection.cc
@@ -3171,7 +3171,7 @@ return connected_ && !HandleWriteBlocked(); } -const QuicFrames QuicConnection::MaybeBundleAckOpportunistically() { +const QuicFrames QuicConnection::MaybeBundleOpportunistically() { if (!ack_frequency_sent_ && sent_packet_manager_.CanSendAckFrequency()) { if (packet_creator_.NextSendingPacketNumber() >= FirstSendingPacketNumber() + kMinReceivedBeforeAckDecimation) { @@ -3182,6 +3182,11 @@ } } + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + QUIC_RESTART_FLAG_COUNT_N(quic_opport_bundle_qpack_decoder_data, 1, 3); + visitor_->MaybeBundleOpportunistically(); + } + QuicFrames frames; const bool has_pending_ack = uber_received_packet_manager_ @@ -3200,6 +3205,7 @@ << encryption_level_ << " ACK, " << (has_pending_ack ? "" : "!") << "has_pending_ack"; frames.push_back(updated_ack_frame); + // TODO(fayang): remove return value by FlushAckFrame here. return frames; } @@ -5786,6 +5792,10 @@ uber_received_packet_manager_.GetEarliestAckTimeout(); QUIC_BUG_IF(quic_bug_12714_32, !earliest_ack_timeout.IsInitialized()); MaybeBundleCryptoDataWithAcks(); + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + QUIC_RESTART_FLAG_COUNT_N(quic_opport_bundle_qpack_decoder_data, 2, 3); + visitor_->MaybeBundleOpportunistically(); + } earliest_ack_timeout = uber_received_packet_manager_.GetEarliestAckTimeout(); if (!earliest_ack_timeout.IsInitialized()) { return;
diff --git a/quiche/quic/core/quic_connection.h b/quiche/quic/core/quic_connection.h index 04c02e5..01b9049 100644 --- a/quiche/quic/core/quic_connection.h +++ b/quiche/quic/core/quic_connection.h
@@ -263,6 +263,9 @@ // Called when the client receives a preferred address from its peer. virtual void OnServerPreferredAddressAvailable( const QuicSocketAddress& server_preferred_address) = 0; + + // Asks session to bundle data opportunistically with outgoing data. + virtual void MaybeBundleOpportunistically() = 0; }; // Interface which gets callbacks from the QuicConnection at interesting @@ -727,7 +730,7 @@ // QuicPacketCreator::DelegateInterface bool ShouldGeneratePacket(HasRetransmittableData retransmittable, IsHandshake handshake) override; - const QuicFrames MaybeBundleAckOpportunistically() override; + const QuicFrames MaybeBundleOpportunistically() override; QuicPacketBuffer GetPacketBuffer() override; void OnSerializedPacket(SerializedPacket packet) override; void OnUnrecoverableError(QuicErrorCode error,
diff --git a/quiche/quic/core/quic_connection_test.cc b/quiche/quic/core/quic_connection_test.cc index daa49b3..ccb65bf 100644 --- a/quiche/quic/core/quic_connection_test.cc +++ b/quiche/quic/core/quic_connection_test.cc
@@ -700,6 +700,9 @@ EXPECT_CALL(visitor_, OnCongestionWindowChange(_)).Times(AnyNumber()); EXPECT_CALL(visitor_, OnPacketReceived(_, _, _)).Times(AnyNumber()); EXPECT_CALL(visitor_, OnSuccessfulVersionNegotiation(_)).Times(AnyNumber()); + if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) { + EXPECT_CALL(visitor_, MaybeBundleOpportunistically()).Times(AnyNumber()); + } EXPECT_CALL(visitor_, OnOneRttPacketAcknowledged()) .Times(testing::AtMost(1)); EXPECT_CALL(*loss_algorithm_, GetLossTimeout())
diff --git a/quiche/quic/core/quic_dispatcher.cc b/quiche/quic/core/quic_dispatcher.cc index 6f4c227..9fb397e 100644 --- a/quiche/quic/core/quic_dispatcher.cc +++ b/quiche/quic/core/quic_dispatcher.cc
@@ -105,7 +105,7 @@ return true; } - const QuicFrames MaybeBundleAckOpportunistically() override { + const QuicFrames MaybeBundleOpportunistically() override { QUICHE_DCHECK(false); return {}; }
diff --git a/quiche/quic/core/quic_flags_list.h b/quiche/quic/core/quic_flags_list.h index ad903a8..6744b55 100644 --- a/quiche/quic/core/quic_flags_list.h +++ b/quiche/quic/core/quic_flags_list.h
@@ -35,6 +35,8 @@ QUIC_FLAG(quic_reloadable_flag_quic_can_send_ack_frequency, true) // If true, allow client to enable BBRv2 on server via connection option \'B2ON\'. QUIC_FLAG(quic_reloadable_flag_quic_allow_client_enabled_bbr_v2, true) +// If true, always bundle qpack decoder data with other frames opportunistically. +QUIC_FLAG(quic_restart_flag_quic_opport_bundle_qpack_decoder_data, false) // If true, an endpoint does not detect path degrading or blackholing until handshake gets confirmed. QUIC_FLAG(quic_reloadable_flag_quic_no_path_degrading_before_handshake_confirmed, true) // If true, check connected at the beginning of TlsHandshaker::SetReadSecret.
diff --git a/quiche/quic/core/quic_packet_creator.cc b/quiche/quic/core/quic_packet_creator.cc index f62f9d1..f0fa133 100644 --- a/quiche/quic/core/quic_packet_creator.cc +++ b/quiche/quic/core/quic_packet_creator.cc
@@ -1282,7 +1282,7 @@ << "Adding a control frame with no control frame id: " << frame; QUICHE_DCHECK(QuicUtils::IsRetransmittableFrame(frame.type)) << ENDPOINT << frame; - MaybeBundleAckOpportunistically(); + MaybeBundleOpportunistically(); if (HasPendingFrames()) { if (AddFrame(frame, next_transmission_type_)) { // There is pending frames and current frame fits. @@ -1312,7 +1312,7 @@ << "Packet flusher is not attached when " "generator tries to write stream data."; bool has_handshake = QuicUtils::IsCryptoStreamId(transport_version(), id); - MaybeBundleAckOpportunistically(); + MaybeBundleOpportunistically(); bool fin = state != NO_FIN; QUIC_BUG_IF(quic_bug_12398_17, has_handshake && fin) << ENDPOINT << "Handshake packets should never send a fin"; @@ -1439,7 +1439,7 @@ << ENDPOINT << "Packet flusher is not attached when " "generator tries to write crypto data."; - MaybeBundleAckOpportunistically(); + MaybeBundleOpportunistically(); // To make reasoning about crypto frames easier, we don't combine them with // other retransmittable frames in a single packet. // TODO(nharper): Once we have separate packet number spaces, everything @@ -1511,7 +1511,7 @@ SetMaxPacketLength(current_mtu); } -void QuicPacketCreator::MaybeBundleAckOpportunistically() { +void QuicPacketCreator::MaybeBundleOpportunistically() { if (has_ack()) { // Ack already queued, nothing to do. return; @@ -1520,8 +1520,7 @@ NOT_HANDSHAKE)) { return; } - const bool flushed = - FlushAckFrame(delegate_->MaybeBundleAckOpportunistically()); + const bool flushed = FlushAckFrame(delegate_->MaybeBundleOpportunistically()); QUIC_BUG_IF(quic_bug_10752_29, !flushed) << ENDPOINT << "Failed to flush ACK frame. encryption_level:" << packet_.encryption_level; @@ -1532,8 +1531,8 @@ << ENDPOINT << "Packet flusher is not attached when " "generator tries to send ACK frame."; - // MaybeBundleAckOpportunistically could be called nestedly when sending a - // control frame causing another control frame to be sent. + // MaybeBundleOpportunistically could be called nestedly when + // sending a control frame causing another control frame to be sent. QUIC_BUG_IF(quic_bug_12398_18, !frames.empty() && has_ack()) << ENDPOINT << "Trying to flush " << quiche::PrintElements(frames) << " when there is ACK queued"; @@ -1615,7 +1614,7 @@ << ENDPOINT << "Packet flusher is not attached when " "generator tries to add message frame."; - MaybeBundleAckOpportunistically(); + MaybeBundleOpportunistically(); const QuicByteCount message_length = MemSliceSpanTotalSize(message); if (message_length > GetCurrentLargestMessagePayload()) { return MESSAGE_STATUS_TOO_LARGE;
diff --git a/quiche/quic/core/quic_packet_creator.h b/quiche/quic/core/quic_packet_creator.h index 7c53414..88f17d5 100644 --- a/quiche/quic/core/quic_packet_creator.h +++ b/quiche/quic/core/quic_packet_creator.h
@@ -60,9 +60,9 @@ // Consults delegate whether a packet should be generated. virtual bool ShouldGeneratePacket(HasRetransmittableData retransmittable, IsHandshake handshake) = 0; - // Called when there is data to be sent. Retrieves updated ACK frame from - // the delegate. - virtual const QuicFrames MaybeBundleAckOpportunistically() = 0; + // Called when there is data to be sent. Gives delegate a chance to bundle + // anything with to-be-sent data. + virtual const QuicFrames MaybeBundleOpportunistically() = 0; // Returns the packet fate for serialized packets which will be handed over // to delegate via OnSerializedPacket(). Called when a packet is about to be @@ -369,9 +369,9 @@ // Generates an MTU discovery packet of specified size. void GenerateMtuDiscoveryPacket(QuicByteCount target_mtu); - // Called when there is data to be sent, Retrieves updated ACK frame from - // delegate_ and flushes it. - void MaybeBundleAckOpportunistically(); + // Called when there is data to be sent. Gives delegate a chance to bundle any + // data (including ACK). + void MaybeBundleOpportunistically(); // Called to flush ACK and STOP_WAITING frames, returns false if the flush // fails.
diff --git a/quiche/quic/core/quic_packet_creator_test.cc b/quiche/quic/core/quic_packet_creator_test.cc index aa919db..30c8173 100644 --- a/quiche/quic/core/quic_packet_creator_test.cc +++ b/quiche/quic/core/quic_packet_creator_test.cc
@@ -2476,8 +2476,7 @@ MOCK_METHOD(bool, ShouldGeneratePacket, (HasRetransmittableData retransmittable, IsHandshake handshake), (override)); - MOCK_METHOD(const QuicFrames, MaybeBundleAckOpportunistically, (), - (override)); + MOCK_METHOD(const QuicFrames, MaybeBundleOpportunistically, (), (override)); MOCK_METHOD(QuicPacketBuffer, GetPacketBuffer, (), (override)); MOCK_METHOD(void, OnSerializedPacket, (SerializedPacket), (override)); MOCK_METHOD(void, OnUnrecoverableError, (QuicErrorCode, const std::string&), @@ -2556,7 +2555,7 @@ } if (delegate_->ShouldGeneratePacket(NO_RETRANSMITTABLE_DATA, NOT_HANDSHAKE)) { - EXPECT_CALL(*delegate_, MaybeBundleAckOpportunistically()) + EXPECT_CALL(*delegate_, MaybeBundleOpportunistically()) .WillOnce(Return(frames)); } } @@ -2583,7 +2582,7 @@ } if (!has_ack() && delegate_->ShouldGeneratePacket(NO_RETRANSMITTABLE_DATA, NOT_HANDSHAKE)) { - EXPECT_CALL(*delegate_, MaybeBundleAckOpportunistically()).Times(1); + EXPECT_CALL(*delegate_, MaybeBundleOpportunistically()).Times(1); } return QuicPacketCreator::ConsumeData(id, data.length(), offset, state); } @@ -2592,7 +2591,7 @@ quiche::QuicheMemSlice message) { if (!has_ack() && delegate_->ShouldGeneratePacket(NO_RETRANSMITTABLE_DATA, NOT_HANDSHAKE)) { - EXPECT_CALL(*delegate_, MaybeBundleAckOpportunistically()).Times(1); + EXPECT_CALL(*delegate_, MaybeBundleOpportunistically()).Times(1); } return QuicPacketCreator::AddMessageFrame(message_id, absl::MakeSpan(&message, 1)); @@ -2603,7 +2602,7 @@ producer_->SaveCryptoData(level, offset, data); if (!has_ack() && delegate_->ShouldGeneratePacket(NO_RETRANSMITTABLE_DATA, NOT_HANDSHAKE)) { - EXPECT_CALL(*delegate_, MaybeBundleAckOpportunistically()).Times(1); + EXPECT_CALL(*delegate_, MaybeBundleOpportunistically()).Times(1); } return QuicPacketCreator::ConsumeCryptoData(level, data.length(), offset); }
diff --git a/quiche/quic/core/quic_session.h b/quiche/quic/core/quic_session.h index fe7e4bb..a518091 100644 --- a/quiche/quic/core/quic_session.h +++ b/quiche/quic/core/quic_session.h
@@ -189,6 +189,7 @@ std::unique_ptr<QuicPathValidationContext> /*context*/) override {} void OnServerPreferredAddressAvailable( const QuicSocketAddress& /*server_preferred_address*/) override; + void MaybeBundleOpportunistically() override {} // QuicStreamFrameDataProducer WriteStreamDataResult WriteStreamData(QuicStreamId id, @@ -841,6 +842,11 @@ return absl::nullopt; } + // Debug helper for OnCanWrite. Check that after QuicStream::OnCanWrite(), + // if stream has buffered data and is not stream level flow control blocked, + // it has to be in the write blocked list. + virtual bool CheckStreamWriteBlocked(QuicStream* stream) const; + private: friend class test::QuicSessionPeer; @@ -872,11 +878,6 @@ uint64_t previous_bytes_written, bool previous_fin_sent); - // Debug helper for OnCanWrite. Check that after QuicStream::OnCanWrite(), - // if stream has buffered data and is not stream level flow control blocked, - // it has to be in the write blocked list. - bool CheckStreamWriteBlocked(QuicStream* stream) const; - // Called in OnConfigNegotiated for Finch trials to measure performance of // starting with larger flow control receive windows. void AdjustInitialFlowControlWindows(size_t stream_window);
diff --git a/quiche/quic/test_tools/quic_test_utils.h b/quiche/quic/test_tools/quic_test_utils.h index 3d58222..9d7ad23 100644 --- a/quiche/quic/test_tools/quic_test_utils.h +++ b/quiche/quic/test_tools/quic_test_utils.h
@@ -507,6 +507,7 @@ (std::unique_ptr<QuicPathValidationContext>), (override)); MOCK_METHOD(void, OnServerPreferredAddressAvailable, (const QuicSocketAddress&), (override)); + MOCK_METHOD(void, MaybeBundleOpportunistically, (), (override)); void OnBandwidthUpdateTimeout() override {} }; @@ -1418,8 +1419,7 @@ MOCK_METHOD(bool, ShouldGeneratePacket, (HasRetransmittableData retransmittable, IsHandshake handshake), (override)); - MOCK_METHOD(const QuicFrames, MaybeBundleAckOpportunistically, (), - (override)); + MOCK_METHOD(const QuicFrames, MaybeBundleOpportunistically, (), (override)); MOCK_METHOD(SerializedPacketFate, GetSerializedPacketFate, (bool, EncryptionLevel), (override)); };
diff --git a/quiche/quic/test_tools/simulator/quic_endpoint.h b/quiche/quic/test_tools/simulator/quic_endpoint.h index 84f5ad9..c3c3a1c 100644 --- a/quiche/quic/test_tools/simulator/quic_endpoint.h +++ b/quiche/quic/test_tools/simulator/quic_endpoint.h
@@ -112,6 +112,7 @@ std::unique_ptr<QuicPathValidationContext> /*context*/) override {} void OnServerPreferredAddressAvailable( const QuicSocketAddress& /*server_preferred_address*/) override {} + void MaybeBundleOpportunistically() override {} // End QuicConnectionVisitorInterface implementation.