Respect incremental parameter in QuicWriteBlockedList.

As a reminder, HTTP/3 streams have two priority-related parameters received from
the clien in PRIORITY_UPDATE frames: urgency (int) and incremental (bool).

Before this CL, the incremental parameter is ignored.  Writes within an urgency
bucket are batched: if the last stream with data written has more data to write,
it will be written unless more than 16 kB has been written so far.

After this CL, writes on non-incremental stream are are batched with no limit.
This is because partial responses on incremental streams are not useful for the
client, so if there is more data available, it should be sent before other
streams of equal urgency.

Buffering behavior of incremental streams is not changed.

Protected by FLAGS_quic_reloadable_flag_quic_priority_respect_incremental.

PiperOrigin-RevId: 511587715
diff --git a/quiche/quic/core/http/quic_spdy_session_test.cc b/quiche/quic/core/http/quic_spdy_session_test.cc
index 4c23b9e..e3db301 100644
--- a/quiche/quic/core/http/quic_spdy_session_test.cc
+++ b/quiche/quic/core/http/quic_spdy_session_test.cc
@@ -865,6 +865,12 @@
   TestStream* stream4 = session_.CreateOutgoingBidirectionalStream();
   TestStream* stream6 = session_.CreateOutgoingBidirectionalStream();
 
+  const QuicStreamPriority priority{QuicStreamPriority::kDefaultUrgency,
+                                    /* incremental = */ true};
+  stream2->SetPriority(priority);
+  stream4->SetPriority(priority);
+  stream6->SetPriority(priority);
+
   session_.set_writev_consumes_all_data(true);
   session_.MarkConnectionLevelWriteBlocked(stream2->id());
   session_.MarkConnectionLevelWriteBlocked(stream4->id());
diff --git a/quiche/quic/core/quic_flags_list.h b/quiche/quic/core/quic_flags_list.h
index 236ac73..c93d01e 100644
--- a/quiche/quic/core/quic_flags_list.h
+++ b/quiche/quic/core/quic_flags_list.h
@@ -65,6 +65,8 @@
 QUIC_FLAG(quic_reloadable_flag_quic_act_upon_invalid_header, false)
 // If true, require handshake confirmation for QUIC connections, functionally disabling 0-rtt handshakes.
 QUIC_FLAG(quic_reloadable_flag_quic_require_handshake_confirmation, false)
+// If true, respect the incremental parameter of each stream in QuicWriteBlockedList.
+QUIC_FLAG(quic_reloadable_flag_quic_priority_respect_incremental, false)
 // If true, server proactively retires client issued connection ID on reverse path validation failure. 
 QUIC_FLAG(quic_reloadable_flag_quic_retire_cid_on_reverse_path_validation_failure, true)
 // If true, server sends bandwidth eastimate when network is idle for a while.
diff --git a/quiche/quic/core/quic_session_test.cc b/quiche/quic/core/quic_session_test.cc
index c65ae18..3906937 100644
--- a/quiche/quic/core/quic_session_test.cc
+++ b/quiche/quic/core/quic_session_test.cc
@@ -1030,6 +1030,12 @@
   TestStream* stream4 = session_.CreateOutgoingBidirectionalStream();
   TestStream* stream6 = session_.CreateOutgoingBidirectionalStream();
 
+  const QuicStreamPriority priority{QuicStreamPriority::kDefaultUrgency,
+                                    /* incremental = */ true};
+  stream2->SetPriority(priority);
+  stream4->SetPriority(priority);
+  stream6->SetPriority(priority);
+
   session_.set_writev_consumes_all_data(true);
   session_.MarkConnectionLevelWriteBlocked(stream2->id());
   session_.MarkConnectionLevelWriteBlocked(stream4->id());
diff --git a/quiche/quic/core/quic_write_blocked_list.cc b/quiche/quic/core/quic_write_blocked_list.cc
index f6d00f2..86ed31d 100644
--- a/quiche/quic/core/quic_write_blocked_list.cc
+++ b/quiche/quic/core/quic_write_blocked_list.cc
@@ -9,7 +9,10 @@
 
 namespace quic {
 
-QuicWriteBlockedList::QuicWriteBlockedList() : last_priority_popped_(0) {
+QuicWriteBlockedList::QuicWriteBlockedList()
+    : last_priority_popped_(0),
+      respect_incremental_(
+          GetQuicReloadableFlag(quic_priority_respect_incremental)) {
   memset(batch_write_stream_id_, 0, sizeof(batch_write_stream_id_));
   memset(bytes_left_for_batch_write_, 0, sizeof(bytes_left_for_batch_write_));
 }
@@ -96,9 +99,20 @@
     return;
   }
 
+  bool incremental = true;
+
+  if (respect_incremental_) {
+    QUIC_RELOADABLE_FLAG_COUNT(quic_priority_respect_incremental);
+    incremental =
+        priority_write_scheduler_.GetStreamPriority(stream_id).incremental;
+  }
+
+  // Writes on incremental streams are batched up to 16 kB.
+  // Writes on non-incremental streams continue whenever possible.
   bool push_front =
       stream_id == batch_write_stream_id_[last_priority_popped_] &&
-      bytes_left_for_batch_write_[last_priority_popped_] > 0;
+      (!incremental || bytes_left_for_batch_write_[last_priority_popped_] > 0);
+
   priority_write_scheduler_.MarkStreamReady(stream_id, push_front);
 }
 
