Support blocked decoding in QuicSpdyStream.

Modify QpackDecodedHeadersAccumulator interface and QuicSpdyStream
implementation to support blocked decoding of QPACK headers.

gfe-relnote: n/a, QUIC v99-only change.
PiperOrigin-RevId: 256201744
Change-Id: I17a4b6c0c6d4f30d8526c8b645c55da3ce01a076
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index e3d9357..9966f81 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -11,7 +11,6 @@
 #include "net/third_party/quiche/src/quic/core/http/http_decoder.h"
 #include "net/third_party/quiche/src/quic/core/http/quic_spdy_session.h"
 #include "net/third_party/quiche/src/quic/core/http/spdy_utils.h"
-#include "net/third_party/quiche/src/quic/core/qpack/qpack_decoded_headers_accumulator.h"
 #include "net/third_party/quiche/src/quic/core/qpack/qpack_decoder.h"
 #include "net/third_party/quiche/src/quic/core/qpack/qpack_encoder.h"
 #include "net/third_party/quiche/src/quic/core/quic_utils.h"
@@ -163,12 +162,14 @@
       spdy_session_(spdy_session),
       on_body_available_called_because_sequencer_is_closed_(false),
       visitor_(nullptr),
+      blocked_on_decoding_headers_(false),
       headers_decompressed_(false),
       headers_length_(0, 0),
       trailers_length_(0, 0),
       trailers_decompressed_(false),
       trailers_consumed_(false),
       headers_bytes_to_be_marked_consumed_(0),
+      pretend_blocked_decoding_for_tests_(false),
       http_decoder_visitor_(new HttpDecoderVisitor(this)),
       body_buffer_(sequencer()),
       sequencer_offset_(0),
@@ -197,12 +198,14 @@
       spdy_session_(spdy_session),
       on_body_available_called_because_sequencer_is_closed_(false),
       visitor_(nullptr),
+      blocked_on_decoding_headers_(false),
       headers_decompressed_(false),
       headers_length_(0, 0),
       trailers_length_(0, 0),
       trailers_decompressed_(false),
       trailers_consumed_(false),
       headers_bytes_to_be_marked_consumed_(0),
+      pretend_blocked_decoding_for_tests_(false),
       http_decoder_visitor_(new HttpDecoderVisitor(this)),
       body_buffer_(sequencer()),
       sequencer_offset_(sequencer()->NumBytesConsumed()),
@@ -497,6 +500,24 @@
   }
 }
 
+void QuicSpdyStream::OnHeadersDecoded(QuicHeaderList headers) {
+  blocked_on_decoding_headers_ = false;
+  ProcessDecodedHeaders(headers);
+  // Continue decoding HTTP/3 frames.
+  OnDataAvailable();
+}
+
+void QuicSpdyStream::OnHeaderDecodingError() {
+  // TODO(b/124216424): Use HTTP_EXCESSIVE_LOAD or
+  // HTTP_QPACK_DECOMPRESSION_FAILED error code as indicated by
+  // |qpack_decoded_headers_accumulator_|.
+  std::string error_message = QuicStrCat(
+      "Error during async decoding of ",
+      headers_decompressed_ ? "trailers" : "headers", " on stream ", id(), ": ",
+      qpack_decoded_headers_accumulator_->error_message());
+  CloseConnectionWithDetails(QUIC_DECOMPRESSION_FAILURE, error_message);
+}
+
 void QuicSpdyStream::OnHeadersTooLarge() {
   if (VersionUsesQpack(spdy_session_->connection()->transport_version())) {
     // TODO(124216424): Use HTTP_EXCESSIVE_LOAD error code.
@@ -632,6 +653,10 @@
     return;
   }
 
+  if (blocked_on_decoding_headers_) {
+    return;
+  }
+
   iovec iov;
   while (!reading_stopped() && decoder_.error() == QUIC_NO_ERROR) {
     DCHECK_GE(sequencer_offset_, sequencer()->NumBytesConsumed());
@@ -645,6 +670,9 @@
         reinterpret_cast<const char*>(iov.iov_base), iov.iov_len);
     is_decoder_processing_input_ = false;
     sequencer_offset_ += processed_bytes;
+    if (blocked_on_decoding_headers_) {
+      return;
+    }
   }
 
   // Do not call OnBodyAvailable() until headers are consumed.
