Cloned from CL 231494986 by 'g4 patch'.
Original change by bnc@bnc:majom-bnc-chromium-google3-headers-git5:2181:citc on 2019/01/29 16:21:02.

Send HEADERS on the request stream.

In QUIC version 99, send HEADERS (including trailers) on the request stream
instead of the headers stream, compressed with QPACK instead of HPACK.

gfe-relnote: Send HEADERS on the request stream in QUIC version 99 only.  Not flag protected.
PiperOrigin-RevId: 249121660
Change-Id: I933f9433da8bfffc8c2979aea742d485639b33c5
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index d04efc5..0127c5b 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -4,12 +4,17 @@
 
 #include "net/third_party/quiche/src/quic/core/http/quic_spdy_stream.h"
 
+#include <limits>
 #include <string>
 #include <utility>
 
 #include "net/third_party/quiche/src/quic/core/http/quic_spdy_session.h"
 #include "net/third_party/quiche/src/quic/core/http/spdy_utils.h"
+#include "net/third_party/quiche/src/quic/core/qpack/qpack_decoded_headers_accumulator.h"
+#include "net/third_party/quiche/src/quic/core/qpack/qpack_decoder.h"
+#include "net/third_party/quiche/src/quic/core/qpack/qpack_encoder.h"
 #include "net/third_party/quiche/src/quic/core/quic_utils.h"
+#include "net/third_party/quiche/src/quic/core/quic_versions.h"
 #include "net/third_party/quiche/src/quic/core/quic_write_blocked_list.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
 #include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
@@ -139,8 +144,11 @@
                                StreamType type)
     : QuicStream(id, spdy_session, /*is_static=*/false, type),
       spdy_session_(spdy_session),
+      on_body_available_called_because_sequencer_is_closed_(false),
       visitor_(nullptr),
       headers_decompressed_(false),
+      headers_length_(0, 0),
+      trailers_length_(0, 0),
       trailers_decompressed_(false),
       trailers_consumed_(false),
       http_decoder_visitor_(new HttpDecoderVisitor(this)),
@@ -148,9 +156,11 @@
       ack_listener_(nullptr) {
   DCHECK(!QuicUtils::IsCryptoStreamId(
       spdy_session->connection()->transport_version(), id));
-  // Don't receive any callbacks from the sequencer until headers
-  // are complete.
-  sequencer()->SetBlockedUntilFlush();
+  // If headers are sent on the headers stream, then do not receive any
+  // callbacks from the sequencer until headers are complete.
+  if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+    sequencer()->SetBlockedUntilFlush();
+  }
 
   if (VersionHasDataFrameHeader(
           spdy_session_->connection()->transport_version())) {
@@ -164,8 +174,11 @@
                                StreamType type)
     : QuicStream(std::move(pending), type, /*is_static=*/false),
       spdy_session_(spdy_session),
+      on_body_available_called_because_sequencer_is_closed_(false),
       visitor_(nullptr),
       headers_decompressed_(false),
+      headers_length_(0, 0),
+      trailers_length_(0, 0),
       trailers_decompressed_(false),
       trailers_consumed_(false),
       http_decoder_visitor_(new HttpDecoderVisitor(this)),
@@ -173,9 +186,11 @@
       ack_listener_(nullptr) {
   DCHECK(!QuicUtils::IsCryptoStreamId(
       spdy_session->connection()->transport_version(), id()));
-  // Don't receive any callbacks from the sequencer until headers
-  // are complete.
-  sequencer()->SetBlockedUntilFlush();
+  // If headers are sent on the headers stream, then do not receive any
+  // callbacks from the sequencer until headers are complete.
+  if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+    sequencer()->SetBlockedUntilFlush();
+  }
 
   if (VersionHasDataFrameHeader(
           spdy_session_->connection()->transport_version())) {
@@ -192,7 +207,11 @@
     QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
   size_t bytes_written =
       WriteHeadersImpl(std::move(header_block), fin, std::move(ack_listener));
-  if (fin) {
+  if (!VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
+      fin) {
+    // If HEADERS are sent on the headers stream, then |fin_sent_| needs to be
+    // set and write side needs to be closed without actually sending a FIN on
+    // this stream.
     // TODO(rch): Add test to ensure fin_sent_ is set whenever a fin is sent.
     set_fin_sent(true);
     CloseWriteSide();
@@ -217,13 +236,15 @@
   unacked_frame_headers_offsets_.Add(
       send_buffer().stream_offset(),
       send_buffer().stream_offset() + header_length);
-  QUIC_DLOG(INFO) << "Stream " << id() << " is writing header of length "
+  QUIC_DLOG(INFO) << "Stream " << id()
+                  << " is writing DATA frame header of length "
                   << header_length;
   WriteOrBufferData(QuicStringPiece(buffer.get(), header_length), false,
                     nullptr);
 
   // Write body.
-  QUIC_DLOG(INFO) << "Stream " << id() << " is writing body of length "
+  QUIC_DLOG(INFO) << "Stream " << id()
+                  << " is writing DATA frame payload of length "
                   << data.length();
   WriteOrBufferData(data, fin, nullptr);
 }
@@ -236,26 +257,33 @@
     return 0;
   }
 
-  // The header block must contain the final offset for this stream, as the
-  // trailers may be processed out of order at the peer.
-  QUIC_DLOG(INFO) << "Inserting trailer: (" << kFinalOffsetHeaderKey << ", "
-                  << stream_bytes_written() + BufferedDataBytes() << ")";
-  trailer_block.insert(
-      std::make_pair(kFinalOffsetHeaderKey,
-                     QuicTextUtils::Uint64ToString(stream_bytes_written() +
-                                                   BufferedDataBytes())));
+  if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+    // The header block must contain the final offset for this stream, as the
+    // trailers may be processed out of order at the peer.
+    const QuicStreamOffset final_offset =
+        stream_bytes_written() + BufferedDataBytes();
+    QUIC_DLOG(INFO) << "Inserting trailer: (" << kFinalOffsetHeaderKey << ", "
+                    << final_offset << ")";
+    trailer_block.insert(std::make_pair(
+        kFinalOffsetHeaderKey, QuicTextUtils::Uint64ToString(final_offset)));
+  }
 
   // Write the trailing headers with a FIN, and close stream for writing:
   // trailers are the last thing to be sent on a stream.
   const bool kFin = true;
   size_t bytes_written =
       WriteHeadersImpl(std::move(trailer_block), kFin, std::move(ack_listener));
-  set_fin_sent(kFin);
 
-  // Trailers are the last thing to be sent on a stream, but if there is still
-  // queued data then CloseWriteSide() will cause it never to be sent.
-  if (BufferedDataBytes() == 0) {
-    CloseWriteSide();
+  // If trailers are sent on the headers stream, then |fin_sent_| needs to be
+  // set without actually sending a FIN on this stream.
+  if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+    set_fin_sent(kFin);
+
+    // Also, write side of this stream needs to be closed.  However, only do
+    // this if there is no more buffered data, otherwise it will never be sent.
+    if (BufferedDataBytes() == 0) {
+      CloseWriteSide();
+    }
   }
 
   return bytes_written;
@@ -298,12 +326,14 @@
   unacked_frame_headers_offsets_.Add(
       send_buffer().stream_offset(),
       send_buffer().stream_offset() + header_length);
-  QUIC_DLOG(INFO) << "Stream " << id() << " is writing header of length "
+  QUIC_DLOG(INFO) << "Stream " << id()
+                  << " is writing DATA frame header of length "
                   << header_length;
   WriteMemSlices(storage.ToSpan(), false);
 
   // Write body.
-  QUIC_DLOG(INFO) << "Stream " << id() << " is writing body of length "
+  QUIC_DLOG(INFO) << "Stream " << id()
+                  << " is writing DATA frame payload of length "
                   << slices.total_length();
   return WriteMemSlices(slices, fin);
 }
@@ -352,6 +382,15 @@
 }
 
 void QuicSpdyStream::MarkTrailersConsumed() {
+  if (VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
+      !reading_stopped()) {
+    const QuicByteCount trailers_total_length =
+        trailers_length_.header_length + trailers_length_.payload_length;
+    if (trailers_total_length > 0) {
+      sequencer()->MarkConsumed(trailers_total_length);
+    }
+  }
+
   trailers_consumed_ = true;
 }
 
@@ -365,8 +404,35 @@
 
 void QuicSpdyStream::ConsumeHeaderList() {
   header_list_.Clear();
-  if (FinishedReadingHeaders()) {
-    sequencer()->SetUnblocked();
+
+  if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+    if (FinishedReadingHeaders()) {
+      sequencer()->SetUnblocked();
+    }
+    return;
+  }
+
+  if (!reading_stopped()) {
+    const QuicByteCount headers_total_length =
+        headers_length_.header_length + headers_length_.payload_length;
+    if (headers_total_length > 0) {
+      sequencer()->MarkConsumed(headers_total_length);
+    }
+  }
+
+  if (!FinishedReadingHeaders()) {
+    return;
+  }
+
+  if (body_buffer_.HasBytesToRead()) {
+    OnBodyAvailable();
+    return;
+  }
+
+  if (sequencer()->IsClosed() &&
+      !on_body_available_called_because_sequencer_is_closed_) {
+    on_body_available_called_because_sequencer_is_closed_ = true;
+    OnBodyAvailable();
   }
 }
 
