Instead of immediate data flush, always try to bundle QPACK decoder data opportunistically.

This change is based on Bence's cl/553795731.

Protected by FLAGS_quic_restart_flag_quic_opport_bundle_qpack_decoder_data.

PiperOrigin-RevId: 557313748
diff --git a/quiche/quic/core/http/end_to_end_test.cc b/quiche/quic/core/http/end_to_end_test.cc
index 8f0e186..9644650 100644
--- a/quiche/quic/core/http/end_to_end_test.cc
+++ b/quiche/quic/core/http/end_to_end_test.cc
@@ -2856,9 +2856,15 @@
       client_connection->GetStats().packets_sent;
 
   if (version_.UsesHttp3()) {
-    // Make sure 2 packets were sent, one for QPACK instructions, another for
-    // RESET_STREAM and STOP_SENDING.
-    EXPECT_EQ(packets_sent_before + 2, packets_sent_now);
+    if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+      // QPACK decoder instructions and RESET_STREAM and STOP_SENDING frames are
+      // sent in a single packet.
+      EXPECT_EQ(packets_sent_before + 1, packets_sent_now);
+    } else {
+      // Make sure 2 packets were sent, one for QPACK instructions, another for
+      // RESET_STREAM and STOP_SENDING.
+      EXPECT_EQ(packets_sent_before + 2, packets_sent_now);
+    }
   }
 
   // WaitForEvents waits 50ms and returns true if there are outstanding
@@ -3144,7 +3150,11 @@
   // received by the server.
   QuicConnection* server_connection = GetServerConnection();
   EXPECT_FALSE(server_connection->HasPendingPathValidation());
-  EXPECT_EQ(3u, server_connection->GetStats().num_validated_peer_migration);
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    EXPECT_EQ(4u, server_connection->GetStats().num_validated_peer_migration);
+  } else {
+    EXPECT_EQ(3u, server_connection->GetStats().num_validated_peer_migration);
+  }
   EXPECT_EQ(server_cid3, server_connection->connection_id());
   EXPECT_EQ(client_cid3, server_connection->client_connection_id());
   EXPECT_TRUE(QuicConnectionPeer::GetServerConnectionIdOnAlternativePath(
diff --git a/quiche/quic/core/http/quic_spdy_session.cc b/quiche/quic/core/http/quic_spdy_session.cc
index ee6d6d1..0e92248 100644
--- a/quiche/quic/core/http/quic_spdy_session.cc
+++ b/quiche/quic/core/http/quic_spdy_session.cc
@@ -24,6 +24,7 @@
 #include "quiche/quic/core/http/quic_spdy_stream.h"
 #include "quiche/quic/core/http/web_transport_http3.h"
 #include "quiche/quic/core/quic_error_codes.h"
+#include "quiche/quic/core/quic_session.h"
 #include "quiche/quic/core/quic_types.h"
 #include "quiche/quic/core/quic_utils.h"
 #include "quiche/quic/core/quic_versions.h"
@@ -857,6 +858,16 @@
   send_control_stream_->MaybeSendSettingsFrame();
 }
 
+bool QuicSpdySession::CheckStreamWriteBlocked(QuicStream* stream) const {
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data) &&
+      qpack_decoder_send_stream_ != nullptr &&
+      stream->id() == qpack_decoder_send_stream_->id()) {
+    // Decoder data is always bundled opportunistically.
+    return true;
+  }
+  return QuicSession::CheckStreamWriteBlocked(stream);
+}
+
 QpackEncoder* QuicSpdySession::qpack_encoder() {
   QUICHE_DCHECK(VersionUsesHttp3(transport_version()));
 
@@ -1637,6 +1648,12 @@
   last_sent_http3_goaway_id_ = stream_id;
 }
 
+void QuicSpdySession::MaybeBundleOpportunistically() {
+  if (qpack_decoder_ != nullptr) {
+    qpack_decoder_->FlushDecoderStream();
+  }
+}
+
 void QuicSpdySession::OnCanCreateNewOutgoingStream(bool unidirectional) {
   if (unidirectional && VersionUsesHttp3(transport_version())) {
     MaybeInitializeHttp3UnidirectionalStreams();
diff --git a/quiche/quic/core/http/quic_spdy_session.h b/quiche/quic/core/http/quic_spdy_session.h
index 98e4aba..ddd0e5f 100644
--- a/quiche/quic/core/http/quic_spdy_session.h
+++ b/quiche/quic/core/http/quic_spdy_session.h
@@ -541,8 +541,9 @@
   // Initializes HTTP/3 unidirectional streams if not yet initialzed.
   virtual void MaybeInitializeHttp3UnidirectionalStreams();
 
-  // QuicConnectionVisitorInterface method.
+  // QuicConnectionVisitorInterface methods.
   void BeforeConnectionCloseSent() override;
+  void MaybeBundleOpportunistically() override;
 
   // Called whenever a datagram is dequeued or dropped from datagram_queue().
   virtual void OnDatagramProcessed(absl::optional<MessageStatus> status);
@@ -557,6 +558,10 @@
   // available.
   void SendInitialData();
 
+  // Override to skip checking for qpack_decoder_send_stream_ given decoder data
+  // is always bundled opportunistically.
+  bool CheckStreamWriteBlocked(QuicStream* stream) const override;
+
  private:
   friend class test::QuicSpdySessionPeer;
 
diff --git a/quiche/quic/core/http/quic_spdy_session_test.cc b/quiche/quic/core/http/quic_spdy_session_test.cc
index 749ac81..0ccb217 100644
--- a/quiche/quic/core/http/quic_spdy_session_test.cc
+++ b/quiche/quic/core/http/quic_spdy_session_test.cc
@@ -1279,7 +1279,8 @@
 
   // In HTTP/3, Qpack stream will send data on stream reset and cause packet to
   // be flushed.
-  if (VersionUsesHttp3(transport_version())) {
+  if (VersionUsesHttp3(transport_version()) &&
+      !GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
     EXPECT_CALL(*writer_, WritePacket(_, _, _, _, _, _))
         .WillOnce(Return(WriteResult(WRITE_STATUS_OK, 0)));
   }
@@ -1514,7 +1515,7 @@
     // the STOP_SENDING, so set up the EXPECT there.
     EXPECT_CALL(*connection_, OnStreamReset(stream->id(), _));
     EXPECT_CALL(*connection_, SendControlFrame(_));
-  } else {
+  } else if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
     EXPECT_CALL(*writer_, WritePacket(_, _, _, _, _, _))
         .WillOnce(Return(WriteResult(WRITE_STATUS_OK, 0)));
   }
diff --git a/quiche/quic/core/http/quic_spdy_stream_test.cc b/quiche/quic/core/http/quic_spdy_stream_test.cc
index c9e4704..5dd7e89 100644
--- a/quiche/quic/core/http/quic_spdy_stream_test.cc
+++ b/quiche/quic/core/http/quic_spdy_stream_test.cc
@@ -586,12 +586,14 @@
           stream_->id(),
           QuicResetStreamError::FromInternal(QUIC_HEADERS_TOO_LARGE), 0));
 