@@ -819,8 +847,9 @@
 
   qpack_decoded_headers_accumulator_ =
       QuicMakeUnique<QpackDecodedHeadersAccumulator>(
-          id(), spdy_session_->qpack_decoder(),
-          spdy_session_->max_inbound_header_list_size());
+          id(), spdy_session_->qpack_decoder(), this,
+          spdy_session_->max_inbound_header_list_size(),
+          pretend_blocked_decoding_for_tests_);
 
   // Do not call MaybeMarkHeadersBytesConsumed() yet, because
   // HEADERS frame header bytes might not have been parsed completely.
@@ -851,7 +880,9 @@
 bool QuicSpdyStream::OnHeadersFrameEnd() {
   DCHECK(VersionUsesQpack(spdy_session_->connection()->transport_version()));
 
-  if (!qpack_decoded_headers_accumulator_->EndHeaderBlock()) {
+  auto result = qpack_decoded_headers_accumulator_->EndHeaderBlock();
+
+  if (result == QpackDecodedHeadersAccumulator::Status::kError) {
     // TODO(124216424): Use HTTP_QPACK_DECOMPRESSION_FAILED error code.
     std::string error_message =
         QuicStrCat("Error decompressing header block on stream ", id(), ": ",
@@ -860,13 +891,23 @@
     return false;
   }
 
+  if (result == QpackDecodedHeadersAccumulator::Status::kBlocked) {
+    blocked_on_decoding_headers_ = true;
+    return false;
+  }
+
+  DCHECK(result == QpackDecodedHeadersAccumulator::Status::kSuccess);
+
+  ProcessDecodedHeaders(qpack_decoded_headers_accumulator_->quic_header_list());
+  return !sequencer()->IsClosed() && !reading_stopped();
+}
+
+void QuicSpdyStream::ProcessDecodedHeaders(const QuicHeaderList& headers) {
   const QuicByteCount frame_length = headers_decompressed_
                                          ? trailers_length_.payload_length
                                          : headers_length_.payload_length;
-  OnStreamHeaderList(/* fin = */ false, frame_length,
-                     qpack_decoded_headers_accumulator_->quic_header_list());
+  OnStreamHeaderList(/* fin = */ false, frame_length, headers);
   qpack_decoded_headers_accumulator_.reset();
-  return !sequencer()->IsClosed() && !reading_stopped();
 }
 
 size_t QuicSpdyStream::WriteHeadersImpl(
diff --git a/quic/core/http/quic_spdy_stream.h b/quic/core/http/quic_spdy_stream.h
index 1a5c1b6..9a59ddc 100644
--- a/quic/core/http/quic_spdy_stream.h
+++ b/quic/core/http/quic_spdy_stream.h
@@ -19,6 +19,7 @@
 #include "net/third_party/quiche/src/quic/core/http/http_encoder.h"
 #include "net/third_party/quiche/src/quic/core/http/quic_header_list.h"
 #include "net/third_party/quiche/src/quic/core/http/quic_spdy_stream_body_buffer.h"
+#include "net/third_party/quiche/src/quic/core/qpack/qpack_decoded_headers_accumulator.h"
 #include "net/third_party/quiche/src/quic/core/quic_packets.h"
 #include "net/third_party/quiche/src/quic/core/quic_stream.h"
 #include "net/third_party/quiche/src/quic/core/quic_stream_sequencer.h"
@@ -34,11 +35,12 @@
 class QuicStreamPeer;
 }  // namespace test
 
-class QpackDecodedHeadersAccumulator;
 class QuicSpdySession;
 
 // A QUIC stream that can send and receive HTTP2 (SPDY) headers.
-class QUIC_EXPORT_PRIVATE QuicSpdyStream : public QuicStream {
+class QUIC_EXPORT_PRIVATE QuicSpdyStream
+    : public QuicStream,
+      public QpackDecodedHeadersAccumulator::Visitor {
  public:
   // Visitor receives callbacks from the stream.
   class QUIC_EXPORT_PRIVATE Visitor {
@@ -206,6 +208,10 @@
 
   using QuicStream::CloseWriteSide;
 
+  // QpackDecodedHeadersAccumulator::Visitor implementation.
+  void OnHeadersDecoded(QuicHeaderList headers) override;
+  void OnHeaderDecodingError() override;
+
  protected:
   // Called when the received headers are too large. By default this will
   // reset the stream.
@@ -246,6 +252,9 @@
   bool OnHeadersFramePayload(QuicStringPiece payload);
   bool OnHeadersFrameEnd();
 
+  // Called internally when headers are decoded.
+  void ProcessDecodedHeaders(const QuicHeaderList& headers);
+
   // Call QuicStreamSequencer::MarkConsumed() with
   // |headers_bytes_to_be_marked_consumed_| if appropriate.
   void MaybeMarkHeadersBytesConsumed();
@@ -260,6 +269,10 @@
   bool on_body_available_called_because_sequencer_is_closed_;
 
   Visitor* visitor_;
+
+  // True if read side processing is blocked while waiting for callback from
+  // QPACK decoder.
+  bool blocked_on_decoding_headers_;
   // True if the headers have been completely decompressed.
   bool headers_decompressed_;
   // Contains a copy of the decompressed header (name, value) pairs until they
@@ -284,6 +297,9 @@
   HttpEncoder encoder_;
   // Http decoder for processing raw incoming stream frames.
   HttpDecoder decoder_;
+  // TODO(b/112770235): Remove once blocked decoding is implemented
+  // and can be tested with delayed encoder stream data.
+  bool pretend_blocked_decoding_for_tests_;
   // Headers accumulator for decoding HEADERS frame payload.
   std::unique_ptr<QpackDecodedHeadersAccumulator>
       qpack_decoded_headers_accumulator_;
diff --git a/quic/core/http/quic_spdy_stream_test.cc b/quic/core/http/quic_spdy_stream_test.cc
index bab9228..b50c3bb 100644
--- a/quic/core/http/quic_spdy_stream_test.cc
+++ b/quic/core/http/quic_spdy_stream_test.cc
@@ -1810,6 +1810,146 @@
   stream_->OnStreamFrame(frame);
 }
 
+TEST_P(QuicSpdyStreamTest, BlockedHeaderDecoding) {
+  if (!VersionUsesQpack(GetParam().transport_version)) {
+    return;
+  }
+
+  Initialize(kShouldProcessData);
+  QuicSpdyStreamPeer::pretend_blocked_decoding(stream_);
+
+  // HEADERS frame with QPACK encoded single header field "foo: bar".
+  std::string headers =
+      HeadersFrame(QuicTextUtils::HexDecode("00002a94e703626172"));
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 0, headers));
+
+  // Even though entire header block is received, it cannot be decoded.
+  EXPECT_FALSE(stream_->headers_decompressed());
+
+  // Signal to QpackDecodedHeadersAccumulator that the header block has been
+  // decoded.  (OnDecodingCompleted() has already been called by the QPACK
+  // decoder, so technically quic_header_list_.OnHeaderBlockEnd() is called
+  // twice, which is a no-op.)
+  QuicSpdyStreamPeer::qpack_decoded_headers_accumulator(stream_)
+      ->OnDecodingCompleted();
+
+  EXPECT_TRUE(stream_->headers_decompressed());
+
+  EXPECT_THAT(stream_->header_list(), ElementsAre(Pair("foo", "bar")));
+  stream_->ConsumeHeaderList();
+
+  std::string data = DataFrame(kDataFramePayload);
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, /* offset = */
+                                         headers.length(), data));
+  EXPECT_EQ(kDataFramePayload, stream_->data());
+
+  // Trailing HEADERS frame with QPACK encoded
+  // single header field "custom-key: custom-value".
+  std::string trailers = HeadersFrame(
+      QuicTextUtils::HexDecode("00002f0125a849e95ba97d7f8925a849e95bb8e8b4bf"));
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, /* offset = */
+                                         headers.length() + data.length(),
+                                         trailers));
+
+  // Even though entire header block is received, it cannot be decoded.
+  EXPECT_FALSE(stream_->trailers_decompressed());
+
+  // Signal to QpackDecodedHeadersAccumulator that the header block has been
+  // decoded.
+  QuicSpdyStreamPeer::qpack_decoded_headers_accumulator(stream_)
+      ->OnDecodingCompleted();
+  EXPECT_TRUE(stream_->trailers_decompressed());
+
+  // Verify trailers.
+  EXPECT_THAT(stream_->received_trailers(),
+              ElementsAre(Pair("custom-key", "custom-value")));
+  stream_->MarkTrailersConsumed();
+}
+
+TEST_P(QuicSpdyStreamTest, AsyncErrorDecodingHeaders) {
+  if (!VersionUsesQpack(GetParam().transport_version)) {
+    return;
+  }
+
+  Initialize(kShouldProcessData);
+  QuicSpdyStreamPeer::pretend_blocked_decoding(stream_);
+
+  // HEADERS frame with QPACK encoded single header field "foo: bar".
+  std::string headers =
+      HeadersFrame(QuicTextUtils::HexDecode("00002a94e703626172"));
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 0, headers));
+
+  // Even though entire header block is received, it cannot be decoded.
+  EXPECT_FALSE(stream_->headers_decompressed());
+
+  // Signal a decoding error to QpackDecodedHeadersAccumulator.
+  std::string expected_error_message =
+      QuicStrCat("Error during async decoding of headers on stream ",
+                 stream_->id(), ": unexpected error");
+  EXPECT_CALL(
+      *connection_,
+      CloseConnection(QUIC_DECOMPRESSION_FAILURE, expected_error_message,
+                      ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET));
+  QuicSpdyStreamPeer::qpack_decoded_headers_accumulator(stream_)
+      ->OnDecodingErrorDetected("unexpected error");
+}
+
+TEST_P(QuicSpdyStreamTest, AsyncErrorDecodingTrailers) {
+  if (!VersionUsesQpack(GetParam().transport_version)) {
+    return;
+  }
+
+  Initialize(kShouldProcessData);
+  QuicSpdyStreamPeer::pretend_blocked_decoding(stream_);
+
+  // HEADERS frame with QPACK encoded single header field "foo: bar".
+  std::string headers =
+      HeadersFrame(QuicTextUtils::HexDecode("00002a94e703626172"));
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 0, headers));
+
+  // Even though entire header block is received, it cannot be decoded.
+  EXPECT_FALSE(stream_->headers_decompressed());
+
+  // Signal to QpackDecodedHeadersAccumulator that the header block has been
+  // decoded.  (OnDecodingCompleted() has already been called by the QPACK
+  // decoder, so technically quic_header_list_.OnHeaderBlockEnd() is called
+  // twice, which is a no-op.)
+  QuicSpdyStreamPeer::qpack_decoded_headers_accumulator(stream_)
+      ->OnDecodingCompleted();
+
+  EXPECT_TRUE(stream_->headers_decompressed());
+
+  EXPECT_THAT(stream_->header_list(), ElementsAre(Pair("foo", "bar")));
+  stream_->ConsumeHeaderList();
+
+  std::string data = DataFrame(kDataFramePayload);
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, /* offset = */
+                                         headers.length(), data));
+  EXPECT_EQ(kDataFramePayload, stream_->data());
+
+  // Trailing HEADERS frame with QPACK encoded
+  // single header field "custom-key: custom-value".
+  std::string trailers = HeadersFrame(
+      QuicTextUtils::HexDecode("00002f0125a849e95ba97d7f8925a849e95bb8e8b4bf"));
+  stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, /* offset = */
+                                         headers.length() + data.length(),
+                                         trailers));
+
+  // Even though entire header block is received, it cannot be decoded.
+  EXPECT_FALSE(stream_->trailers_decompressed());
+
+  // Signal a decoding error to QpackDecodedHeadersAccumulator.
+  std::string expected_error_message =
+      QuicStrCat("Error during async decoding of trailers on stream ",
+                 stream_->id(), ": unexpected error");
+  EXPECT_CALL(
+      *connection_,
+      CloseConnection(QUIC_DECOMPRESSION_FAILURE, expected_error_message,
+                      ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET));
+  QuicSpdyStreamPeer::qpack_decoded_headers_accumulator(stream_)
+      ->OnDecodingErrorDetected("unexpected error");
+}
+
 class QuicSpdyStreamIncrementalConsumptionTest : public QuicSpdyStreamTest {
  protected:
   QuicSpdyStreamIncrementalConsumptionTest() : offset_(0), consumed_bytes_(0) {}
diff --git a/quic/core/qpack/qpack_decoded_headers_accumulator.cc b/quic/core/qpack/qpack_decoded_headers_accumulator.cc
index 7a3b157..8ca28c9 100644
--- a/quic/core/qpack/qpack_decoded_headers_accumulator.cc
+++ b/quic/core/qpack/qpack_decoded_headers_accumulator.cc
@@ -11,10 +11,15 @@
 QpackDecodedHeadersAccumulator::QpackDecodedHeadersAccumulator(
     QuicStreamId id,
     QpackDecoder* qpack_decoder,
-    size_t max_header_list_size)
+    Visitor* visitor,
+    size_t max_header_list_size,
+    bool pretend_blocked_decoding_for_tests)
     : decoder_(qpack_decoder->CreateProgressiveDecoder(id, this)),