diff --git a/quiche/quic/core/quic_write_blocked_list.h b/quiche/quic/core/quic_write_blocked_list.h
index b366eaf..422b79c 100644
--- a/quiche/quic/core/quic_write_blocked_list.h
+++ b/quiche/quic/core/quic_write_blocked_list.h
@@ -146,6 +146,9 @@
   };
 
   StaticStreamCollection static_stream_collection_;
+
+  // Latched value of reloadable_flag_quic_priority_respect_incremental.
+  const bool respect_incremental_;
 };
 
 }  // namespace quic
diff --git a/quiche/quic/core/quic_write_blocked_list_test.cc b/quiche/quic/core/quic_write_blocked_list_test.cc
index e69e071..b13edec 100644
--- a/quiche/quic/core/quic_write_blocked_list_test.cc
+++ b/quiche/quic/core/quic_write_blocked_list_test.cc
@@ -190,9 +190,9 @@
   const QuicStreamId id1 = 5;
   const QuicStreamId id2 = 7;
   const QuicStreamId id3 = 9;
-  RegisterStream(id1, kNotStatic, {kV3LowestPriority, kNotIncremental});
-  RegisterStream(id2, kNotStatic, {kV3LowestPriority, kNotIncremental});
-  RegisterStream(id3, kNotStatic, {kV3HighestPriority, kNotIncremental});
+  RegisterStream(id1, kNotStatic, {kV3LowestPriority, kIncremental});
+  RegisterStream(id2, kNotStatic, {kV3LowestPriority, kIncremental});
+  RegisterStream(id3, kNotStatic, {kV3HighestPriority, kIncremental});
 
   AddStream(id1);
   AddStream(id2);
@@ -236,6 +236,182 @@
   EXPECT_EQ(id1, PopFront());
 }
 