-  auto qpack_decoder_stream =
-      QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get());
-  // Stream type and stream cancellation.
-  EXPECT_CALL(*session_,
-              WritevData(qpack_decoder_stream->id(), _, _, NO_FIN, _, _))
-      .Times(2);
+  if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    auto qpack_decoder_stream =
+        QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get());
+    // Stream type and stream cancellation.
+    EXPECT_CALL(*session_,
+                WritevData(qpack_decoder_stream->id(), _, _, NO_FIN, _, _))
+        .Times(2);
+  }
 
   stream_->OnStreamFrame(frame);
   EXPECT_THAT(stream_->stream_error(), IsStreamError(QUIC_HEADERS_TOO_LARGE));
@@ -2250,14 +2252,16 @@
   std::string headers = HeadersFrame(encoded_headers);
   EXPECT_CALL(debug_visitor,
               OnHeadersFrameReceived(stream_->id(), encoded_headers.length()));
-  // Decoder stream type.
-  EXPECT_CALL(*session_,
-              WritevData(decoder_send_stream->id(), /* write_length = */ 1,
-                         /* offset = */ 0, _, _, _));
-  // Header acknowledgement.
-  EXPECT_CALL(*session_,
-              WritevData(decoder_send_stream->id(), /* write_length = */ 1,
-                         /* offset = */ 1, _, _, _));
+  if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    // Decoder stream type.
+    EXPECT_CALL(*session_,
+                WritevData(decoder_send_stream->id(), /* write_length = */ 1,
+                           /* offset = */ 0, _, _, _));
+    // Header acknowledgement.
+    EXPECT_CALL(*session_,
+                WritevData(decoder_send_stream->id(), /* write_length = */ 1,
+                           /* offset = */ 1, _, _, _));
+  }
   EXPECT_CALL(debug_visitor, OnHeadersDecoded(stream_->id(), _));
   stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 0, headers));
 
@@ -2285,7 +2289,10 @@
   EXPECT_CALL(debug_visitor,
               OnHeadersFrameReceived(stream_->id(), encoded_trailers.length()));
   // Header acknowledgement.
-  EXPECT_CALL(*session_, WritevData(decoder_send_stream->id(), _, _, _, _, _));
+  if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    EXPECT_CALL(*session_,
+                WritevData(decoder_send_stream->id(), _, _, _, _, _));
+  }
   EXPECT_CALL(debug_visitor, OnHeadersDecoded(stream_->id(), _));
   stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, /* offset = */
                                          headers.length() + data.length(),
@@ -2324,14 +2331,16 @@
   auto decoder_send_stream =
       QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get());
 
-  // Decoder stream type.
-  EXPECT_CALL(*session_,
-              WritevData(decoder_send_stream->id(), /* write_length = */ 1,
-                         /* offset = */ 0, _, _, _));
-  // Header acknowledgement.
-  EXPECT_CALL(*session_,
-              WritevData(decoder_send_stream->id(), /* write_length = */ 1,
-                         /* offset = */ 1, _, _, _));
+  if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    // Decoder stream type.
+    EXPECT_CALL(*session_,
+                WritevData(decoder_send_stream->id(), /* write_length = */ 1,
+                           /* offset = */ 0, _, _, _));
+    // Header acknowledgement.
+    EXPECT_CALL(*session_,
+                WritevData(decoder_send_stream->id(), /* write_length = */ 1,
+                           /* offset = */ 1, _, _, _));
+  }
   EXPECT_CALL(debug_visitor, OnHeadersDecoded(stream_->id(), _));
   // Deliver dynamic table entry to decoder.
   session_->qpack_decoder()->OnInsertWithoutNameReference("foo", "bar");
@@ -2361,8 +2370,11 @@
   // Decoding is blocked because dynamic table entry has not been received yet.
   EXPECT_FALSE(stream_->trailers_decompressed());
 
-  // Header acknowledgement.
-  EXPECT_CALL(*session_, WritevData(decoder_send_stream->id(), _, _, _, _, _));
+  if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    // Header acknowledgement.
+    EXPECT_CALL(*session_,
+                WritevData(decoder_send_stream->id(), _, _, _, _, _));
+  }
   EXPECT_CALL(debug_visitor, OnHeadersDecoded(stream_->id(), _));
   // Deliver second dynamic table entry to decoder.
   session_->qpack_decoder()->OnInsertWithoutNameReference("trailing", "foobar");
@@ -2453,14 +2465,16 @@
   auto decoder_send_stream =
       QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get());
 