+      visitor_(visitor),
       uncompressed_header_bytes_(0),
       compressed_header_bytes_(0),
+      blocked_(false),
+      pretend_blocked_decoding_for_tests_(pretend_blocked_decoding_for_tests),
       error_detected_(false) {
   quic_header_list_.set_max_header_list_size(max_header_list_size);
   quic_header_list_.OnHeaderBlockStart();
@@ -28,7 +33,14 @@
   quic_header_list_.OnHeader(name, value);
 }
 
-void QpackDecodedHeadersAccumulator::OnDecodingCompleted() {}
+void QpackDecodedHeadersAccumulator::OnDecodingCompleted() {
+  quic_header_list_.OnHeaderBlockEnd(uncompressed_header_bytes_,
+                                     compressed_header_bytes_);
+
+  if (blocked_) {
+    visitor_->OnHeadersDecoded(quic_header_list_);
+  }
+}
 
 void QpackDecodedHeadersAccumulator::OnDecodingErrorDetected(
     QuicStringPiece error_message) {
@@ -37,6 +49,10 @@
   error_detected_ = true;
   // Copy error message to ensure it remains valid for the lifetime of |this|.
   error_message_.assign(error_message.data(), error_message.size());
+
+  if (blocked_) {
+    visitor_->OnHeaderDecodingError();
+  }
 }
 
 bool QpackDecodedHeadersAccumulator::Decode(QuicStringPiece data) {
@@ -48,15 +64,18 @@
   return !error_detected_;
 }
 
