Support blocked decoding in QuicSpdyStream.

Modify QpackDecodedHeadersAccumulator interface and QuicSpdyStream
implementation to support blocked decoding of QPACK headers.

gfe-relnote: n/a, QUIC v99-only change.
PiperOrigin-RevId: 256201744
Change-Id: I17a4b6c0c6d4f30d8526c8b645c55da3ce01a076
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index e3d9357..9966f81 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -11,7 +11,6 @@
 #include "net/third_party/quiche/src/quic/core/http/http_decoder.h"
 #include "net/third_party/quiche/src/quic/core/http/quic_spdy_session.h"
 #include "net/third_party/quiche/src/quic/core/http/spdy_utils.h"
-#include "net/third_party/quiche/src/quic/core/qpack/qpack_decoded_headers_accumulator.h"
 #include "net/third_party/quiche/src/quic/core/qpack/qpack_decoder.h"
 #include "net/third_party/quiche/src/quic/core/qpack/qpack_encoder.h"
 #include "net/third_party/quiche/src/quic/core/quic_utils.h"
@@ -163,12 +162,14 @@
       spdy_session_(spdy_session),
       on_body_available_called_because_sequencer_is_closed_(false),
       visitor_(nullptr),
+      blocked_on_decoding_headers_(false),
       headers_decompressed_(false),
       headers_length_(0, 0),
       trailers_length_(0, 0),
       trailers_decompressed_(false),
       trailers_consumed_(false),
       headers_bytes_to_be_marked_consumed_(0),
+      pretend_blocked_decoding_for_tests_(false),
       http_decoder_visitor_(new HttpDecoderVisitor(this)),
       body_buffer_(sequencer()),
       sequencer_offset_(0),
@@ -197,12 +198,14 @@
       spdy_session_(spdy_session),
       on_body_available_called_because_sequencer_is_closed_(false),
       visitor_(nullptr),
+      blocked_on_decoding_headers_(false),
       headers_decompressed_(false),
       headers_length_(0, 0),
       trailers_length_(0, 0),
       trailers_decompressed_(false),
       trailers_consumed_(false),
       headers_bytes_to_be_marked_consumed_(0),
+      pretend_blocked_decoding_for_tests_(false),
       http_decoder_visitor_(new HttpDecoderVisitor(this)),
       body_buffer_(sequencer()),
       sequencer_offset_(sequencer()->NumBytesConsumed()),
@@ -497,6 +500,24 @@
   }
 }
 
+void QuicSpdyStream::OnHeadersDecoded(QuicHeaderList headers) {
+  blocked_on_decoding_headers_ = false;
+  ProcessDecodedHeaders(headers);
+  // Continue decoding HTTP/3 frames.
+  OnDataAvailable();
+}
+
+void QuicSpdyStream::OnHeaderDecodingError() {
+  // TODO(b/124216424): Use HTTP_EXCESSIVE_LOAD or
+  // HTTP_QPACK_DECOMPRESSION_FAILED error code as indicated by
+  // |qpack_decoded_headers_accumulator_|.
+  std::string error_message = QuicStrCat(
+      "Error during async decoding of ",
+      headers_decompressed_ ? "trailers" : "headers", " on stream ", id(), ": ",
+      qpack_decoded_headers_accumulator_->error_message());
+  CloseConnectionWithDetails(QUIC_DECOMPRESSION_FAILURE, error_message);
+}
+
 void QuicSpdyStream::OnHeadersTooLarge() {
   if (VersionUsesQpack(spdy_session_->connection()->transport_version())) {
     // TODO(124216424): Use HTTP_EXCESSIVE_LOAD error code.
@@ -632,6 +653,10 @@
     return;
   }
 
+  if (blocked_on_decoding_headers_) {
+    return;
+  }
+
   iovec iov;
   while (!reading_stopped() && decoder_.error() == QUIC_NO_ERROR) {
     DCHECK_GE(sequencer_offset_, sequencer()->NumBytesConsumed());
@@ -645,6 +670,9 @@
         reinterpret_cast<const char*>(iov.iov_base), iov.iov_len);
     is_decoder_processing_input_ = false;
     sequencer_offset_ += processed_bytes;
+    if (blocked_on_decoding_headers_) {
+      return;
+    }
   }
 
   // Do not call OnBodyAvailable() until headers are consumed.
@@ -819,8 +847,9 @@
 
   qpack_decoded_headers_accumulator_ =
       QuicMakeUnique<QpackDecodedHeadersAccumulator>(
-          id(), spdy_session_->qpack_decoder(),
-          spdy_session_->max_inbound_header_list_size());
+          id(), spdy_session_->qpack_decoder(), this,
+          spdy_session_->max_inbound_header_list_size(),
+          pretend_blocked_decoding_for_tests_);
 
   // Do not call MaybeMarkHeadersBytesConsumed() yet, because
   // HEADERS frame header bytes might not have been parsed completely.
@@ -851,7 +880,9 @@
 bool QuicSpdyStream::OnHeadersFrameEnd() {
   DCHECK(VersionUsesQpack(spdy_session_->connection()->transport_version()));
 
-  if (!qpack_decoded_headers_accumulator_->EndHeaderBlock()) {
+  auto result = qpack_decoded_headers_accumulator_->EndHeaderBlock();
+
+  if (result == QpackDecodedHeadersAccumulator::Status::kError) {
     // TODO(124216424): Use HTTP_QPACK_DECOMPRESSION_FAILED error code.
     std::string error_message =
         QuicStrCat("Error decompressing header block on stream ", id(), ": ",
@@ -860,13 +891,23 @@
     return false;
   }
 
+  if (result == QpackDecodedHeadersAccumulator::Status::kBlocked) {
+    blocked_on_decoding_headers_ = true;
+    return false;
+  }
+
+  DCHECK(result == QpackDecodedHeadersAccumulator::Status::kSuccess);
+
+  ProcessDecodedHeaders(qpack_decoded_headers_accumulator_->quic_header_list());
+  return !sequencer()->IsClosed() && !reading_stopped();
+}
+
+void QuicSpdyStream::ProcessDecodedHeaders(const QuicHeaderList& headers) {
   const QuicByteCount frame_length = headers_decompressed_
                                          ? trailers_length_.payload_length
                                          : headers_length_.payload_length;
-  OnStreamHeaderList(/* fin = */ false, frame_length,
-                     qpack_decoded_headers_accumulator_->quic_header_list());
+  OnStreamHeaderList(/* fin = */ false, frame_length, headers);
   qpack_decoded_headers_accumulator_.reset();
-  return !sequencer()->IsClosed() && !reading_stopped();
 }
 
 size_t QuicSpdyStream::WriteHeadersImpl(