-  // Decoder stream type.
-  EXPECT_CALL(*session_,
-              WritevData(decoder_send_stream->id(), /* write_length = */ 1,
-                         /* offset = */ 0, _, _, _));
-  // Header acknowledgement.
-  EXPECT_CALL(*session_,
-              WritevData(decoder_send_stream->id(), /* write_length = */ 1,
-                         /* offset = */ 1, _, _, _));
+  if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    // Decoder stream type.
+    EXPECT_CALL(*session_,
+                WritevData(decoder_send_stream->id(), /* write_length = */ 1,
+                           /* offset = */ 0, _, _, _));
+    // Header acknowledgement.
+    EXPECT_CALL(*session_,
+                WritevData(decoder_send_stream->id(), /* write_length = */ 1,
+                           /* offset = */ 1, _, _, _));
+  }
   // Deliver dynamic table entry to decoder.
   session_->qpack_decoder()->OnInsertWithoutNameReference("foo", "bar");
   EXPECT_TRUE(stream_->headers_decompressed());
@@ -2522,15 +2536,17 @@
   // Decoding is blocked because dynamic table entry has not been received yet.
   EXPECT_FALSE(stream_->headers_decompressed());
 
-  // Decoder stream type and stream cancellation instruction.
-  auto decoder_send_stream =
-      QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get());
-  EXPECT_CALL(*session_,
-              WritevData(decoder_send_stream->id(), /* write_length = */ 1,
-                         /* offset = */ 0, _, _, _));
-  EXPECT_CALL(*session_,
-              WritevData(decoder_send_stream->id(), /* write_length = */ 1,
-                         /* offset = */ 1, _, _, _));
+  if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    // Decoder stream type and stream cancellation instruction.
+    auto decoder_send_stream =
+        QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get());
+    EXPECT_CALL(*session_,
+                WritevData(decoder_send_stream->id(), /* write_length = */ 1,
+                           /* offset = */ 0, _, _, _));
+    EXPECT_CALL(*session_,
+                WritevData(decoder_send_stream->id(), /* write_length = */ 1,
+                           /* offset = */ 1, _, _, _));
+  }
 
   // Reset stream by this endpoint, for example, due to stream cancellation.
   EXPECT_CALL(*session_, MaybeSendStopSendingFrame(
@@ -2570,15 +2586,17 @@
   // Decoding is blocked because dynamic table entry has not been received yet.
   EXPECT_FALSE(stream_->headers_decompressed());
 
-  // Decoder stream type and stream cancellation instruction.
-  auto decoder_send_stream =
-      QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get());
-  EXPECT_CALL(*session_,
-              WritevData(decoder_send_stream->id(), /* write_length = */ 1,
-                         /* offset = */ 0, _, _, _));
-  EXPECT_CALL(*session_,
-              WritevData(decoder_send_stream->id(), /* write_length = */ 1,
-                         /* offset = */ 1, _, _, _));
+  if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    // Decoder stream type and stream cancellation instruction.
+    auto decoder_send_stream =
+        QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get());
+    EXPECT_CALL(*session_,
+                WritevData(decoder_send_stream->id(), /* write_length = */ 1,
+                           /* offset = */ 0, _, _, _));
+    EXPECT_CALL(*session_,
+                WritevData(decoder_send_stream->id(), /* write_length = */ 1,
+                           /* offset = */ 1, _, _, _));
+  }
 
   // OnStreamReset() is called when RESET_STREAM frame is received from peer.
   // This aborts header decompression.
@@ -2961,14 +2979,16 @@
 
   auto qpack_decoder_stream =
       QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get());
-  // Stream type.
-  EXPECT_CALL(*session_,
-              WritevData(qpack_decoder_stream->id(), /* write_length = */ 1,
-                         /* offset = */ 0, _, _, _));
-  // Stream cancellation.
-  EXPECT_CALL(*session_,
-              WritevData(qpack_decoder_stream->id(), /* write_length = */ 1,
-                         /* offset = */ 1, _, _, _));
+  if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    // Stream type.
+    EXPECT_CALL(*session_,
+                WritevData(qpack_decoder_stream->id(), /* write_length = */ 1,
+                           /* offset = */ 0, _, _, _));
+    // Stream cancellation.
+    EXPECT_CALL(*session_,
+                WritevData(qpack_decoder_stream->id(), /* write_length = */ 1,
+                           /* offset = */ 1, _, _, _));
+  }
   EXPECT_CALL(*session_, MaybeSendStopSendingFrame(
                              stream_->id(), QuicResetStreamError::FromInternal(
                                                 QUIC_STREAM_CANCELLED)));
@@ -2992,14 +3012,16 @@
 
   auto qpack_decoder_stream =
       QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get());
-  // Stream type.
-  EXPECT_CALL(*session_,
-              WritevData(qpack_decoder_stream->id(), /* write_length = */ 1,
-                         /* offset = */ 0, _, _, _));
-  // Stream cancellation.
-  EXPECT_CALL(*session_,
-              WritevData(qpack_decoder_stream->id(), /* write_length = */ 1,
-                         /* offset = */ 1, _, _, _));
+  if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    // Stream type.
+    EXPECT_CALL(*session_,
+                WritevData(qpack_decoder_stream->id(), /* write_length = */ 1,
+                           /* offset = */ 0, _, _, _));
+    // Stream cancellation.
+    EXPECT_CALL(*session_,
+                WritevData(qpack_decoder_stream->id(), /* write_length = */ 1,
+                           /* offset = */ 1, _, _, _));
+  }
 
   stream_->OnStreamReset(QuicRstStreamFrame(
       kInvalidControlFrameId, stream_->id(), QUIC_STREAM_CANCELLED, 0));
@@ -3382,13 +3404,15 @@
   QuicStreamFrame frame(stream_->id(), /* fin = */ false, 0, data_frame);
   stream_->OnStreamFrame(frame);
 
-  // As a result of resetting the stream, stream type and stream cancellation
-  // are sent on the QPACK decoder stream.
-  auto qpack_decoder_stream =
-      QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get());
-  EXPECT_CALL(*session_,
-              WritevData(qpack_decoder_stream->id(), _, _, NO_FIN, _, _))
-      .Times(2);
+  if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    // As a result of resetting the stream, stream type and stream cancellation
+    // are sent on the QPACK decoder stream.
+    auto qpack_decoder_stream =
+        QuicSpdySessionPeer::GetQpackDecoderSendStream(session_.get());
+    EXPECT_CALL(*session_,
+                WritevData(qpack_decoder_stream->id(), _, _, NO_FIN, _, _))
+        .Times(2);
+  }
 
   stream_->OnStreamReset(QuicRstStreamFrame(
       kInvalidControlFrameId, stream_->id(), QUIC_STREAM_NO_ERROR, 0));
diff --git a/quiche/quic/core/qpack/qpack_decoded_headers_accumulator_test.cc b/quiche/quic/core/qpack/qpack_decoded_headers_accumulator_test.cc
index e4847c1..86a8564 100644
--- a/quiche/quic/core/qpack/qpack_decoded_headers_accumulator_test.cc
+++ b/quiche/quic/core/qpack/qpack_decoded_headers_accumulator_test.cc
@@ -175,6 +175,9 @@
 
   EXPECT_CALL(visitor_, OnHeadersDecoded(_, true));
   qpack_decoder_.OnInsertWithoutNameReference("foo", "bar");
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    qpack_decoder_.FlushDecoderStream();
+  }
 }
 
 TEST_F(QpackDecodedHeadersAccumulatorTest, BlockedDecoding) {
@@ -198,6 +201,9 @@
   EXPECT_EQ(strlen("foo") + strlen("bar"),
             header_list.uncompressed_header_bytes());
   EXPECT_EQ(encoded_data.size(), header_list.compressed_header_bytes());
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    qpack_decoder_.FlushDecoderStream();
+  }
 }
 
 TEST_F(QpackDecodedHeadersAccumulatorTest,
@@ -221,6 +227,9 @@
   accumulator_.EndHeaderBlock();
 
   EXPECT_THAT(header_list, ElementsAre(Pair("foo", "bar"), Pair("foo", "bar")));
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    qpack_decoder_.FlushDecoderStream();
+  }
 }
 
 // Regression test for https://crbug.com/1024263.
diff --git a/quiche/quic/core/qpack/qpack_decoder.cc b/quiche/quic/core/qpack/qpack_decoder.cc
index 0569850..7a798ee 100644
--- a/quiche/quic/core/qpack/qpack_decoder.cc
+++ b/quiche/quic/core/qpack/qpack_decoder.cc
@@ -31,7 +31,9 @@
 void QpackDecoder::OnStreamReset(QuicStreamId stream_id) {
   if (header_table_.maximum_dynamic_table_capacity() > 0) {
     decoder_stream_sender_.SendStreamCancellation(stream_id);
-    decoder_stream_sender_.Flush();
+    if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+      decoder_stream_sender_.Flush();
+    }
   }
 }
 
@@ -66,7 +68,9 @@
     known_received_count_ = header_table_.inserted_entry_count();
   }
 
-  decoder_stream_sender_.Flush();
+  if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    decoder_stream_sender_.Flush();
+  }
 }
 
 void QpackDecoder::OnInsertWithNameReference(bool is_static,
@@ -167,4 +171,6 @@
                                                    &header_table_, handler);
 }
 
+void QpackDecoder::FlushDecoderStream() { decoder_stream_sender_.Flush(); }
+
 }  // namespace quic
diff --git a/quiche/quic/core/qpack/qpack_decoder.h b/quiche/quic/core/qpack/qpack_decoder.h
index 9474378..1d12068 100644
--- a/quiche/quic/core/qpack/qpack_decoder.h
+++ b/quiche/quic/core/qpack/qpack_decoder.h
@@ -106,6 +106,9 @@
     return header_table_.dynamic_table_entry_referenced();
   }
 
+  // Flush buffered data on the decoder stream.
+  void FlushDecoderStream();
+
  private:
   EncoderStreamErrorDelegate* const encoder_stream_error_delegate_;
   QpackEncoderStreamReceiver encoder_stream_receiver_;
diff --git a/quiche/quic/core/qpack/qpack_decoder_stream_sender.cc b/quiche/quic/core/qpack/qpack_decoder_stream_sender.cc
index cc48587..879f21b 100644
--- a/quiche/quic/core/qpack/qpack_decoder_stream_sender.cc
+++ b/quiche/quic/core/qpack/qpack_decoder_stream_sender.cc
@@ -7,9 +7,11 @@
 #include <cstddef>
 #include <limits>
 #include <string>
+#include <utility>
 
 #include "absl/strings/string_view.h"
 #include "quiche/quic/core/qpack/qpack_instructions.h"
+#include "quiche/quic/platform/api/quic_flag_utils.h"
 #include "quiche/quic/platform/api/quic_logging.h"
 
 namespace quic {
@@ -33,10 +35,18 @@
 }
 
 void QpackDecoderStreamSender::Flush() {
-  if (buffer_.empty()) {
+  if (buffer_.empty() || delegate_ == nullptr) {
     return;
   }
-
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    QUIC_RESTART_FLAG_COUNT_N(quic_opport_bundle_qpack_decoder_data, 3, 3);
+    // Swap buffer_ before calling WriteStreamData, which might result in a
+    // reentrant call to `Flush()`.
+    std::string copy;
+    std::swap(copy, buffer_);
+    delegate_->WriteStreamData(copy);
+    return;
+  }
   delegate_->WriteStreamData(buffer_);
   buffer_.clear();
 }
diff --git a/quiche/quic/core/qpack/qpack_decoder_test.cc b/quiche/quic/core/qpack/qpack_decoder_test.cc
index 5cfd960..4415871 100644
--- a/quiche/quic/core/qpack/qpack_decoder_test.cc
+++ b/quiche/quic/core/qpack/qpack_decoder_test.cc
@@ -63,6 +63,8 @@
     return qpack_decoder_.CreateProgressiveDecoder(stream_id, &handler_);
   }
 
+  void FlushDecoderStream() { qpack_decoder_.FlushDecoderStream(); }
+
   // Set up |progressive_decoder_|.
   void StartDecoding() {
     progressive_decoder_ = CreateProgressiveDecoder(/* stream_id = */ 1);
@@ -367,10 +369,17 @@
       .InSequence(s);
   EXPECT_CALL(handler_, OnHeaderDecoded(Eq("foo"), Eq("ZZZ"))).InSequence(s);
   EXPECT_CALL(handler_, OnHeaderDecoded(Eq(":method"), Eq("ZZ"))).InSequence(s);
-  EXPECT_CALL(decoder_stream_sender_delegate_,
-              WriteStreamData(Eq(kHeaderAcknowledgement)))
-      .InSequence(s);
+  if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    EXPECT_CALL(decoder_stream_sender_delegate_,
+                WriteStreamData(Eq(kHeaderAcknowledgement)))
+        .InSequence(s);
+  }
   EXPECT_CALL(handler_, OnDecodingCompleted()).InSequence(s);
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    EXPECT_CALL(decoder_stream_sender_delegate_,
+                WriteStreamData(Eq(kHeaderAcknowledgement)))
+        .InSequence(s);
+  }
 
   DecodeHeaderBlock(absl::HexStringToBytes(
       "0500"  // Required Insert Count 4 and Delta Base 0.
@@ -381,6 +390,9 @@
       "80"    // Dynamic table entry with relative index 0, absolute index 3.
       "41025a5a"));  // Name of entry 1 (relative index) from dynamic table,
                      // with value "ZZ".
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    FlushDecoderStream();
+  }
 
   EXPECT_CALL(handler_, OnHeaderDecoded(Eq("foo"), Eq("bar"))).InSequence(s);
   EXPECT_CALL(handler_, OnHeaderDecoded(Eq("foo"), Eq("ZZZ"))).InSequence(s);
@@ -388,10 +400,17 @@
       .InSequence(s);
   EXPECT_CALL(handler_, OnHeaderDecoded(Eq("foo"), Eq("ZZZ"))).InSequence(s);
   EXPECT_CALL(handler_, OnHeaderDecoded(Eq(":method"), Eq("ZZ"))).InSequence(s);
-  EXPECT_CALL(decoder_stream_sender_delegate_,
-              WriteStreamData(Eq(kHeaderAcknowledgement)))
-      .InSequence(s);
+  if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    EXPECT_CALL(decoder_stream_sender_delegate_,
+                WriteStreamData(Eq(kHeaderAcknowledgement)))
+        .InSequence(s);
+  }
   EXPECT_CALL(handler_, OnDecodingCompleted()).InSequence(s);
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    EXPECT_CALL(decoder_stream_sender_delegate_,
+                WriteStreamData(Eq(kHeaderAcknowledgement)))
+        .InSequence(s);
+  }
 
   DecodeHeaderBlock(absl::HexStringToBytes(
       "0502"  // Required Insert Count 4 and Delta Base 2.
@@ -402,6 +421,9 @@
       "82"    // Dynamic table entry with relative index 2, absolute index 3.
       "43025a5a"));  // Name of entry 3 (relative index) from dynamic table,
                      // with value "ZZ".
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    FlushDecoderStream();
+  }
 
   EXPECT_CALL(handler_, OnHeaderDecoded(Eq("foo"), Eq("bar"))).InSequence(s);
   EXPECT_CALL(handler_, OnHeaderDecoded(Eq("foo"), Eq("ZZZ"))).InSequence(s);
@@ -409,10 +431,17 @@
       .InSequence(s);
   EXPECT_CALL(handler_, OnHeaderDecoded(Eq("foo"), Eq("ZZZ"))).InSequence(s);
   EXPECT_CALL(handler_, OnHeaderDecoded(Eq(":method"), Eq("ZZ"))).InSequence(s);
-  EXPECT_CALL(decoder_stream_sender_delegate_,
-              WriteStreamData(Eq(kHeaderAcknowledgement)))
-      .InSequence(s);
+  if (!GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    EXPECT_CALL(decoder_stream_sender_delegate_,
+                WriteStreamData(Eq(kHeaderAcknowledgement)))
+        .InSequence(s);
+  }
   EXPECT_CALL(handler_, OnDecodingCompleted()).InSequence(s);
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    EXPECT_CALL(decoder_stream_sender_delegate_,
+                WriteStreamData(Eq(kHeaderAcknowledgement)))
+        .InSequence(s);
+  }
 
   DecodeHeaderBlock(absl::HexStringToBytes(
       "0582"  // Required Insert Count 4 and Delta Base 2 with sign bit set.
@@ -423,6 +452,9 @@
       "12"    // Dynamic table entry with post-base index 2, absolute index 3.
       "01025a5a"));  // Name of entry 1 (post-base index) from dynamic table,
                      // with value "ZZ".
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    FlushDecoderStream();
+  }
 }
 
 TEST_P(QpackDecoderTest, DecreasingDynamicTableCapacityEvictsEntries) {
@@ -453,6 +485,9 @@
       "0200"   // Required Insert Count 1 and Delta Base 0.
                // Base is 1 + 0 = 1.
       "80"));  // Dynamic table entry with relative index 0, absolute index 0.
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    FlushDecoderStream();
+  }
 }
 
 TEST_P(QpackDecoderTest, EncoderStreamErrorEntryTooLarge) {
@@ -683,6 +718,9 @@
       "0a00"   // Encoded Required Insert Count 10, Required Insert Count 201,
                // Delta Base 0, Base 201.
       "80"));  // Emit dynamic table entry with relative index 0.
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    FlushDecoderStream();
+  }
 }
 
 TEST_P(QpackDecoderTest, NonZeroRequiredInsertCountButNoDynamicEntries) {
@@ -842,6 +880,9 @@
   DecodeEncoderStreamData(absl::HexStringToBytes("3fe107"));
   // Add literal entry with name "foo" and value "bar".
   DecodeEncoderStreamData(absl::HexStringToBytes("6294e703626172"));
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    FlushDecoderStream();
+  }
 }
 
 TEST_P(QpackDecoderTest, BlockedDecodingUnblockedBeforeEndOfHeaderBlock) {
@@ -879,6 +920,9 @@
   EXPECT_CALL(decoder_stream_sender_delegate_,
               WriteStreamData(Eq(kHeaderAcknowledgement)));
   EndDecoding();
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    FlushDecoderStream();
+  }
 }
 
 // Regression test for https://crbug.com/1024263.
@@ -932,6 +976,9 @@
   // Add literal entry with name "foo" and value "bar".
   // Insert Count is now 6, reaching Required Insert Count of the header block.
   DecodeEncoderStreamData(absl::HexStringToBytes("6294e70362617a"));
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    FlushDecoderStream();
+  }
 }
 
 TEST_P(QpackDecoderTest, TooManyBlockedStreams) {
@@ -972,6 +1019,9 @@
       "0200"   // Required Insert Count 1 and Delta Base 0.
                // Base is 1 + 0 = 1.
       "80"));  // Dynamic table entry with relative index 0, absolute index 0.
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    FlushDecoderStream();
+  }
 }
 
 }  // namespace
diff --git a/quiche/quic/core/quic_connection.cc b/quiche/quic/core/quic_connection.cc
index 11d59d1..857cf47 100644
--- a/quiche/quic/core/quic_connection.cc
+++ b/quiche/quic/core/quic_connection.cc
@@ -3171,7 +3171,7 @@
   return connected_ && !HandleWriteBlocked();
 }
 
-const QuicFrames QuicConnection::MaybeBundleAckOpportunistically() {
+const QuicFrames QuicConnection::MaybeBundleOpportunistically() {
   if (!ack_frequency_sent_ && sent_packet_manager_.CanSendAckFrequency()) {
     if (packet_creator_.NextSendingPacketNumber() >=
         FirstSendingPacketNumber() + kMinReceivedBeforeAckDecimation) {
@@ -3182,6 +3182,11 @@
     }
   }
 
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    QUIC_RESTART_FLAG_COUNT_N(quic_opport_bundle_qpack_decoder_data, 1, 3);
+    visitor_->MaybeBundleOpportunistically();
+  }
+
   QuicFrames frames;
   const bool has_pending_ack =
       uber_received_packet_manager_
@@ -3200,6 +3205,7 @@
       << encryption_level_ << " ACK, " << (has_pending_ack ? "" : "!")
       << "has_pending_ack";
   frames.push_back(updated_ack_frame);
+  // TODO(fayang): remove return value by FlushAckFrame here.
   return frames;
 }
 
@@ -5786,6 +5792,10 @@
       uber_received_packet_manager_.GetEarliestAckTimeout();
   QUIC_BUG_IF(quic_bug_12714_32, !earliest_ack_timeout.IsInitialized());
   MaybeBundleCryptoDataWithAcks();
+  if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+    QUIC_RESTART_FLAG_COUNT_N(quic_opport_bundle_qpack_decoder_data, 2, 3);
+    visitor_->MaybeBundleOpportunistically();
+  }
   earliest_ack_timeout = uber_received_packet_manager_.GetEarliestAckTimeout();
   if (!earliest_ack_timeout.IsInitialized()) {
     return;
diff --git a/quiche/quic/core/quic_connection.h b/quiche/quic/core/quic_connection.h
index 04c02e5..01b9049 100644
--- a/quiche/quic/core/quic_connection.h
+++ b/quiche/quic/core/quic_connection.h
@@ -263,6 +263,9 @@
   // Called when the client receives a preferred address from its peer.
   virtual void OnServerPreferredAddressAvailable(
       const QuicSocketAddress& server_preferred_address) = 0;
+
+  // Asks session to bundle data opportunistically with outgoing data.
+  virtual void MaybeBundleOpportunistically() = 0;
 };
 
 // Interface which gets callbacks from the QuicConnection at interesting
@@ -727,7 +730,7 @@
   // QuicPacketCreator::DelegateInterface
   bool ShouldGeneratePacket(HasRetransmittableData retransmittable,
                             IsHandshake handshake) override;
-  const QuicFrames MaybeBundleAckOpportunistically() override;
+  const QuicFrames MaybeBundleOpportunistically() override;
   QuicPacketBuffer GetPacketBuffer() override;
   void OnSerializedPacket(SerializedPacket packet) override;
   void OnUnrecoverableError(QuicErrorCode error,
diff --git a/quiche/quic/core/quic_connection_test.cc b/quiche/quic/core/quic_connection_test.cc
index daa49b3..ccb65bf 100644
--- a/quiche/quic/core/quic_connection_test.cc
+++ b/quiche/quic/core/quic_connection_test.cc
@@ -700,6 +700,9 @@
     EXPECT_CALL(visitor_, OnCongestionWindowChange(_)).Times(AnyNumber());
     EXPECT_CALL(visitor_, OnPacketReceived(_, _, _)).Times(AnyNumber());
     EXPECT_CALL(visitor_, OnSuccessfulVersionNegotiation(_)).Times(AnyNumber());
+    if (GetQuicRestartFlag(quic_opport_bundle_qpack_decoder_data)) {
+      EXPECT_CALL(visitor_, MaybeBundleOpportunistically()).Times(AnyNumber());
+    }
     EXPECT_CALL(visitor_, OnOneRttPacketAcknowledged())
         .Times(testing::AtMost(1));
     EXPECT_CALL(*loss_algorithm_, GetLossTimeout())
diff --git a/quiche/quic/core/quic_dispatcher.cc b/quiche/quic/core/quic_dispatcher.cc
index 6f4c227..9fb397e 100644
--- a/quiche/quic/core/quic_dispatcher.cc
+++ b/quiche/quic/core/quic_dispatcher.cc
@@ -105,7 +105,7 @@
     return true;
   }
 
