Call MarkConsumed() on QPACK bytes incrementally, as soon as possible.
QPACK data is already processed incrementally, but calling
QuicStreamSequencer::MarkConsumed() is postponed until headers or trailers are
consumed by the higher layer as signalled by a ConsumeHeaderList() or
MarkTrailersConsumed() call. This conflicts with test
ResponseProcessingTest.CookieRequestHeaderExcessivelyLong that had to be
disabled at b/249121660.
This CL makes sure MarkConsumed() is called for every fragment of HEADERS frames
immediately, every time for headers, and for trailers only if data bytes are all
consumed (otherwise the sequencer starts freeing up data bytes). And if not all data
bytes are consumed, then MarkConsumed() is called for trailer bytes as soon as
possible.
gfe-relnote: n/a; QUIC v99 only.
PiperOrigin-RevId: 252430032
Change-Id: I1aa005971a3de72ff536bb54a35817b8fd56a68e
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index b025fd8..b7d3011 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -162,7 +162,7 @@
trailers_length_(0, 0),
trailers_decompressed_(false),
trailers_consumed_(false),
- trailers_consumed_in_sequencer_(false),
+ headers_bytes_to_be_marked_consumed_(0),
http_decoder_visitor_(new HttpDecoderVisitor(this)),
body_buffer_(sequencer()),
ack_listener_(nullptr) {
@@ -193,7 +193,7 @@
trailers_length_(0, 0),
trailers_decompressed_(false),
trailers_consumed_(false),
- trailers_consumed_in_sequencer_(false),
+ headers_bytes_to_be_marked_consumed_(0),
http_decoder_visitor_(new HttpDecoderVisitor(this)),
body_buffer_(sequencer()),
ack_listener_(nullptr) {
@@ -375,9 +375,13 @@
return sequencer()->Readv(iov, iov_len);
}
size_t bytes_read = body_buffer_.ReadBody(iov, iov_len);
- if (trailers_consumed_) {
- MarkTrailersConsumed();
+
+ if (VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+ // Maybe all DATA frame bytes have been read and some trailing HEADERS had
+ // already been processed, in which case MarkConsumed() should be called.
+ MaybeMarkHeadersBytesConsumed();
}
+
return bytes_read;
}
@@ -398,8 +402,11 @@
return;
}
body_buffer_.MarkBodyConsumed(num_bytes);
- if (trailers_consumed_) {
- MarkTrailersConsumed();
+
+ if (VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+ // Maybe all DATA frame bytes have been read and some trailing HEADERS had
+ // already been processed, in which case MarkConsumed() should be called.
+ MaybeMarkHeadersBytesConsumed();
}
}
@@ -419,17 +426,6 @@
}
void QuicSpdyStream::MarkTrailersConsumed() {
- if (VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
- !reading_stopped() && !body_buffer_.HasBytesToRead() &&
- !trailers_consumed_in_sequencer_) {
- const QuicByteCount trailers_total_length =
- trailers_length_.header_length + trailers_length_.payload_length;
- if (trailers_total_length > 0) {
- sequencer()->MarkConsumed(trailers_total_length);
- trailers_consumed_in_sequencer_ = true;
- }
- }
-
trailers_consumed_ = true;
}
@@ -444,22 +440,12 @@
void QuicSpdyStream::ConsumeHeaderList() {
header_list_.Clear();
- if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
- if (FinishedReadingHeaders()) {
- sequencer()->SetUnblocked();
- }
+ if (!FinishedReadingHeaders()) {
return;
}
- if (!reading_stopped()) {
- const QuicByteCount headers_total_length =
- headers_length_.header_length + headers_length_.payload_length;
- if (headers_total_length > 0) {
- sequencer()->MarkConsumed(headers_total_length);
- }
- }
-
- if (!FinishedReadingHeaders()) {
+ if (!VersionUsesQpack(spdy_session_->connection()->transport_version())) {
+ sequencer()->SetUnblocked();
return;
}
@@ -483,6 +469,7 @@
bool QuicSpdyStream::OnStreamHeaderList(bool fin,
size_t frame_len,
const QuicHeaderList& header_list) {
+ // TODO(b/134706391): remove |fin| argument.
// The headers list avoid infinite buffering by clearing the headers list
// if the current headers are too large. So if the list is empty here
// then the headers list must have been too large, and the stream should
@@ -519,6 +506,7 @@
bool fin,
size_t /*frame_len*/,
const QuicHeaderList& header_list) {
+ // TODO(b/134706391): remove |fin| argument.
headers_decompressed_ = true;
header_list_ = header_list;
@@ -556,11 +544,10 @@
bool fin,
size_t /*frame_len*/,
const QuicHeaderList& header_list) {
+ // TODO(b/134706391): remove |fin| argument.
DCHECK(!trailers_decompressed_);
- if ((VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
- sequencer()->IsClosed()) ||
- (!VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
- fin_received())) {
+ if (!VersionUsesQpack(spdy_session_->connection()->transport_version()) &&
+ fin_received()) {
QUIC_DLOG(INFO) << "Received Trailers after FIN, on stream: " << id();
session()->connection()->CloseConnection(
QUIC_INVALID_HEADERS_STREAM_DATA, "Trailers after fin",
@@ -770,6 +757,16 @@
}
}
+void QuicSpdyStream::MaybeMarkHeadersBytesConsumed() {
+ DCHECK(VersionUsesQpack(spdy_session_->connection()->transport_version()));
+
+ if (!body_buffer_.HasBytesToRead() && !reading_stopped() &&
+ headers_bytes_to_be_marked_consumed_ > 0) {
+ sequencer()->MarkConsumed(headers_bytes_to_be_marked_consumed_);
+ headers_bytes_to_be_marked_consumed_ = 0;
+ }
+}
+
QuicByteCount QuicSpdyStream::GetNumFrameHeadersInInterval(
QuicStreamOffset offset,
QuicByteCount data_length) const {
@@ -796,13 +793,23 @@
QuicMakeUnique<QpackDecodedHeadersAccumulator>(
id(), spdy_session_->qpack_decoder(),
spdy_session_->max_inbound_header_list_size());
+
+ // Do not call MaybeMarkHeadersBytesConsumed() yet, because
+ // HEADERS frame header bytes might not have been parsed completely.
+ headers_bytes_to_be_marked_consumed_ += frame_length.header_length;
+
return true;
}
bool QuicSpdyStream::OnHeadersFramePayload(QuicStringPiece payload) {
DCHECK(VersionUsesQpack(spdy_session_->connection()->transport_version()));
- if (!qpack_decoded_headers_accumulator_->Decode(payload)) {
+ const bool success = qpack_decoded_headers_accumulator_->Decode(payload);
+
+ headers_bytes_to_be_marked_consumed_ += payload.size();
+ MaybeMarkHeadersBytesConsumed();
+
+ if (!success) {
// TODO(124216424): Use HTTP_QPACK_DECOMPRESSION_FAILED error code.
std::string error_message =
QuicStrCat("Error decompressing header block on stream ", id(), ": ",
diff --git a/quic/core/http/quic_spdy_stream.h b/quic/core/http/quic_spdy_stream.h
index 76c252d..caff21b 100644
--- a/quic/core/http/quic_spdy_stream.h
+++ b/quic/core/http/quic_spdy_stream.h
@@ -246,6 +246,10 @@
friend class QuicStreamUtils;
class HttpDecoderVisitor;
+ // Call QuicStreamSequencer::MarkConsumed() with
+ // |headers_bytes_to_be_marked_consumed_| if appropriate.
+ void MaybeMarkHeadersBytesConsumed();
+
// Given the interval marked by [|offset|, |offset| + |data_length|), return
// the number of frame header bytes contained in it.
QuicByteCount GetNumFrameHeadersInInterval(QuicStreamOffset offset,
@@ -270,8 +274,9 @@
bool trailers_decompressed_;
// True if the trailers have been consumed.
bool trailers_consumed_;
- // True if the trailers have actually been consumed in the stream sequencer.
- bool trailers_consumed_in_sequencer_;
+ // Number of bytes consumed while decoding HEADERS frames that cannot be
+ // marked consumed in QuicStreamSequencer until later.
+ QuicByteCount headers_bytes_to_be_marked_consumed_;
// The parsed trailers received from the peer.
spdy::SpdyHeaderBlock received_trailers_;
diff --git a/quic/core/http/quic_spdy_stream_test.cc b/quic/core/http/quic_spdy_stream_test.cc
index 00a5dba..9ca1df2 100644
--- a/quic/core/http/quic_spdy_stream_test.cc
+++ b/quic/core/http/quic_spdy_stream_test.cc
@@ -35,7 +35,9 @@
using spdy::SpdyPriority;
using testing::_;
using testing::AtLeast;
+using testing::ElementsAre;
using testing::Invoke;
+using testing::Pair;
using testing::Return;
using testing::StrictMock;
@@ -90,6 +92,11 @@
const std::string& data() const { return data_; }
const spdy::SpdyHeaderBlock& saved_headers() const { return saved_headers_; }
+ // Expose protected accessor.
+ const QuicStreamSequencer* sequencer() const {
+ return QuicStream::sequencer();
+ }
+
private:
bool should_process_data_;
spdy::SpdyHeaderBlock saved_headers_;
@@ -149,6 +156,8 @@
"JBCScs_ejbKaqBDoB7ZGxTvqlrB__2ZmnHHjCr8RgMRtKNtIeuZAo ";
}
+ ~QuicSpdyStreamTest() override = default;
+
std::string EncodeQpackHeaders(QuicStreamId id, SpdyHeaderBlock* header) {
auto qpack_encoder =
QuicMakeUnique<QpackEncoder>(session_.get(), session_.get());
@@ -1098,6 +1107,13 @@
// If headers are received with a FIN, no trailers should then arrive.
Initialize(kShouldProcessData);
+ // If HEADERS frames are sent on the request/response stream, then the
+ // sequencer will signal an error if any stream data arrives after a FIN,
+ // so QuicSpdyStream does not need to.
+ if (VersionUsesQpack(GetParam().transport_version)) {
+ return;
+ }
+
// Receive initial headers with FIN set.
ProcessHeaders(true, headers_);
stream_->ConsumeHeaderList();
@@ -1733,12 +1749,7 @@
QuicStreamFrame frame(stream_->id(), false, 0, stream_frame_payload);
stream_->OnStreamFrame(frame);
- auto it = stream_->header_list().begin();
- ASSERT_TRUE(it != stream_->header_list().end());
- EXPECT_EQ("foo", it->first);
- EXPECT_EQ("bar", it->second);
- ++it;
- EXPECT_TRUE(it == stream_->header_list().end());
+ EXPECT_THAT(stream_->header_list(), ElementsAre(Pair("foo", "bar")));
// QuicSpdyStream only calls OnBodyAvailable()
// after the header list has been consumed.
@@ -1746,8 +1757,8 @@
stream_->ConsumeHeaderList();
EXPECT_EQ(kDataFramePayload, stream_->data());
- EXPECT_THAT(stream_->received_trailers(), testing::ElementsAre(testing::Pair(
- "custom-key", "custom-value")));
+ EXPECT_THAT(stream_->received_trailers(),
+ ElementsAre(Pair("custom-key", "custom-value")));
}
TEST_P(QuicSpdyStreamTest, ProcessBodyAfterTrailers) {
@@ -1829,6 +1840,116 @@
stream_->OnStreamFrame(frame);
}
+class QuicSpdyStreamIncrementalConsumptionTest : public QuicSpdyStreamTest {
+ protected:
+ QuicSpdyStreamIncrementalConsumptionTest() : offset_(0), consumed_bytes_(0) {}
+ ~QuicSpdyStreamIncrementalConsumptionTest() override = default;
+
+ // Create QuicStreamFrame with |payload|
+ // and pass it to stream_->OnStreamFrame().
+ void OnStreamFrame(QuicStringPiece payload) {
+ QuicStreamFrame frame(stream_->id(), /* fin = */ false, offset_, payload);
+ stream_->OnStreamFrame(frame);
+ offset_ += payload.size();
+ }
+
+ // Return number of bytes marked consumed with sequencer
+ // since last NewlyConsumedBytes() call.
+ QuicStreamOffset NewlyConsumedBytes() {
+ QuicStreamOffset previously_consumed_bytes = consumed_bytes_;
+ consumed_bytes_ = stream_->sequencer()->NumBytesConsumed();
+ return consumed_bytes_ - previously_consumed_bytes;
+ }
+
+ // Read |size| bytes from the stream.
+ std::string ReadFromStream(QuicByteCount size) {
+ std::string buffer;
+ buffer.resize(size);
+
+ struct iovec vec;
+ vec.iov_base = const_cast<char*>(buffer.data());
+ vec.iov_len = size;
+
+ size_t bytes_read = stream_->Readv(&vec, 1);
+ EXPECT_EQ(bytes_read, size);
+
+ return buffer;
+ }
+
+ private:
+ QuicStreamOffset offset_;
+ QuicStreamOffset consumed_bytes_;
+};
+
+INSTANTIATE_TEST_SUITE_P(Tests,
+ QuicSpdyStreamIncrementalConsumptionTest,
+ ::testing::Values(ParsedQuicVersion{PROTOCOL_TLS1_3,
+ QUIC_VERSION_99}));
+
+// Test that stream bytes are consumed (by calling
+// sequencer()->MarkConsumed()) incrementally, as soon as possible.
+TEST_P(QuicSpdyStreamIncrementalConsumptionTest, IncrementalConsumptionTest) {
+ if (!VersionUsesQpack(GetParam().transport_version)) {
+ return;
+ }
+
+ Initialize(!kShouldProcessData);
+
+ // HEADERS frame with QPACK encoded single header field "foo: bar".
+ std::string headers =
+ HeadersFrame(QuicTextUtils::HexDecode("00002a94e703626172"));
+
+ // All HEADERS frame bytes are consumed even if the frame is not received
+ // completely (as long as at least some of the payload is received, which is
+ // an implementation detail that should not be tested).
+ OnStreamFrame(QuicStringPiece(headers).substr(0, headers.size() - 1));
+ EXPECT_EQ(headers.size() - 1, NewlyConsumedBytes());
+
+ // The rest of the HEADERS frame is also consumed immediately.
+ OnStreamFrame(QuicStringPiece(headers).substr(headers.size() - 1));
+ EXPECT_EQ(1u, NewlyConsumedBytes());
+
+ // Verify headers.
+ EXPECT_THAT(stream_->header_list(), ElementsAre(Pair("foo", "bar")));
+ stream_->ConsumeHeaderList();
+
+ // DATA frame.
+ QuicStringPiece data_payload(kDataFramePayload);
+ std::string data_frame = DataFrame(data_payload);
+
+ // DATA frame is not consumed because payload has to be buffered.
+ // TODO(bnc): Consume frame header as soon as possible.
+ OnStreamFrame(data_frame);
+ EXPECT_EQ(0u, NewlyConsumedBytes());
+
+ // Consume all but last byte of data.
+ EXPECT_EQ(data_payload.substr(0, data_payload.size() - 1),
+ ReadFromStream(data_payload.size() - 1));
+ EXPECT_EQ(data_frame.size() - 1, NewlyConsumedBytes());
+
+ // Trailing HEADERS frame with QPACK encoded
+ // single header field "custom-key: custom-value".
+ std::string trailers = HeadersFrame(
+ QuicTextUtils::HexDecode("00002f0125a849e95ba97d7f8925a849e95bb8e8b4bf"));
+
+ // No bytes are consumed, because last byte of DATA payload is still buffered.
+ OnStreamFrame(QuicStringPiece(trailers).substr(0, trailers.size() - 1));
+ EXPECT_EQ(0u, NewlyConsumedBytes());
+
+ // Reading last byte of DATA payload triggers consumption of all data received
+ // so far, even though last HEADERS frame has not been received completely.
+ EXPECT_EQ(data_payload.substr(data_payload.size() - 1), ReadFromStream(1));
+ EXPECT_EQ(1 + trailers.size() - 1, NewlyConsumedBytes());
+
+ // Last byte of trailers is immediately consumed.
+ OnStreamFrame(QuicStringPiece(trailers).substr(trailers.size() - 1));
+ EXPECT_EQ(1u, NewlyConsumedBytes());
+
+ // Verify trailers.
+ EXPECT_THAT(stream_->received_trailers(),
+ ElementsAre(Pair("custom-key", "custom-value")));
+}
+
} // namespace
} // namespace test
} // namespace quic