Call MarkConsumed() on QPACK bytes incrementally, as soon as possible.

QPACK data is already processed incrementally, but calling
QuicStreamSequencer::MarkConsumed() is postponed until headers or trailers are
consumed by the higher layer as signalled by a ConsumeHeaderList() or
MarkTrailersConsumed() call.  This conflicts with test
ResponseProcessingTest.CookieRequestHeaderExcessivelyLong that had to be
disabled at b/249121660.

This CL makes sure MarkConsumed() is called immediately for every fragment of a
HEADERS frame: always for headers, and for trailers only if all data bytes have
already been consumed (otherwise the sequencer would start freeing up data bytes
that have not been read yet).  If not all data bytes have been consumed, then
MarkConsumed() is called for the trailer bytes as soon as possible afterwards.

gfe-relnote: n/a; QUIC v99 only.
PiperOrigin-RevId: 252430032
Change-Id: I1aa005971a3de72ff536bb54a35817b8fd56a68e
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index b025fd8..b7d3011 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -162,7 +162,7 @@
       trailers_length_(0, 0),
       trailers_decompressed_(false),
       trailers_consumed_(false),
-      trailers_consumed_in_sequencer_(false),
+      headers_bytes_to_be_marked_consumed_(0),
       http_decoder_visitor_(new HttpDecoderVisitor(this)),
       body_buffer_(sequencer()),
       ack_listener_(nullptr) {
@@ -193,7 +193,7 @@
       trailers_length_(0, 0),
       trailers_decompressed_(false),
       trailers_consumed_(false),
-      trailers_consumed_in_sequencer_(false),
+      headers_bytes_to_be_marked_consumed_(0),
       http_decoder_visitor_(new HttpDecoderVisitor(this)),
       body_buffer_(sequencer()),
       ack_listener_(nullptr) {
@@ -375,9 +375,13 @@
     return sequencer()->Readv(iov, iov_len);
   }
   size_t bytes_read = body_buffer_.ReadBody(iov, iov_len);
-  if (trailers_consumed_) {
-    MarkTrailersConsumed();
+
+  if (VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+    // Maybe all DATA frame bytes have been read and some trailing HEADERS had
+    // already been processed, in which case MarkConsumed() should be called.
+    MaybeMarkHeadersBytesConsumed();
   }
+
   return bytes_read;
 }
 
@@ -398,8 +402,11 @@
     return;
   }
   body_buffer_.MarkBodyConsumed(num_bytes);
-  if (trailers_consumed_) {
-    MarkTrailersConsumed();
+
+  if (VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+    // Maybe all DATA frame bytes have been read and some trailing HEADERS had
+    // already been processed, in which case MarkConsumed() should be called.
+    MaybeMarkHeadersBytesConsumed();
   }
 }
 
@@ -419,17 +426,6 @@
 }
 
 void QuicSpdyStream::MarkTrailersConsumed() {
-  if (VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
-      !reading_stopped() && !body_buffer_.HasBytesToRead() &&
-      !trailers_consumed_in_sequencer_) {
-    const QuicByteCount trailers_total_length =
-        trailers_length_.header_length + trailers_length_.payload_length;
-    if (trailers_total_length > 0) {
-      sequencer()->MarkConsumed(trailers_total_length);
-      trailers_consumed_in_sequencer_ = true;
-    }
-  }
-
   trailers_consumed_ = true;
 }
 
@@ -444,22 +440,12 @@
 void QuicSpdyStream::ConsumeHeaderList() {
   header_list_.Clear();
 
-  if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
-    if (FinishedReadingHeaders()) {
-      sequencer()->SetUnblocked();
-    }
+  if (!FinishedReadingHeaders()) {
     return;
   }
 
-  if (!reading_stopped()) {
-    const QuicByteCount headers_total_length =
-        headers_length_.header_length + headers_length_.payload_length;
-    if (headers_total_length > 0) {
-      sequencer()->MarkConsumed(headers_total_length);
-    }
-  }
-
-  if (!FinishedReadingHeaders()) {
+  if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+    sequencer()->SetUnblocked();
     return;
   }
 
@@ -483,6 +469,7 @@
 bool QuicSpdyStream::OnStreamHeaderList(bool fin,
                                         size_t frame_len,
                                         const QuicHeaderList& header_list) {
+  // TODO(b/134706391): remove |fin| argument.
   // The headers list avoid infinite buffering by clearing the headers list
   // if the current headers are too large. So if the list is empty here
   // then the headers list must have been too large, and the stream should
@@ -519,6 +506,7 @@
     bool fin,
     size_t /*frame_len*/,
     const QuicHeaderList& header_list) {
+  // TODO(b/134706391): remove |fin| argument.
   headers_decompressed_ = true;
   header_list_ = header_list;
 
@@ -556,11 +544,10 @@
     bool fin,
     size_t /*frame_len*/,
     const QuicHeaderList& header_list) {
+  // TODO(b/134706391): remove |fin| argument.
   DCHECK(!trailers_decompressed_);
-  if ((VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
-       sequencer()->IsClosed()) ||
-      (!VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
-       fin_received())) {
+  if (!VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
+      fin_received()) {
     QUIC_DLOG(INFO) << "Received Trailers after FIN, on stream: " << id();
     session()->connection()->CloseConnection(
         QUIC_INVALID_HEADERS_STREAM_DATA, "Trailers after fin",
@@ -770,6 +757,16 @@
   }
 }
 
+void QuicSpdyStream::MaybeMarkHeadersBytesConsumed() {
+  DCHECK(VersionUsesQpack(spdy_session_->connection()->transport_version()));
+
+  if (!body_buffer_.HasBytesToRead() && !reading_stopped() &&
+      headers_bytes_to_be_marked_consumed_ > 0) {
+    sequencer()->MarkConsumed(headers_bytes_to_be_marked_consumed_);
+    headers_bytes_to_be_marked_consumed_ = 0;
+  }
+}
+
 QuicByteCount QuicSpdyStream::GetNumFrameHeadersInInterval(
     QuicStreamOffset offset,
     QuicByteCount data_length) const {
@@ -796,13 +793,23 @@
       QuicMakeUnique<QpackDecodedHeadersAccumulator>(
           id(), spdy_session_->qpack_decoder(),
           spdy_session_->max_inbound_header_list_size());
+
+  // Do not call MaybeMarkHeadersBytesConsumed() yet, because
+  // HEADERS frame header bytes might not have been parsed completely.
+  headers_bytes_to_be_marked_consumed_ += frame_length.header_length;
+
   return true;
 }
 
 bool QuicSpdyStream::OnHeadersFramePayload(QuicStringPiece payload) {
   DCHECK(VersionUsesQpack(spdy_session_->connection()->transport_version()));
 
-  if (!qpack_decoded_headers_accumulator_->Decode(payload)) {
+  const bool success = qpack_decoded_headers_accumulator_->Decode(payload);
+
+  headers_bytes_to_be_marked_consumed_ += payload.size();
+  MaybeMarkHeadersBytesConsumed();
+
+  if (!success) {
     // TODO(124216424): Use HTTP_QPACK_DECOMPRESSION_FAILED error code.
     std::string error_message =
         QuicStrCat("Error decompressing header block on stream ", id(), ": ",