-  const QuicFrames MaybeBundleAckOpportunistically() override {
+  const QuicFrames MaybeBundleOpportunistically() override {
     QUICHE_DCHECK(false);
     return {};
   }
diff --git a/quiche/quic/core/quic_flags_list.h b/quiche/quic/core/quic_flags_list.h
index ad903a8..6744b55 100644
--- a/quiche/quic/core/quic_flags_list.h
+++ b/quiche/quic/core/quic_flags_list.h
@@ -35,6 +35,8 @@
 QUIC_FLAG(quic_reloadable_flag_quic_can_send_ack_frequency, true)
 // If true, allow client to enable BBRv2 on server via connection option \'B2ON\'.
 QUIC_FLAG(quic_reloadable_flag_quic_allow_client_enabled_bbr_v2, true)
+// If true, always bundle qpack decoder data with other frames opportunistically.
+QUIC_FLAG(quic_restart_flag_quic_opport_bundle_qpack_decoder_data, false)
 // If true, an endpoint does not detect path degrading or blackholing until handshake gets confirmed.
 QUIC_FLAG(quic_reloadable_flag_quic_no_path_degrading_before_handshake_confirmed, true)
 // If true, check connected at the beginning of TlsHandshaker::SetReadSecret.
diff --git a/quiche/quic/core/quic_packet_creator.cc b/quiche/quic/core/quic_packet_creator.cc
index f62f9d1..f0fa133 100644
--- a/quiche/quic/core/quic_packet_creator.cc
+++ b/quiche/quic/core/quic_packet_creator.cc
@@ -1282,7 +1282,7 @@
       << "Adding a control frame with no control frame id: " << frame;
   QUICHE_DCHECK(QuicUtils::IsRetransmittableFrame(frame.type))
       << ENDPOINT << frame;
-  MaybeBundleAckOpportunistically();
+  MaybeBundleOpportunistically();
   if (HasPendingFrames()) {
     if (AddFrame(frame, next_transmission_type_)) {
       // There is pending frames and current frame fits.
@@ -1312,7 +1312,7 @@
       << "Packet flusher is not attached when "
          "generator tries to write stream data.";
   bool has_handshake = QuicUtils::IsCryptoStreamId(transport_version(), id);
-  MaybeBundleAckOpportunistically();
+  MaybeBundleOpportunistically();
   bool fin = state != NO_FIN;
   QUIC_BUG_IF(quic_bug_12398_17, has_handshake && fin)
       << ENDPOINT << "Handshake packets should never send a fin";
@@ -1439,7 +1439,7 @@
       << ENDPOINT
       << "Packet flusher is not attached when "
          "generator tries to write crypto data.";
-  MaybeBundleAckOpportunistically();
+  MaybeBundleOpportunistically();
   // To make reasoning about crypto frames easier, we don't combine them with
   // other retransmittable frames in a single packet.
   // TODO(nharper): Once we have separate packet number spaces, everything
@@ -1511,7 +1511,7 @@
   SetMaxPacketLength(current_mtu);
 }
 
-void QuicPacketCreator::MaybeBundleAckOpportunistically() {
+void QuicPacketCreator::MaybeBundleOpportunistically() {
   if (has_ack()) {
     // Ack already queued, nothing to do.
     return;
@@ -1520,8 +1520,7 @@
                                        NOT_HANDSHAKE)) {
     return;
   }
-  const bool flushed =
-      FlushAckFrame(delegate_->MaybeBundleAckOpportunistically());
+  const bool flushed = FlushAckFrame(delegate_->MaybeBundleOpportunistically());
   QUIC_BUG_IF(quic_bug_10752_29, !flushed)
       << ENDPOINT << "Failed to flush ACK frame. encryption_level:"
       << packet_.encryption_level;
@@ -1532,8 +1531,8 @@
       << ENDPOINT
       << "Packet flusher is not attached when "
          "generator tries to send ACK frame.";
-  // MaybeBundleAckOpportunistically could be called nestedly when sending a
-  // control frame causing another control frame to be sent.
+  // MaybeBundleOpportunistically could be called nestedly when
+  // sending a control frame causing another control frame to be sent.
   QUIC_BUG_IF(quic_bug_12398_18, !frames.empty() && has_ack())
       << ENDPOINT << "Trying to flush " << quiche::PrintElements(frames)
       << " when there is ACK queued";
@@ -1615,7 +1614,7 @@
       << ENDPOINT
       << "Packet flusher is not attached when "
          "generator tries to add message frame.";
-  MaybeBundleAckOpportunistically();
+  MaybeBundleOpportunistically();
   const QuicByteCount message_length = MemSliceSpanTotalSize(message);
   if (message_length > GetCurrentLargestMessagePayload()) {
     return MESSAGE_STATUS_TOO_LARGE;
diff --git a/quiche/quic/core/quic_packet_creator.h b/quiche/quic/core/quic_packet_creator.h
index 7c53414..88f17d5 100644
--- a/quiche/quic/core/quic_packet_creator.h
+++ b/quiche/quic/core/quic_packet_creator.h
@@ -60,9 +60,9 @@
     // Consults delegate whether a packet should be generated.
     virtual bool ShouldGeneratePacket(HasRetransmittableData retransmittable,
                                       IsHandshake handshake) = 0;
-    // Called when there is data to be sent. Retrieves updated ACK frame from
-    // the delegate.
-    virtual const QuicFrames MaybeBundleAckOpportunistically() = 0;
+    // Called when there is data to be sent. Gives delegate a chance to bundle
+    // anything with to-be-sent data.
+    virtual const QuicFrames MaybeBundleOpportunistically() = 0;
 
     // Returns the packet fate for serialized packets which will be handed over
     // to delegate via OnSerializedPacket(). Called when a packet is about to be
@@ -369,9 +369,9 @@
   // Generates an MTU discovery packet of specified size.
   void GenerateMtuDiscoveryPacket(QuicByteCount target_mtu);
 
-  // Called when there is data to be sent, Retrieves updated ACK frame from
-  // delegate_ and flushes it.
-  void MaybeBundleAckOpportunistically();
+  // Called when there is data to be sent. Gives delegate a chance to bundle any
+  // data (including ACK).
+  void MaybeBundleOpportunistically();
 
   // Called to flush ACK and STOP_WAITING frames, returns false if the flush
   // fails.
diff --git a/quiche/quic/core/quic_packet_creator_test.cc b/quiche/quic/core/quic_packet_creator_test.cc
index aa919db..30c8173 100644
--- a/quiche/quic/core/quic_packet_creator_test.cc
+++ b/quiche/quic/core/quic_packet_creator_test.cc
@@ -2476,8 +2476,7 @@
   MOCK_METHOD(bool, ShouldGeneratePacket,
               (HasRetransmittableData retransmittable, IsHandshake handshake),
               (override));
-  MOCK_METHOD(const QuicFrames, MaybeBundleAckOpportunistically, (),
-              (override));
+  MOCK_METHOD(const QuicFrames, MaybeBundleOpportunistically, (), (override));
   MOCK_METHOD(QuicPacketBuffer, GetPacketBuffer, (), (override));
   MOCK_METHOD(void, OnSerializedPacket, (SerializedPacket), (override));
   MOCK_METHOD(void, OnUnrecoverableError, (QuicErrorCode, const std::string&),
@@ -2556,7 +2555,7 @@
       }
       if (delegate_->ShouldGeneratePacket(NO_RETRANSMITTABLE_DATA,
                                           NOT_HANDSHAKE)) {
-        EXPECT_CALL(*delegate_, MaybeBundleAckOpportunistically())
+        EXPECT_CALL(*delegate_, MaybeBundleOpportunistically())
             .WillOnce(Return(frames));
       }
     }
@@ -2583,7 +2582,7 @@
     }
     if (!has_ack() && delegate_->ShouldGeneratePacket(NO_RETRANSMITTABLE_DATA,
                                                       NOT_HANDSHAKE)) {
-      EXPECT_CALL(*delegate_, MaybeBundleAckOpportunistically()).Times(1);
+      EXPECT_CALL(*delegate_, MaybeBundleOpportunistically()).Times(1);
     }
     return QuicPacketCreator::ConsumeData(id, data.length(), offset, state);
   }
@@ -2592,7 +2591,7 @@
                                 quiche::QuicheMemSlice message) {
     if (!has_ack() && delegate_->ShouldGeneratePacket(NO_RETRANSMITTABLE_DATA,
                                                       NOT_HANDSHAKE)) {
-      EXPECT_CALL(*delegate_, MaybeBundleAckOpportunistically()).Times(1);
+      EXPECT_CALL(*delegate_, MaybeBundleOpportunistically()).Times(1);
     }
     return QuicPacketCreator::AddMessageFrame(message_id,
                                               absl::MakeSpan(&message, 1));
@@ -2603,7 +2602,7 @@
     producer_->SaveCryptoData(level, offset, data);
     if (!has_ack() && delegate_->ShouldGeneratePacket(NO_RETRANSMITTABLE_DATA,
                                                       NOT_HANDSHAKE)) {
-      EXPECT_CALL(*delegate_, MaybeBundleAckOpportunistically()).Times(1);
+      EXPECT_CALL(*delegate_, MaybeBundleOpportunistically()).Times(1);
     }
     return QuicPacketCreator::ConsumeCryptoData(level, data.length(), offset);
   }
