Support blocked decoding in qpack_round_trip_fuzzer.cc.

gfe-relnote: n/a, test-only change.
PiperOrigin-RevId: 263656501
Change-Id: I608df789a1c2e9df680051982bb11c47e4d8a51b
diff --git a/quic/core/qpack/fuzzer/qpack_round_trip_fuzzer.cc b/quic/core/qpack/fuzzer/qpack_round_trip_fuzzer.cc
index 84c6948..164b05d 100644
--- a/quic/core/qpack/fuzzer/qpack_round_trip_fuzzer.cc
+++ b/quic/core/qpack/fuzzer/qpack_round_trip_fuzzer.cc
@@ -79,6 +79,11 @@
    public:
     virtual ~Visitor() = default;
 
+    // If decoding of the previous header block is still in progress, then
+    // DelayedHeaderBlockTransmitter will not start transmitting the next header
+    // block.
+    virtual bool IsDecodingInProgressOnStream(QuicStreamId stream_id) = 0;
+
     // Called when a header block starts.
     virtual void OnHeaderBlockStart(QuicStreamId stream_id) = 0;
     // Called when part or all of a header block is transmitted.
@@ -117,6 +122,12 @@
     std::advance(it, index);
     const QuicStreamId stream_id = it->first;
 
+    // Do not start new header block if processing of previous header block is
+    // blocked.
+    if (visitor_->IsDecodingInProgressOnStream(stream_id)) {
+      return;
+    }
+
     auto& header_block_queue = it->second;
     visitor_->OnHeaderBlockStart(stream_id);
     visitor_->OnHeaderBlockFragment(stream_id, header_block_queue.front());
@@ -130,11 +141,12 @@
 
   // Release all header block data.  Must be called before destruction.  All
   // encoder stream data must have been released before calling Flush() so that
-  // all remaining header blocks can be decoded synchronously.
+  // all header blocks can be decoded synchronously.
   void Flush() {
     while (!header_blocks_.empty()) {
       auto it = header_blocks_.begin();
       const QuicStreamId stream_id = it->first;
+      CHECK(!visitor_->IsDecodingInProgressOnStream(stream_id));
 
       auto& header_block_queue = it->second;
       visitor_->OnHeaderBlockStart(stream_id);
@@ -160,10 +172,22 @@
 // keep necessary decoding context while waiting for decoding to complete.
 class VerifyingDecoder : public QpackDecodedHeadersAccumulator::Visitor {
  public:
+  class Visitor {
+   public:
+    virtual ~Visitor() = default;
+
+    // Called when header block is decoded, either synchronously or
+    // asynchronously.  Might destroy VerifyingDecoder.
+    virtual void OnHeaderBlockDecoded(QuicStreamId stream_id) = 0;
+  };
+
   VerifyingDecoder(QuicStreamId stream_id,
+                   Visitor* visitor,
                    QpackDecoder* qpack_decoder,
                    QuicHeaderList expected_header_list)
-      : accumulator_(
+      : stream_id_(stream_id),
+        visitor_(visitor),
+        accumulator_(
             stream_id,
             qpack_decoder,
             this,
@@ -180,7 +204,13 @@
   virtual ~VerifyingDecoder() = default;
 
   // QpackDecodedHeadersAccumulator::Visitor implementation.
-  void OnHeadersDecoded(QuicHeaderList /*headers*/) override {}
+  void OnHeadersDecoded(QuicHeaderList headers) override {
+    // Verify headers.
+    CHECK(expected_header_list_ == headers);
+
+    // Might destroy |this|.
+    visitor_->OnHeaderBlockDecoded(stream_id_);
+  }
 
   void OnHeaderDecodingError() override {
     CHECK(false) << accumulator_.error_message();
@@ -198,13 +228,21 @@
     CHECK(status != QpackDecodedHeadersAccumulator::Status::kError)
         << accumulator_.error_message();
 
+    if (status == QpackDecodedHeadersAccumulator::Status::kBlocked) {
+      return;
+    }
+
     CHECK(status == QpackDecodedHeadersAccumulator::Status::kSuccess);
 
     // Compare resulting header list to original.
     CHECK(expected_header_list_ == accumulator_.quic_header_list());
+    // Might destroy |this|.
+    visitor_->OnHeaderBlockDecoded(stream_id_);
   }
 
  private:
+  QuicStreamId stream_id_;
+  Visitor* const visitor_;
   QpackDecodedHeadersAccumulator accumulator_;
   QuicHeaderList expected_header_list_;
 };
@@ -212,7 +250,8 @@
 // Class that holds QpackDecoder and its EncoderStreamErrorDelegate, and creates
 // and keeps VerifyingDecoders for each received header block until decoding is
 // complete.
-class DecodingEndpoint : public DelayedHeaderBlockTransmitter::Visitor {
+class DecodingEndpoint : public DelayedHeaderBlockTransmitter::Visitor,
+                         public VerifyingDecoder::Visitor {
  public:
   DecodingEndpoint(uint64_t maximum_dynamic_table_capacity,
                    uint64_t maximum_blocked_streams)
@@ -244,8 +283,19 @@
     it->second.push(std::move(expected_header_list));
   }
 
+  // VerifyingDecoder::Visitor implementation.
+  void OnHeaderBlockDecoded(QuicStreamId stream_id) override {
+    auto result = verifying_decoders_.erase(stream_id);
+    CHECK_EQ(1u, result);
+  }
+
   // DelayedHeaderBlockTransmitter::Visitor implementation.
+  bool IsDecodingInProgressOnStream(QuicStreamId stream_id) override {
+    return verifying_decoders_.find(stream_id) != verifying_decoders_.end();
+  }
+
   void OnHeaderBlockStart(QuicStreamId stream_id) override {
+    CHECK(!IsDecodingInProgressOnStream(stream_id));
     auto it = expected_header_lists_.find(stream_id);
     CHECK(it != expected_header_lists_.end());
 
@@ -258,7 +308,7 @@
     }
 
     auto verifying_decoder = QuicMakeUnique<VerifyingDecoder>(
-        stream_id, &decoder_, std::move(expected_header_list));
+        stream_id, this, &decoder_, std::move(expected_header_list));
     auto result =
         verifying_decoders_.insert({stream_id, std::move(verifying_decoder)});
     CHECK(result.second);
@@ -275,8 +325,6 @@
     auto it = verifying_decoders_.find(stream_id);
     CHECK(it != verifying_decoders_.end());
     it->second->EndHeaderBlock();
-    auto result = verifying_decoders_.erase(stream_id);
-    CHECK_EQ(1u, result);
   }
 
  private: