In HTTP/3, wait until bodies are consumed before consuming trailers.

QuicStreamSequencer can only be consumed sequentially, so we need to preserve
the ordering of consumption the same as how the frames arrive.

gfe-relnote: verion 99 only. Not in prod.
PiperOrigin-RevId: 250924667
Change-Id: Iff9c95b91d88e08a11de4a07f9aa75467d907bd2
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index 4796c78..bd15918 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -152,6 +152,7 @@
       trailers_length_(0, 0),
       trailers_decompressed_(false),
       trailers_consumed_(false),
+      trailers_consumed_in_sequencer_(false),
       http_decoder_visitor_(new HttpDecoderVisitor(this)),
       body_buffer_(sequencer()),
       ack_listener_(nullptr) {
@@ -182,6 +183,7 @@
       trailers_length_(0, 0),
       trailers_decompressed_(false),
       trailers_consumed_(false),
+      trailers_consumed_in_sequencer_(false),
       http_decoder_visitor_(new HttpDecoderVisitor(this)),
       body_buffer_(sequencer()),
       ack_listener_(nullptr) {
@@ -362,7 +364,11 @@
           spdy_session_->connection()->transport_version())) {
     return sequencer()->Readv(iov, iov_len);
   }
-  return body_buffer_.ReadBody(iov, iov_len);
+  size_t bytes_read = body_buffer_.ReadBody(iov, iov_len);
+  if (trailers_consumed_) {
+    MarkTrailersConsumed();
+  }
+  return bytes_read;
 }
 
 int QuicSpdyStream::GetReadableRegions(iovec* iov, size_t iov_len) const {
@@ -382,6 +388,9 @@
     return;
   }
   body_buffer_.MarkBodyConsumed(num_bytes);
+  if (trailers_consumed_) {
+    MarkTrailersConsumed();
+  }
 }
 
 bool QuicSpdyStream::IsDoneReading() const {
@@ -401,11 +410,13 @@
 
 void QuicSpdyStream::MarkTrailersConsumed() {
   if (VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
-      !reading_stopped()) {
+      !reading_stopped() && !body_buffer_.HasBytesToRead() &&
+      !trailers_consumed_in_sequencer_) {
     const QuicByteCount trailers_total_length =
         trailers_length_.header_length + trailers_length_.payload_length;
     if (trailers_total_length > 0) {
       sequencer()->MarkConsumed(trailers_total_length);
+      trailers_consumed_in_sequencer_ = true;
     }
   }
 
diff --git a/quic/core/http/quic_spdy_stream.h b/quic/core/http/quic_spdy_stream.h
index 8136d20..2182d28 100644
--- a/quic/core/http/quic_spdy_stream.h
+++ b/quic/core/http/quic_spdy_stream.h
@@ -147,7 +147,8 @@
 
   // Marks the trailers as consumed. This applies to the case where this object
   // receives headers and trailers as QuicHeaderLists via calls to
-  // OnStreamHeaderList().
+  // OnStreamHeaderList(). Trailer data will be consumed from the sequencer only
+  // once all body data has been consumed.
   void MarkTrailersConsumed();
 
   // Clears |header_list_|.
@@ -268,6 +269,8 @@
   bool trailers_decompressed_;
   // True if the trailers have been consumed.
   bool trailers_consumed_;
+  // True if the trailers have actually been consumed in the stream sequencer.
+  bool trailers_consumed_in_sequencer_;
   // The parsed trailers received from the peer.
   spdy::SpdyHeaderBlock received_trailers_;
 
diff --git a/quic/core/http/quic_spdy_stream_test.cc b/quic/core/http/quic_spdy_stream_test.cc
index b6e59a5..7c423b4 100644
--- a/quic/core/http/quic_spdy_stream_test.cc
+++ b/quic/core/http/quic_spdy_stream_test.cc
@@ -148,6 +148,18 @@
         "JBCScs_ejbKaqBDoB7ZGxTvqlrB__2ZmnHHjCr8RgMRtKNtIeuZAo ";
   }
 
+  std::string EncodeQpackHeaders(QuicStreamId id, SpdyHeaderBlock* header) {
+    auto qpack_encoder =
+        QuicMakeUnique<QpackEncoder>(session_.get(), session_.get());
+    auto progreesive_encoder = qpack_encoder->EncodeHeaderList(id, header);
+    std::string encoded_headers;
+    while (progreesive_encoder->HasNext()) {
+      progreesive_encoder->Next(std::numeric_limits<size_t>::max(),
+                                &encoded_headers);
+    }
+    return encoded_headers;
+  }
+
   void Initialize(bool stream_should_process_data) {
     connection_ = new StrictMock<MockQuicConnection>(
         &helper_, &alarm_factory_, Perspective::IS_SERVER,
@@ -1796,6 +1808,61 @@
                             testing::Pair("custom-key", "custom-value")));
 }
 
+TEST_P(QuicSpdyStreamTest, ProcessBodyAfterTrailers) {
+  if (!VersionUsesQpack(GetParam().transport_version)) {
+    return;
+  }
+  Initialize(false);
+
+  // QPACK encoded header block with single header field "foo: bar".
+  std::string headers_frame_payload =
+      QuicTextUtils::HexDecode("00002a94e703626172");
+  std::unique_ptr<char[]> headers_buffer;
+  QuicByteCount headers_frame_header_length =
+      encoder_.SerializeHeadersFrameHeader(headers_frame_payload.length(),
+                                           &headers_buffer);
+  QuicStringPiece headers_frame_header(headers_buffer.get(),
+                                       headers_frame_header_length);
+
+  std::string data_frame_payload = "some data";
+  std::unique_ptr<char[]> data_buffer;
+  QuicByteCount data_frame_header_length = encoder_.SerializeDataFrameHeader(
+      data_frame_payload.length(), &data_buffer);
+  QuicStringPiece data_frame_header(data_buffer.get(),
+                                    data_frame_header_length);
+
+  // A header block that will take more than one block of sequencer buffer.
+  // This ensures that when the trailers are consumed, some buffer buckets will
+  // be freed.
+  SpdyHeaderBlock trailers_block;
+  trailers_block["key1"] = std::string(10000, 'x');
+  std::string trailers_frame_payload =
+      EncodeQpackHeaders(stream_->id(), &trailers_block);
+
+  std::unique_ptr<char[]> trailers_buffer;
+  QuicByteCount trailers_frame_header_length =
+      encoder_.SerializeHeadersFrameHeader(trailers_frame_payload.length(),
+                                           &trailers_buffer);
+  QuicStringPiece trailers_frame_header(trailers_buffer.get(),
+                                        trailers_frame_header_length);
+
+  std::string stream_frame_payload = QuicStrCat(
+      headers_frame_header, headers_frame_payload, data_frame_header,
+      data_frame_payload, trailers_frame_header, trailers_frame_payload);
+  QuicStreamFrame frame(stream_->id(), false, 0, stream_frame_payload);
+  stream_->OnStreamFrame(frame);
+
+  stream_->ConsumeHeaderList();
+  stream_->MarkTrailersConsumed();
+  char buffer[2048];
+  struct iovec vec;
+  vec.iov_base = buffer;
+  vec.iov_len = QUIC_ARRAYSIZE(buffer);
+  size_t bytes_read = stream_->Readv(&vec, 1);
+  std::string data(buffer, bytes_read);
+  EXPECT_EQ("some data", data);
+}
+
 }  // namespace
 }  // namespace test
 }  // namespace quic