-bool QpackDecodedHeadersAccumulator::EndHeaderBlock() {
+QpackDecodedHeadersAccumulator::Status
+QpackDecodedHeadersAccumulator::EndHeaderBlock() {
   DCHECK(!error_detected_);
 
   decoder_->EndHeaderBlock();
 
-  quic_header_list_.OnHeaderBlockEnd(uncompressed_header_bytes_,
-                                     compressed_header_bytes_);
+  if (pretend_blocked_decoding_for_tests_) {
+    blocked_ = true;
+    return Status::kBlocked;
+  }
 
-  return !error_detected_;
+  return error_detected_ ? Status::kError : Status::kSuccess;
 }
 
 const QuicHeaderList& QpackDecodedHeadersAccumulator::quic_header_list() const {
diff --git a/quic/core/qpack/qpack_decoded_headers_accumulator.h b/quic/core/qpack/qpack_decoded_headers_accumulator.h
index 5db88d7..59480db 100644
--- a/quic/core/qpack/qpack_decoded_headers_accumulator.h
+++ b/quic/core/qpack/qpack_decoded_headers_accumulator.h
@@ -24,9 +24,35 @@
 class QUIC_EXPORT_PRIVATE QpackDecodedHeadersAccumulator
     : public QpackProgressiveDecoder::HeadersHandlerInterface {
  public:
+  // Return value for EndHeaderBlock().
+  enum class Status {
+    // Headers have been successfully decoded.
+    kSuccess,
+    // An error has occurred.
+    kError,
+    // Decoding is blocked.
+    kBlocked
+  };
+
+  // Visitor interface used for blocked decoding.  Exactly one visitor method
+  // will be called if EndHeaderBlock() returned kBlocked.  No visitor method
+  // will be called if EndHeaderBlock() returned any other value.
+  class Visitor {
+   public:
+    virtual ~Visitor() = default;
+
+    // Called when headers are successfully decoded.
+    virtual void OnHeadersDecoded(QuicHeaderList headers) = 0;
+
+    // Called when an error has occurred.
+    virtual void OnHeaderDecodingError() = 0;
+  };
+
   QpackDecodedHeadersAccumulator(QuicStreamId id,
                                  QpackDecoder* qpack_decoder,
-                                 size_t max_header_list_size);
+                                 Visitor* visitor,
+                                 size_t max_header_list_size,
+                                 bool pretend_blocked_decoding_for_tests);
   virtual ~QpackDecodedHeadersAccumulator() = default;
 
   // QpackProgressiveDecoder::HeadersHandlerInterface implementation.
@@ -40,23 +66,36 @@
   // Must not be called after EndHeaderBlock().
   bool Decode(QuicStringPiece data);
 
-  // Signal end of HEADERS frame.  Returns true on success, false on error.
+  // Signal end of HEADERS frame.
   // Must not be called if an error has been detected.
   // Must not be called more that once.
-  bool EndHeaderBlock();
+  // Returns kSuccess if headers can be readily decoded.
+  // Returns kError if an error occurred.
+  // Returns kBlocked if headers cannot be decoded at the moment, in which case
+  // exactly one Visitor method will be called as soon as sufficient data
+  // is received on the QPACK decoder stream.
+  Status EndHeaderBlock();
 
   // Returns accumulated header list.
   const QuicHeaderList& quic_header_list() const;
 
   // Returns error message.
   // Must not be called unless an error has been detected.
+  // TODO(b/124216424): Add accessor for error code, return HTTP_EXCESSIVE_LOAD
+  // or HTTP_QPACK_DECOMPRESSION_FAILED.
   QuicStringPiece error_message() const;
 
  private:
   std::unique_ptr<QpackProgressiveDecoder> decoder_;
+  Visitor* visitor_;
   QuicHeaderList quic_header_list_;
   size_t uncompressed_header_bytes_;
   size_t compressed_header_bytes_;
+  // Set to true when EndHeaderBlock() returns kBlocked.
+  bool blocked_;
+  // TODO(b/112770235): Remove once blocked decoding is implemented
+  // and can be tested with delayed encoder stream data.
+  bool pretend_blocked_decoding_for_tests_;
   bool error_detected_;
   std::string error_message_;
 };
diff --git a/quic/core/qpack/qpack_decoded_headers_accumulator_test.cc b/quic/core/qpack/qpack_decoded_headers_accumulator_test.cc
index 2ba9c4f..6f424e4 100644
--- a/quic/core/qpack/qpack_decoded_headers_accumulator_test.cc
+++ b/quic/core/qpack/qpack_decoded_headers_accumulator_test.cc
@@ -16,6 +16,7 @@
 using ::testing::Eq;
 using ::testing::Pair;
 using ::testing::StrictMock;
+using Status = quic::QpackDecodedHeadersAccumulator::Status;
 
 namespace quic {
 namespace test {
@@ -32,29 +33,41 @@
 
 }  // anonymous namespace
 
+class NoopVisitor : public QpackDecodedHeadersAccumulator::Visitor {
+ public:
+  ~NoopVisitor() override = default;
+  void OnHeadersDecoded(QuicHeaderList /* headers */) override {}
+  void OnHeaderDecodingError() override {}
+};
+
 class QpackDecodedHeadersAccumulatorTest : public QuicTest {
  protected:
   QpackDecodedHeadersAccumulatorTest()
       : qpack_decoder_(&encoder_stream_error_delegate_,
                        &decoder_stream_sender_delegate_),
-        accumulator_(kTestStreamId, &qpack_decoder_, kMaxHeaderListSize) {}
+        accumulator_(kTestStreamId,
+                     &qpack_decoder_,
+                     &visitor_,
+                     kMaxHeaderListSize,
+                     false) {}
 
   NoopEncoderStreamErrorDelegate encoder_stream_error_delegate_;
   StrictMock<MockQpackStreamSenderDelegate> decoder_stream_sender_delegate_;
   QpackDecoder qpack_decoder_;
+  NoopVisitor visitor_;
   QpackDecodedHeadersAccumulator accumulator_;
 };
 
 // HEADERS frame payload must have a complete Header Block Prefix.
 TEST_F(QpackDecodedHeadersAccumulatorTest, EmptyPayload) {
-  EXPECT_FALSE(accumulator_.EndHeaderBlock());
+  EXPECT_EQ(Status::kError, accumulator_.EndHeaderBlock());
   EXPECT_EQ("Incomplete header data prefix.", accumulator_.error_message());
 }
 
 // HEADERS frame payload must have a complete Header Block Prefix.
 TEST_F(QpackDecodedHeadersAccumulatorTest, TruncatedHeaderBlockPrefix) {
   EXPECT_TRUE(accumulator_.Decode(QuicTextUtils::HexDecode("00")));
-  EXPECT_FALSE(accumulator_.EndHeaderBlock());
+  EXPECT_EQ(Status::kError, accumulator_.EndHeaderBlock());
   EXPECT_EQ("Incomplete header data prefix.", accumulator_.error_message());
 }
 
@@ -63,7 +76,7 @@
               WriteStreamData(Eq(kHeaderAcknowledgement)));
 
   EXPECT_TRUE(accumulator_.Decode(QuicTextUtils::HexDecode("0000")));
-  EXPECT_TRUE(accumulator_.EndHeaderBlock());
+  EXPECT_EQ(Status::kSuccess, accumulator_.EndHeaderBlock());
 
   EXPECT_TRUE(accumulator_.quic_header_list().empty());
 }