@@ -398,7 +464,15 @@
 }
 
 void QuicSpdyStream::OnHeadersTooLarge() {
-  Reset(QUIC_HEADERS_TOO_LARGE);
+  if (VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+    // TODO(124216424): Use HTTP_EXCESSIVE_LOAD error code.
+    std::string error_message =
+        QuicStrCat("Too large headers received on stream ", id());
+    CloseConnectionWithDetails(QUIC_HEADERS_STREAM_DATA_DECOMPRESS_FAILURE,
+                               error_message);
+  } else {
+    Reset(QUIC_HEADERS_TOO_LARGE);
+  }
 }
 
 void QuicSpdyStream::OnInitialHeadersComplete(
@@ -407,8 +481,20 @@
     const QuicHeaderList& header_list) {
   headers_decompressed_ = true;
   header_list_ = header_list;
+
+  if (VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+    if (fin) {
+      OnStreamFrame(
+          QuicStreamFrame(id(), /* fin = */ true,
+                          flow_controller()->highest_received_byte_offset(),
+                          QuicStringPiece()));
+    }
+    return;
+  }
+
   if (fin) {
-    OnStreamFrame(QuicStreamFrame(id(), fin, 0, QuicStringPiece()));
+    OnStreamFrame(
+        QuicStreamFrame(id(), fin, /* offset = */ 0, QuicStringPiece()));
   }
   if (FinishedReadingHeaders()) {
     sequencer()->SetUnblocked();
@@ -431,15 +517,20 @@
     size_t /*frame_len*/,
     const QuicHeaderList& header_list) {
   DCHECK(!trailers_decompressed_);
-  if (fin_received()) {
-    QUIC_DLOG(ERROR) << "Received Trailers after FIN, on stream: " << id();
+  if ((VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
+       sequencer()->IsClosed()) ||
+      (!VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
+       fin_received())) {
+    QUIC_DLOG(INFO) << "Received Trailers after FIN, on stream: " << id();
     session()->connection()->CloseConnection(
         QUIC_INVALID_HEADERS_STREAM_DATA, "Trailers after fin",
         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
     return;
   }
-  if (!fin) {
-    QUIC_DLOG(ERROR) << "Trailers must have FIN set, on stream: " << id();
+
+  if (!VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
+      !fin) {
+    QUIC_DLOG(INFO) << "Trailers must have FIN set, on stream: " << id();
     session()->connection()->CloseConnection(
         QUIC_INVALID_HEADERS_STREAM_DATA, "Fin missing from trailers",
         ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
@@ -447,8 +538,9 @@
   }
 
   size_t final_byte_offset = 0;
-  if (!SpdyUtils::CopyAndValidateTrailers(header_list,
-                                          /* expect_final_byte_offset = */ true,
+  const bool expect_final_byte_offset =
+      !VersionUsesQpack(spdy_session_->connection()->transport_version());
+  if (!SpdyUtils::CopyAndValidateTrailers(header_list, expect_final_byte_offset,
                                           &final_byte_offset,
                                           &received_trailers_)) {
     QUIC_DLOG(ERROR) << "Trailers for stream " << id() << " are malformed.";
@@ -458,16 +550,12 @@
     return;
   }
   trailers_decompressed_ = true;
+  const QuicStreamOffset offset =
+      VersionUsesQpack(spdy_session_->connection()->transport_version())
+          ? flow_controller()->highest_received_byte_offset()
+          : final_byte_offset;
   OnStreamFrame(
-      QuicStreamFrame(id(), fin, final_byte_offset, QuicStringPiece()));
-}
-
-size_t QuicSpdyStream::WriteHeadersImpl(
-    spdy::SpdyHeaderBlock header_block,
-    bool fin,
-    QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
-  return spdy_session_->WriteHeadersOnHeadersStream(
-      id(), std::move(header_block), fin, priority(), std::move(ack_listener));
+      QuicStreamFrame(id(), /* fin = */ true, offset, QuicStringPiece()));
 }
 
 void QuicSpdyStream::OnPriorityFrame(SpdyPriority priority) {
@@ -488,7 +576,10 @@
 }
 
 void QuicSpdyStream::OnDataAvailable() {
-  DCHECK(FinishedReadingHeaders());
+  if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+    // Sequencer must be blocked until headers are consumed.
+    DCHECK(FinishedReadingHeaders());
+  }
 
   if (!VersionHasDataFrameHeader(
           session()->connection()->transport_version())) {
@@ -502,14 +593,20 @@
                           iov.iov_len);
   }
 
+  // Do not call OnBodyAvailable() until headers are consumed.
+  if (!FinishedReadingHeaders()) {
+    return;
+  }
+
   if (body_buffer_.HasBytesToRead()) {
     OnBodyAvailable();
     return;
   }
 
-  if (sequencer()->IsClosed()) {
+  if (sequencer()->IsClosed() &&
+      !on_body_available_called_because_sequencer_is_closed_) {
+    on_body_available_called_because_sequencer_is_closed_ = true;
     OnBodyAvailable();
-    return;
   }
 }
 
@@ -645,14 +742,96 @@
 
 void QuicSpdyStream::OnHeadersFrameStart(Http3FrameLengths frame_length) {
   DCHECK(VersionUsesQpack(spdy_session_->connection()->transport_version()));
+  DCHECK(!qpack_decoded_headers_accumulator_);
+
+  if (headers_decompressed_) {
+    trailers_length_ = frame_length;
+  } else {
+    headers_length_ = frame_length;
+  }
+
+  qpack_decoded_headers_accumulator_ =
+      QuicMakeUnique<QpackDecodedHeadersAccumulator>(
+          id(), spdy_session_->qpack_decoder(),
+          spdy_session_->max_inbound_header_list_size());
 }
 
 void QuicSpdyStream::OnHeadersFramePayload(QuicStringPiece payload) {
   DCHECK(VersionUsesQpack(spdy_session_->connection()->transport_version()));
+
+  if (!qpack_decoded_headers_accumulator_->Decode(payload)) {
+    // TODO(124216424): Use HTTP_QPACK_DECOMPRESSION_FAILED error code.
+    std::string error_message =
+        QuicStrCat("Error decompressing header block on stream ", id(), ": ",
+                   qpack_decoded_headers_accumulator_->error_message());
+    CloseConnectionWithDetails(QUIC_DECOMPRESSION_FAILURE, error_message);
+    return;
+  }
 }
 
 void QuicSpdyStream::OnHeadersFrameEnd() {
   DCHECK(VersionUsesQpack(spdy_session_->connection()->transport_version()));
+
+  if (!qpack_decoded_headers_accumulator_->EndHeaderBlock()) {
+    // TODO(124216424): Use HTTP_QPACK_DECOMPRESSION_FAILED error code.
+    std::string error_message =
+        QuicStrCat("Error decompressing header block on stream ", id(), ": ",
+                   qpack_decoded_headers_accumulator_->error_message());
+    CloseConnectionWithDetails(QUIC_DECOMPRESSION_FAILURE, error_message);
+    return;
+  }
+
+  const QuicByteCount frame_length = headers_decompressed_
+                                         ? trailers_length_.payload_length
+                                         : headers_length_.payload_length;
+  OnStreamHeaderList(/* fin = */ false, frame_length,
+                     qpack_decoded_headers_accumulator_->quic_header_list());
+
+  qpack_decoded_headers_accumulator_.reset();
+}
+
+size_t QuicSpdyStream::WriteHeadersImpl(
+    spdy::SpdyHeaderBlock header_block,
+    bool fin,
+    QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
+  if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+    return spdy_session_->WriteHeadersOnHeadersStream(
+        id(), std::move(header_block), fin, priority(),
+        std::move(ack_listener));
+  }
+
+  // Encode header list.
+  auto progressive_encoder = spdy_session_->qpack_encoder()->EncodeHeaderList(
+      /* stream_id = */ id(), &header_block);
+  std::string encoded_headers;
+  while (progressive_encoder->HasNext()) {
+    progressive_encoder->Next(
+        /* max_encoded_bytes = */ std::numeric_limits<size_t>::max(),
+        &encoded_headers);
+  }
+
+  // Write HEADERS frame.
+  std::unique_ptr<char[]> headers_frame_header;
+  const size_t headers_frame_header_length =
+      encoder_.SerializeHeadersFrameHeader(encoded_headers.size(),
+                                           &headers_frame_header);
+  unacked_frame_headers_offsets_.Add(
+      send_buffer().stream_offset(),
+      send_buffer().stream_offset() + headers_frame_header_length);
+
+  QUIC_DLOG(INFO) << "Stream " << id()
+                  << " is writing HEADERS frame header of length "
+                  << headers_frame_header_length;
+  WriteOrBufferData(
+      QuicStringPiece(headers_frame_header.get(), headers_frame_header_length),
+      /* fin = */ false, /* ack_listener = */ nullptr);
+
+  QUIC_DLOG(INFO) << "Stream " << id()
+                  << " is writing HEADERS frame payload of length "
+                  << encoded_headers.length();
+  WriteOrBufferData(encoded_headers, fin, nullptr);
+
+  return encoded_headers.size();
 }
 
 #undef ENDPOINT  // undef for jumbo builds