Cloned from CL 231494986 by 'g4 patch'.
Original change by bnc@bnc:majom-bnc-chromium-google3-headers-git5:2181:citc on 2019/01/29 16:21:02.
Send HEADERS on the request stream.
In QUIC version 99, send HEADERS (including trailers) on the request stream
instead of the headers stream, compressed with QPACK instead of HPACK.
gfe-relnote: Send HEADERS on the request stream in QUIC version 99 only. Not flag protected.
PiperOrigin-RevId: 249121660
Change-Id: I933f9433da8bfffc8c2979aea742d485639b33c5
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index d04efc5..0127c5b 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -4,12 +4,17 @@
#include "net/third_party/quiche/src/quic/core/http/quic_spdy_stream.h"
+#include <limits>
#include <string>
#include <utility>
#include "net/third_party/quiche/src/quic/core/http/quic_spdy_session.h"
#include "net/third_party/quiche/src/quic/core/http/spdy_utils.h"
+#include "net/third_party/quiche/src/quic/core/qpack/qpack_decoded_headers_accumulator.h"
+#include "net/third_party/quiche/src/quic/core/qpack/qpack_decoder.h"
+#include "net/third_party/quiche/src/quic/core/qpack/qpack_encoder.h"
#include "net/third_party/quiche/src/quic/core/quic_utils.h"
+#include "net/third_party/quiche/src/quic/core/quic_versions.h"
#include "net/third_party/quiche/src/quic/core/quic_write_blocked_list.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
@@ -139,8 +144,11 @@
StreamType type)
: QuicStream(id, spdy_session, /*is_static=*/false, type),
spdy_session_(spdy_session),
+ on_body_available_called_because_sequencer_is_closed_(false),
visitor_(nullptr),
headers_decompressed_(false),
+ headers_length_(0, 0),
+ trailers_length_(0, 0),
trailers_decompressed_(false),
trailers_consumed_(false),
http_decoder_visitor_(new HttpDecoderVisitor(this)),
@@ -148,9 +156,11 @@
ack_listener_(nullptr) {
DCHECK(!QuicUtils::IsCryptoStreamId(
spdy_session->connection()->transport_version(), id));
- // Don't receive any callbacks from the sequencer until headers
- // are complete.
- sequencer()->SetBlockedUntilFlush();
+ // If headers are sent on the headers stream, then do not receive any
+ // callbacks from the sequencer until headers are complete.
+ if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+ sequencer()->SetBlockedUntilFlush();
+ }
if (VersionHasDataFrameHeader(
spdy_session_->connection()->transport_version())) {
@@ -164,8 +174,11 @@
StreamType type)
: QuicStream(std::move(pending), type, /*is_static=*/false),
spdy_session_(spdy_session),
+ on_body_available_called_because_sequencer_is_closed_(false),
visitor_(nullptr),
headers_decompressed_(false),
+ headers_length_(0, 0),
+ trailers_length_(0, 0),
trailers_decompressed_(false),
trailers_consumed_(false),
http_decoder_visitor_(new HttpDecoderVisitor(this)),
@@ -173,9 +186,11 @@
ack_listener_(nullptr) {
DCHECK(!QuicUtils::IsCryptoStreamId(
spdy_session->connection()->transport_version(), id()));
- // Don't receive any callbacks from the sequencer until headers
- // are complete.
- sequencer()->SetBlockedUntilFlush();
+ // If headers are sent on the headers stream, then do not receive any
+ // callbacks from the sequencer until headers are complete.
+ if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+ sequencer()->SetBlockedUntilFlush();
+ }
if (VersionHasDataFrameHeader(
spdy_session_->connection()->transport_version())) {
@@ -192,7 +207,11 @@
QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
size_t bytes_written =
WriteHeadersImpl(std::move(header_block), fin, std::move(ack_listener));
- if (fin) {
+ if (!VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
+ fin) {
+ // If HEADERS are sent on the headers stream, then |fin_sent_| needs to be
+ // set and the write side needs to be closed without actually sending a FIN
+ // on this stream.
// TODO(rch): Add test to ensure fin_sent_ is set whenever a fin is sent.
set_fin_sent(true);
CloseWriteSide();
@@ -217,13 +236,15 @@
unacked_frame_headers_offsets_.Add(
send_buffer().stream_offset(),
send_buffer().stream_offset() + header_length);
- QUIC_DLOG(INFO) << "Stream " << id() << " is writing header of length "
+ QUIC_DLOG(INFO) << "Stream " << id()
+ << " is writing DATA frame header of length "
<< header_length;
WriteOrBufferData(QuicStringPiece(buffer.get(), header_length), false,
nullptr);
// Write body.
- QUIC_DLOG(INFO) << "Stream " << id() << " is writing body of length "
+ QUIC_DLOG(INFO) << "Stream " << id()
+ << " is writing DATA frame payload of length "
<< data.length();
WriteOrBufferData(data, fin, nullptr);
}
@@ -236,26 +257,33 @@
return 0;
}
- // The header block must contain the final offset for this stream, as the
- // trailers may be processed out of order at the peer.
- QUIC_DLOG(INFO) << "Inserting trailer: (" << kFinalOffsetHeaderKey << ", "
- << stream_bytes_written() + BufferedDataBytes() << ")";
- trailer_block.insert(
- std::make_pair(kFinalOffsetHeaderKey,
- QuicTextUtils::Uint64ToString(stream_bytes_written() +
- BufferedDataBytes())));
+ if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+ // The header block must contain the final offset for this stream, as the
+ // trailers may be processed out of order at the peer.
+ const QuicStreamOffset final_offset =
+ stream_bytes_written() + BufferedDataBytes();
+ QUIC_DLOG(INFO) << "Inserting trailer: (" << kFinalOffsetHeaderKey << ", "
+ << final_offset << ")";
+ trailer_block.insert(std::make_pair(
+ kFinalOffsetHeaderKey, QuicTextUtils::Uint64ToString(final_offset)));
+ }
// Write the trailing headers with a FIN, and close stream for writing:
// trailers are the last thing to be sent on a stream.
const bool kFin = true;
size_t bytes_written =
WriteHeadersImpl(std::move(trailer_block), kFin, std::move(ack_listener));
- set_fin_sent(kFin);
- // Trailers are the last thing to be sent on a stream, but if there is still
- // queued data then CloseWriteSide() will cause it never to be sent.
- if (BufferedDataBytes() == 0) {
- CloseWriteSide();
+ // If trailers are sent on the headers stream, then |fin_sent_| needs to be
+ // set without actually sending a FIN on this stream.
+ if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+ set_fin_sent(kFin);
+
+ // Also, the write side of this stream needs to be closed. However, only do
+ // this if there is no buffered data left, or that data would never be sent.
+ if (BufferedDataBytes() == 0) {
+ CloseWriteSide();
+ }
}
return bytes_written;
@@ -298,12 +326,14 @@
unacked_frame_headers_offsets_.Add(
send_buffer().stream_offset(),
send_buffer().stream_offset() + header_length);
- QUIC_DLOG(INFO) << "Stream " << id() << " is writing header of length "
+ QUIC_DLOG(INFO) << "Stream " << id()
+ << " is writing DATA frame header of length "
<< header_length;
WriteMemSlices(storage.ToSpan(), false);
// Write body.
- QUIC_DLOG(INFO) << "Stream " << id() << " is writing body of length "
+ QUIC_DLOG(INFO) << "Stream " << id()
+ << " is writing DATA frame payload of length "
<< slices.total_length();
return WriteMemSlices(slices, fin);
}
@@ -352,6 +382,15 @@
}
void QuicSpdyStream::MarkTrailersConsumed() {
+ if (VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
+ !reading_stopped()) {
+ const QuicByteCount trailers_total_length =
+ trailers_length_.header_length + trailers_length_.payload_length;
+ if (trailers_total_length > 0) {
+ sequencer()->MarkConsumed(trailers_total_length);
+ }
+ }
+
trailers_consumed_ = true;
}
@@ -365,8 +404,35 @@
void QuicSpdyStream::ConsumeHeaderList() {
header_list_.Clear();
- if (FinishedReadingHeaders()) {
- sequencer()->SetUnblocked();
+
+ if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+ if (FinishedReadingHeaders()) {
+ sequencer()->SetUnblocked();
+ }
+ return;
+ }
+
+ if (!reading_stopped()) {
+ const QuicByteCount headers_total_length =
+ headers_length_.header_length + headers_length_.payload_length;
+ if (headers_total_length > 0) {
+ sequencer()->MarkConsumed(headers_total_length);
+ }
+ }
+
+ if (!FinishedReadingHeaders()) {
+ return;
+ }
+
+ if (body_buffer_.HasBytesToRead()) {
+ OnBodyAvailable();
+ return;
+ }
+
+ if (sequencer()->IsClosed() &&
+ !on_body_available_called_because_sequencer_is_closed_) {
+ on_body_available_called_because_sequencer_is_closed_ = true;
+ OnBodyAvailable();
}
}
@@ -398,7 +464,15 @@
}
void QuicSpdyStream::OnHeadersTooLarge() {
- Reset(QUIC_HEADERS_TOO_LARGE);
+ if (VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+ // TODO(124216424): Use HTTP_EXCESSIVE_LOAD error code.
+ std::string error_message =
+ QuicStrCat("Too large headers received on stream ", id());
+ CloseConnectionWithDetails(QUIC_HEADERS_STREAM_DATA_DECOMPRESS_FAILURE,
+ error_message);
+ } else {
+ Reset(QUIC_HEADERS_TOO_LARGE);
+ }
}
void QuicSpdyStream::OnInitialHeadersComplete(
@@ -407,8 +481,20 @@
const QuicHeaderList& header_list) {
headers_decompressed_ = true;
header_list_ = header_list;
+
+ if (VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+ if (fin) {
+ OnStreamFrame(
+ QuicStreamFrame(id(), /* fin = */ true,
+ flow_controller()->highest_received_byte_offset(),
+ QuicStringPiece()));
+ }
+ return;
+ }
+
if (fin) {
- OnStreamFrame(QuicStreamFrame(id(), fin, 0, QuicStringPiece()));
+ OnStreamFrame(
+ QuicStreamFrame(id(), fin, /* offset = */ 0, QuicStringPiece()));
}
if (FinishedReadingHeaders()) {
sequencer()->SetUnblocked();
@@ -431,15 +517,20 @@
size_t /*frame_len*/,
const QuicHeaderList& header_list) {
DCHECK(!trailers_decompressed_);
- if (fin_received()) {
- QUIC_DLOG(ERROR) << "Received Trailers after FIN, on stream: " << id();
+ if ((VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
+ sequencer()->IsClosed()) ||
+ (!VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
+ fin_received())) {
+ QUIC_DLOG(INFO) << "Received Trailers after FIN, on stream: " << id();
session()->connection()->CloseConnection(
QUIC_INVALID_HEADERS_STREAM_DATA, "Trailers after fin",
ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
return;
}
- if (!fin) {
- QUIC_DLOG(ERROR) << "Trailers must have FIN set, on stream: " << id();
+
+ if (!VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
+ !fin) {
+ QUIC_DLOG(INFO) << "Trailers must have FIN set, on stream: " << id();
session()->connection()->CloseConnection(
QUIC_INVALID_HEADERS_STREAM_DATA, "Fin missing from trailers",
ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
@@ -447,8 +538,9 @@
}
size_t final_byte_offset = 0;
- if (!SpdyUtils::CopyAndValidateTrailers(header_list,
- /* expect_final_byte_offset = */ true,
+ const bool expect_final_byte_offset =
+ !VersionUsesQpack(spdy_session_->connection()->transport_version());
+ if (!SpdyUtils::CopyAndValidateTrailers(header_list, expect_final_byte_offset,
&final_byte_offset,
&received_trailers_)) {
QUIC_DLOG(ERROR) << "Trailers for stream " << id() << " are malformed.";
@@ -458,16 +550,12 @@
return;
}
trailers_decompressed_ = true;
+ const QuicStreamOffset offset =
+ VersionUsesQpack(spdy_session_->connection()->transport_version())
+ ? flow_controller()->highest_received_byte_offset()
+ : final_byte_offset;
OnStreamFrame(
- QuicStreamFrame(id(), fin, final_byte_offset, QuicStringPiece()));
-}
-
-size_t QuicSpdyStream::WriteHeadersImpl(
- spdy::SpdyHeaderBlock header_block,
- bool fin,
- QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
- return spdy_session_->WriteHeadersOnHeadersStream(
- id(), std::move(header_block), fin, priority(), std::move(ack_listener));
+ QuicStreamFrame(id(), /* fin = */ true, offset, QuicStringPiece()));
}
void QuicSpdyStream::OnPriorityFrame(SpdyPriority priority) {
@@ -488,7 +576,10 @@
}
void QuicSpdyStream::OnDataAvailable() {
- DCHECK(FinishedReadingHeaders());
+ if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+ // Sequencer must be blocked until headers are consumed.
+ DCHECK(FinishedReadingHeaders());
+ }
if (!VersionHasDataFrameHeader(
session()->connection()->transport_version())) {
@@ -502,14 +593,20 @@
iov.iov_len);
}
+ // Do not call OnBodyAvailable() until headers are consumed.
+ if (!FinishedReadingHeaders()) {
+ return;
+ }
+
if (body_buffer_.HasBytesToRead()) {
OnBodyAvailable();
return;
}
- if (sequencer()->IsClosed()) {
+ if (sequencer()->IsClosed() &&
+ !on_body_available_called_because_sequencer_is_closed_) {
+ on_body_available_called_because_sequencer_is_closed_ = true;
OnBodyAvailable();
- return;
}
}
@@ -645,14 +742,96 @@
void QuicSpdyStream::OnHeadersFrameStart(Http3FrameLengths frame_length) {
DCHECK(VersionUsesQpack(spdy_session_->connection()->transport_version()));
+ DCHECK(!qpack_decoded_headers_accumulator_);
+
+ if (headers_decompressed_) {
+ trailers_length_ = frame_length;
+ } else {
+ headers_length_ = frame_length;
+ }
+
+ qpack_decoded_headers_accumulator_ =
+ QuicMakeUnique<QpackDecodedHeadersAccumulator>(
+ id(), spdy_session_->qpack_decoder(),
+ spdy_session_->max_inbound_header_list_size());
}
void QuicSpdyStream::OnHeadersFramePayload(QuicStringPiece payload) {
DCHECK(VersionUsesQpack(spdy_session_->connection()->transport_version()));
+
+ if (!qpack_decoded_headers_accumulator_->Decode(payload)) {
+ // TODO(124216424): Use HTTP_QPACK_DECOMPRESSION_FAILED error code.
+ std::string error_message =
+ QuicStrCat("Error decompressing header block on stream ", id(), ": ",
+ qpack_decoded_headers_accumulator_->error_message());
+ CloseConnectionWithDetails(QUIC_DECOMPRESSION_FAILURE, error_message);
+ return;
+ }
}
void QuicSpdyStream::OnHeadersFrameEnd() {
DCHECK(VersionUsesQpack(spdy_session_->connection()->transport_version()));
+
+ if (!qpack_decoded_headers_accumulator_->EndHeaderBlock()) {
+ // TODO(124216424): Use HTTP_QPACK_DECOMPRESSION_FAILED error code.
+ std::string error_message =
+ QuicStrCat("Error decompressing header block on stream ", id(), ": ",
+ qpack_decoded_headers_accumulator_->error_message());
+ CloseConnectionWithDetails(QUIC_DECOMPRESSION_FAILURE, error_message);
+ return;
+ }
+
+ const QuicByteCount frame_length = headers_decompressed_
+ ? trailers_length_.payload_length
+ : headers_length_.payload_length;
+ OnStreamHeaderList(/* fin = */ false, frame_length,
+ qpack_decoded_headers_accumulator_->quic_header_list());
+
+ qpack_decoded_headers_accumulator_.reset();
+}
+
+size_t QuicSpdyStream::WriteHeadersImpl(
+ spdy::SpdyHeaderBlock header_block,
+ bool fin,
+ QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
+ if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+ return spdy_session_->WriteHeadersOnHeadersStream(
+ id(), std::move(header_block), fin, priority(),
+ std::move(ack_listener));
+ }
+
+ // Encode header list.
+ auto progressive_encoder = spdy_session_->qpack_encoder()->EncodeHeaderList(
+ /* stream_id = */ id(), &header_block);
+ std::string encoded_headers;
+ while (progressive_encoder->HasNext()) {
+ progressive_encoder->Next(
+ /* max_encoded_bytes = */ std::numeric_limits<size_t>::max(),
+ &encoded_headers);
+ }
+
+ // Write HEADERS frame.
+ std::unique_ptr<char[]> headers_frame_header;
+ const size_t headers_frame_header_length =
+ encoder_.SerializeHeadersFrameHeader(encoded_headers.size(),
+ &headers_frame_header);
+ unacked_frame_headers_offsets_.Add(
+ send_buffer().stream_offset(),
+ send_buffer().stream_offset() + headers_frame_header_length);
+
+ QUIC_DLOG(INFO) << "Stream " << id()
+ << " is writing HEADERS frame header of length "
+ << headers_frame_header_length;
+ WriteOrBufferData(
+ QuicStringPiece(headers_frame_header.get(), headers_frame_header_length),
+ /* fin = */ false, /* ack_listener = */ nullptr);
+
+ QUIC_DLOG(INFO) << "Stream " << id()
+ << " is writing HEADERS frame payload of length "
+ << encoded_headers.length();
+ WriteOrBufferData(encoded_headers, fin, nullptr);
+
+ return encoded_headers.size();
}
#undef ENDPOINT // undef for jumbo builds