diff --git a/quiche/quic/core/quic_session.h b/quiche/quic/core/quic_session.h
index fe7e4bb..a518091 100644
--- a/quiche/quic/core/quic_session.h
+++ b/quiche/quic/core/quic_session.h
@@ -189,6 +189,7 @@
       std::unique_ptr<QuicPathValidationContext> /*context*/) override {}
   void OnServerPreferredAddressAvailable(
       const QuicSocketAddress& /*server_preferred_address*/) override;
+  void MaybeBundleOpportunistically() override {}
 
   // QuicStreamFrameDataProducer
   WriteStreamDataResult WriteStreamData(QuicStreamId id,
@@ -841,6 +842,11 @@
     return absl::nullopt;
   }
 
+  // Debug helper for OnCanWrite. Check that after QuicStream::OnCanWrite(),
+  // if stream has buffered data and is not stream level flow control blocked,
+  // it has to be in the write blocked list.
+  virtual bool CheckStreamWriteBlocked(QuicStream* stream) const;
+
  private:
   friend class test::QuicSessionPeer;
 
@@ -872,11 +878,6 @@
                                  uint64_t previous_bytes_written,
                                  bool previous_fin_sent);
 
-  // Debug helper for OnCanWrite. Check that after QuicStream::OnCanWrite(),
-  // if stream has buffered data and is not stream level flow control blocked,
-  // it has to be in the write blocked list.
-  bool CheckStreamWriteBlocked(QuicStream* stream) const;
-
   // Called in OnConfigNegotiated for Finch trials to measure performance of
   // starting with larger flow control receive windows.
   void AdjustInitialFlowControlWindows(size_t stream_window);