@@ -72,7 +85,7 @@
 // before it can be completely decoded.
 TEST_F(QpackDecodedHeadersAccumulatorTest, TruncatedPayload) {
   EXPECT_TRUE(accumulator_.Decode(QuicTextUtils::HexDecode("00002366")));
-  EXPECT_FALSE(accumulator_.EndHeaderBlock());
+  EXPECT_EQ(Status::kError, accumulator_.EndHeaderBlock());
   EXPECT_EQ("Incomplete header block.", accumulator_.error_message());
 }
 
@@ -88,7 +101,7 @@
 
   std::string encoded_data(QuicTextUtils::HexDecode("000023666f6f03626172"));
   EXPECT_TRUE(accumulator_.Decode(encoded_data));
-  EXPECT_TRUE(accumulator_.EndHeaderBlock());
+  EXPECT_EQ(Status::kSuccess, accumulator_.EndHeaderBlock());
 
   const QuicHeaderList& header_list = accumulator_.quic_header_list();
   EXPECT_THAT(header_list, ElementsAre(Pair("foo", "bar")));
@@ -110,11 +123,25 @@
       "616161616161616161616161616161616161616161616161616161616161616161616161"
       "616161616161616161616161616161616161616161616161616161616161616161616161"
       "61616161616161616161616161616161616161616161616161616161616161616161")));