+TEST_F(QuicWriteBlockedListTest, IncrementalStreamsRoundRobin) {
+  const QuicStreamId id1 = 5;
+  const QuicStreamId id2 = 7;
+  const QuicStreamId id3 = 9;
+  RegisterStream(id1, kNotStatic, {kV3LowestPriority, kIncremental});
+  RegisterStream(id2, kNotStatic, {kV3LowestPriority, kIncremental});
+  RegisterStream(id3, kNotStatic, {kV3LowestPriority, kIncremental});
+
+  AddStream(id1);
+  AddStream(id2);
+  AddStream(id3);
+
+  EXPECT_EQ(id1, PopFront());
+  const size_t kLargeWriteSize = 1000 * 1000 * 1000;
+  UpdateBytesForStream(id1, kLargeWriteSize);
+  AddStream(id1);
+
+  EXPECT_EQ(id2, PopFront());
+  UpdateBytesForStream(id2, kLargeWriteSize);
+  EXPECT_EQ(id3, PopFront());
+  UpdateBytesForStream(id3, kLargeWriteSize);
+
+  AddStream(id3);
+  AddStream(id2);
+
+  EXPECT_EQ(id1, PopFront());
+  UpdateBytesForStream(id1, kLargeWriteSize);
+  EXPECT_EQ(id3, PopFront());
+  UpdateBytesForStream(id3, kLargeWriteSize);
+  AddStream(id3);
+
+  EXPECT_EQ(id2, PopFront());
+  EXPECT_EQ(id3, PopFront());
+}
+
+TEST_F(QuicWriteBlockedListTest, NonIncrementalStreamsKeepWriting) {
+  if (!GetQuicReloadableFlag(quic_priority_respect_incremental)) {
+    return;
+  }
+
+  const QuicStreamId id1 = 1;
+  const QuicStreamId id2 = 2;
+  const QuicStreamId id3 = 3;
+  const QuicStreamId id4 = 4;
+  RegisterStream(id1, kNotStatic, {kV3LowestPriority, kNotIncremental});
+  RegisterStream(id2, kNotStatic, {kV3LowestPriority, kNotIncremental});
+  RegisterStream(id3, kNotStatic, {kV3LowestPriority, kNotIncremental});
+  RegisterStream(id4, kNotStatic, {kV3HighestPriority, kNotIncremental});
+
+  AddStream(id1);
+  AddStream(id2);
+  AddStream(id3);
+
+  // A non-incremental stream can continue writing as long as it has data.
+  EXPECT_EQ(id1, PopFront());
+  const size_t kLargeWriteSize = 1000 * 1000 * 1000;
+  UpdateBytesForStream(id1, kLargeWriteSize);
+  AddStream(id1);
+
+  EXPECT_EQ(id1, PopFront());
+  UpdateBytesForStream(id1, kLargeWriteSize);
+  AddStream(id1);
+
+  EXPECT_EQ(id1, PopFront());
+  UpdateBytesForStream(id1, kLargeWriteSize);
+  AddStream(id1);
+
+  EXPECT_EQ(id1, PopFront());
+  UpdateBytesForStream(id1, kLargeWriteSize);
+  AddStream(id1);
+
+  // A higher priority stream takes precedence.
+  AddStream(id4);
+  EXPECT_EQ(id4, PopFront());
+
+  // When it is the turn of the lower urgency bucket again, writing of the first
+  // stream will continue.
+  EXPECT_EQ(id1, PopFront());
+  UpdateBytesForStream(id1, kLargeWriteSize);
+
+  // When there is no more data on the first stream, write can start on the
+  // second stream.
+  EXPECT_EQ(id2, PopFront());
+  UpdateBytesForStream(id2, kLargeWriteSize);
+  AddStream(id2);
+
+  // Write continues without limit.
+  EXPECT_EQ(id2, PopFront());
+  UpdateBytesForStream(id2, kLargeWriteSize);
+  AddStream(id2);
+
+  // Stream 1 is not the most recently written one, therefore it gets to the end
+  // of the dequeue.
+  AddStream(id1);
+
+  EXPECT_EQ(id2, PopFront());
+  UpdateBytesForStream(id2, kLargeWriteSize);
+
+  EXPECT_EQ(id3, PopFront());
+  UpdateBytesForStream(id2, kLargeWriteSize);
+  AddStream(id3);
+
+  EXPECT_EQ(id3, PopFront());
+  UpdateBytesForStream(id2, kLargeWriteSize);
+
+  // When there is no data to write either on stream 2 or stream 3, stream 1 can
+  // resume.
+  EXPECT_EQ(id1, PopFront());
+  UpdateBytesForStream(id1, kLargeWriteSize);
+}
+
+TEST_F(QuicWriteBlockedListTest, IncrementalAndNonIncrementalStreams) {
+  if (!GetQuicReloadableFlag(quic_priority_respect_incremental)) {
+    return;
+  }
+
+  const QuicStreamId id1 = 1;
+  const QuicStreamId id2 = 2;
+  RegisterStream(id1, kNotStatic, {kV3LowestPriority, kNotIncremental});
+  RegisterStream(id2, kNotStatic, {kV3LowestPriority, kIncremental});
+
+  AddStream(id1);
+  AddStream(id2);
+
+  // Small writes do not exceed the batch limit.  Writes continue on streams
+  // with most recently written data, regardless of the incremental parameter
+  // value.
+  EXPECT_EQ(id1, PopFront());
+  const size_t kSmallWriteSize = 1000;
+  UpdateBytesForStream(id1, kSmallWriteSize);
+  AddStream(id1);
+
+  EXPECT_EQ(id1, PopFront());
+  UpdateBytesForStream(id1, kSmallWriteSize);
+  AddStream(id1);
+
+  EXPECT_EQ(id1, PopFront());
+  UpdateBytesForStream(id1, kSmallWriteSize);
+
+  EXPECT_EQ(id2, PopFront());
+  UpdateBytesForStream(id2, kSmallWriteSize);
+  AddStream(id2);
+  AddStream(id1);
+
+  EXPECT_EQ(id2, PopFront());
+  UpdateBytesForStream(id2, kSmallWriteSize);
+  AddStream(id2);
+
+  EXPECT_EQ(id2, PopFront());
+  UpdateBytesForStream(id2, kSmallWriteSize);
+
+  // A non-incremental stream can continue writing as long as it has data.
+  EXPECT_EQ(id1, PopFront());
+  const size_t kLargeWriteSize = 1000 * 1000 * 1000;
+  UpdateBytesForStream(id1, kLargeWriteSize);
+  AddStream(id1);
+
+  EXPECT_EQ(id1, PopFront());
+  UpdateBytesForStream(id1, kLargeWriteSize);
+  AddStream(id1);
+
+  EXPECT_EQ(id1, PopFront());
+  UpdateBytesForStream(id1, kLargeWriteSize);
+  AddStream(id2);
+  AddStream(id1);
+
+  // However, if the batching limit is exceeded on stream 2, this stream will
+  // yield to stream 1.
+  EXPECT_EQ(id2, PopFront());
+  UpdateBytesForStream(id2, kLargeWriteSize);
+  AddStream(id2);
+
+  EXPECT_EQ(id1, PopFront());
+  UpdateBytesForStream(id1, kLargeWriteSize);
+}
+
 TEST_F(QuicWriteBlockedListTest, Ceding) {
   RegisterStream(15, kNotStatic, {kV3HighestPriority, kNotIncremental});
   RegisterStream(16, kNotStatic, {kV3HighestPriority, kNotIncremental});