-  EXPECT_TRUE(accumulator_.EndHeaderBlock());
+  EXPECT_EQ(Status::kSuccess, accumulator_.EndHeaderBlock());
 
   // QuicHeaderList signals header list over limit by clearing it.
   EXPECT_TRUE(accumulator_.quic_header_list().empty());
 }
 
+// TODO(b/112770235): Remove once blocked decoding is implemented
+// and can be tested with delayed encoder stream data.
+TEST_F(QpackDecodedHeadersAccumulatorTest, PretendBlockedEncoding) {
+  QpackDecodedHeadersAccumulator accumulator(
+      kTestStreamId, &qpack_decoder_, &visitor_, kMaxHeaderListSize,
+      /* pretend_blocked_decoding_for_tests = */ true);
+
+  EXPECT_CALL(decoder_stream_sender_delegate_,
+              WriteStreamData(Eq(kHeaderAcknowledgement)));
+
+  EXPECT_TRUE(accumulator.Decode(QuicTextUtils::HexDecode("0000")));
+  EXPECT_EQ(Status::kBlocked, accumulator.EndHeaderBlock());
+}
+
 }  // namespace test
 }  // namespace quic
diff --git a/quic/test_tools/quic_spdy_stream_peer.cc b/quic/test_tools/quic_spdy_stream_peer.cc
index 4d23434..b02c145 100644
--- a/quic/test_tools/quic_spdy_stream_peer.cc
+++ b/quic/test_tools/quic_spdy_stream_peer.cc
@@ -5,6 +5,7 @@
 #include "net/third_party/quiche/src/quic/test_tools/quic_spdy_stream_peer.h"
 
 #include "net/third_party/quiche/src/quic/core/http/quic_spdy_stream.h"
+#include "net/third_party/quiche/src/quic/test_tools/quic_test_utils.h"
 
 namespace quic {
 namespace test {
@@ -22,5 +23,16 @@
   return stream->unacked_frame_headers_offsets_;
 }
 
+// static
+void QuicSpdyStreamPeer::pretend_blocked_decoding(QuicSpdyStream* stream) {
+  stream->pretend_blocked_decoding_for_tests_ = true;
+}
+
+// static
+QpackDecodedHeadersAccumulator*
+QuicSpdyStreamPeer::qpack_decoded_headers_accumulator(QuicSpdyStream* stream) {
+  return stream->qpack_decoded_headers_accumulator_.get();
+}
+
 }  // namespace test
 }  // namespace quic
diff --git a/quic/test_tools/quic_spdy_stream_peer.h b/quic/test_tools/quic_spdy_stream_peer.h
index 7b3fe7c..bd93f37 100644
--- a/quic/test_tools/quic_spdy_stream_peer.h
+++ b/quic/test_tools/quic_spdy_stream_peer.h
@@ -8,9 +8,11 @@
 #include "net/third_party/quiche/src/quic/core/quic_ack_listener_interface.h"
 #include "net/third_party/quiche/src/quic/core/quic_interval_set.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_containers.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h"
 
 namespace quic {
 
+class QpackDecodedHeadersAccumulator;
 class QuicSpdyStream;
 
 namespace test {
@@ -22,6 +24,12 @@
       QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener);
   static const QuicIntervalSet<QuicStreamOffset>& unacked_frame_headers_offsets(
       QuicSpdyStream* stream);
+
+  // TODO(b/112770235): Remove once blocked decoding is implemented
+  // and can be tested with delayed encoder stream data.
+  static void pretend_blocked_decoding(QuicSpdyStream* stream);
+  static QpackDecodedHeadersAccumulator* qpack_decoded_headers_accumulator(
+      QuicSpdyStream* stream);
 };
 
 }  // namespace test