Support blocked decoding in QuicSpdyStream.
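Modify the QpackDecodedHeadersAccumulator interface and the QuicSpdyStream
implementation to support blocked decoding of QPACK headers.
QuicSpdyStream now implements QpackDecodedHeadersAccumulator::Visitor. When
EndHeaderBlock() returns Status::kBlocked, the stream stops processing HTTP/3
frames until OnHeadersDecoded() is called, at which point it processes the
decoded headers and resumes reading. If OnHeaderDecodingError() is called
instead, the connection is closed with QUIC_DECOMPRESSION_FAILURE.
A test-only flag, pretend_blocked_decoding_for_tests_, makes the accumulator
report blocked decoding so that both paths can be exercised in unit tests.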
gfe-relnote: n/a, QUIC v99-only change.
PiperOrigin-RevId: 256201744
Change-Id: I17a4b6c0c6d4f30d8526c8b645c55da3ce01a076
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc
index e3d9357..9966f81 100644
--- a/quic/core/http/quic_spdy_stream.cc
+++ b/quic/core/http/quic_spdy_stream.cc
@@ -11,7 +11,6 @@
#include "net/third_party/quiche/src/quic/core/http/http_decoder.h"
#include "net/third_party/quiche/src/quic/core/http/quic_spdy_session.h"
#include "net/third_party/quiche/src/quic/core/http/spdy_utils.h"
-#include "net/third_party/quiche/src/quic/core/qpack/qpack_decoded_headers_accumulator.h"
#include "net/third_party/quiche/src/quic/core/qpack/qpack_decoder.h"
#include "net/third_party/quiche/src/quic/core/qpack/qpack_encoder.h"
#include "net/third_party/quiche/src/quic/core/quic_utils.h"
@@ -163,12 +162,14 @@
spdy_session_(spdy_session),
on_body_available_called_because_sequencer_is_closed_(false),
visitor_(nullptr),
+ blocked_on_decoding_headers_(false),
headers_decompressed_(false),
headers_length_(0, 0),
trailers_length_(0, 0),
trailers_decompressed_(false),
trailers_consumed_(false),
headers_bytes_to_be_marked_consumed_(0),
+ pretend_blocked_decoding_for_tests_(false),
http_decoder_visitor_(new HttpDecoderVisitor(this)),
body_buffer_(sequencer()),
sequencer_offset_(0),
@@ -197,12 +198,14 @@
spdy_session_(spdy_session),
on_body_available_called_because_sequencer_is_closed_(false),
visitor_(nullptr),
+ blocked_on_decoding_headers_(false),
headers_decompressed_(false),
headers_length_(0, 0),
trailers_length_(0, 0),
trailers_decompressed_(false),
trailers_consumed_(false),
headers_bytes_to_be_marked_consumed_(0),
+ pretend_blocked_decoding_for_tests_(false),
http_decoder_visitor_(new HttpDecoderVisitor(this)),
body_buffer_(sequencer()),
sequencer_offset_(sequencer()->NumBytesConsumed()),
@@ -497,6 +500,24 @@
}
}
+void QuicSpdyStream::OnHeadersDecoded(QuicHeaderList headers) {
+ blocked_on_decoding_headers_ = false;
+ ProcessDecodedHeaders(headers);
+ // Continue decoding HTTP/3 frames.
+ OnDataAvailable();
+}
+
+void QuicSpdyStream::OnHeaderDecodingError() {
+ // TODO(b/124216424): Use HTTP_EXCESSIVE_LOAD or
+ // HTTP_QPACK_DECOMPRESSION_FAILED error code as indicated by
+ // |qpack_decoded_headers_accumulator_|.
+ std::string error_message = QuicStrCat(
+ "Error during async decoding of ",
+ headers_decompressed_ ? "trailers" : "headers", " on stream ", id(), ": ",
+ qpack_decoded_headers_accumulator_->error_message());
+ CloseConnectionWithDetails(QUIC_DECOMPRESSION_FAILURE, error_message);
+}
+
void QuicSpdyStream::OnHeadersTooLarge() {
if (VersionUsesQpack(spdy_session_->connection()->transport_version())) {
// TODO(124216424): Use HTTP_EXCESSIVE_LOAD error code.
@@ -632,6 +653,10 @@
return;
}
+ if (blocked_on_decoding_headers_) {
+ return;
+ }
+
iovec iov;
while (!reading_stopped() && decoder_.error() == QUIC_NO_ERROR) {
DCHECK_GE(sequencer_offset_, sequencer()->NumBytesConsumed());
@@ -645,6 +670,9 @@
reinterpret_cast<const char*>(iov.iov_base), iov.iov_len);
is_decoder_processing_input_ = false;
sequencer_offset_ += processed_bytes;
+ if (blocked_on_decoding_headers_) {
+ return;
+ }
}
// Do not call OnBodyAvailable() until headers are consumed.
@@ -819,8 +847,9 @@
qpack_decoded_headers_accumulator_ =
QuicMakeUnique<QpackDecodedHeadersAccumulator>(
- id(), spdy_session_->qpack_decoder(),
- spdy_session_->max_inbound_header_list_size());
+ id(), spdy_session_->qpack_decoder(), this,
+ spdy_session_->max_inbound_header_list_size(),
+ pretend_blocked_decoding_for_tests_);
// Do not call MaybeMarkHeadersBytesConsumed() yet, because
// HEADERS frame header bytes might not have been parsed completely.
@@ -851,7 +880,9 @@
bool QuicSpdyStream::OnHeadersFrameEnd() {
DCHECK(VersionUsesQpack(spdy_session_->connection()->transport_version()));
- if (!qpack_decoded_headers_accumulator_->EndHeaderBlock()) {
+ auto result = qpack_decoded_headers_accumulator_->EndHeaderBlock();
+
+ if (result == QpackDecodedHeadersAccumulator::Status::kError) {
// TODO(124216424): Use HTTP_QPACK_DECOMPRESSION_FAILED error code.
std::string error_message =
QuicStrCat("Error decompressing header block on stream ", id(), ": ",
@@ -860,13 +891,23 @@
return false;
}
+ if (result == QpackDecodedHeadersAccumulator::Status::kBlocked) {
+ blocked_on_decoding_headers_ = true;
+ return false;
+ }
+
+ DCHECK(result == QpackDecodedHeadersAccumulator::Status::kSuccess);
+
+ ProcessDecodedHeaders(qpack_decoded_headers_accumulator_->quic_header_list());
+ return !sequencer()->IsClosed() && !reading_stopped();
+}
+
+void QuicSpdyStream::ProcessDecodedHeaders(const QuicHeaderList& headers) {
const QuicByteCount frame_length = headers_decompressed_
? trailers_length_.payload_length
: headers_length_.payload_length;
- OnStreamHeaderList(/* fin = */ false, frame_length,
- qpack_decoded_headers_accumulator_->quic_header_list());
+ OnStreamHeaderList(/* fin = */ false, frame_length, headers);
qpack_decoded_headers_accumulator_.reset();
- return !sequencer()->IsClosed() && !reading_stopped();
}
size_t QuicSpdyStream::WriteHeadersImpl(
diff --git a/quic/core/http/quic_spdy_stream.h b/quic/core/http/quic_spdy_stream.h
index 1a5c1b6..9a59ddc 100644
--- a/quic/core/http/quic_spdy_stream.h
+++ b/quic/core/http/quic_spdy_stream.h
@@ -19,6 +19,7 @@
#include "net/third_party/quiche/src/quic/core/http/http_encoder.h"
#include "net/third_party/quiche/src/quic/core/http/quic_header_list.h"
#include "net/third_party/quiche/src/quic/core/http/quic_spdy_stream_body_buffer.h"
+#include "net/third_party/quiche/src/quic/core/qpack/qpack_decoded_headers_accumulator.h"
#include "net/third_party/quiche/src/quic/core/quic_packets.h"
#include "net/third_party/quiche/src/quic/core/quic_stream.h"
#include "net/third_party/quiche/src/quic/core/quic_stream_sequencer.h"
@@ -34,11 +35,12 @@
class QuicStreamPeer;
} // namespace test
-class QpackDecodedHeadersAccumulator;
class QuicSpdySession;
// A QUIC stream that can send and receive HTTP2 (SPDY) headers.
-class QUIC_EXPORT_PRIVATE QuicSpdyStream : public QuicStream {
+class QUIC_EXPORT_PRIVATE QuicSpdyStream
+ : public QuicStream,
+ public QpackDecodedHeadersAccumulator::Visitor {
public:
// Visitor receives callbacks from the stream.
class QUIC_EXPORT_PRIVATE Visitor {
@@ -206,6 +208,10 @@
using QuicStream::CloseWriteSide;
+ // QpackDecodedHeadersAccumulator::Visitor implementation.
+ void OnHeadersDecoded(QuicHeaderList headers) override;
+ void OnHeaderDecodingError() override;
+
protected:
// Called when the received headers are too large. By default this will
// reset the stream.
@@ -246,6 +252,9 @@
bool OnHeadersFramePayload(QuicStringPiece payload);
bool OnHeadersFrameEnd();
+ // Called internally when headers are decoded.
+ void ProcessDecodedHeaders(const QuicHeaderList& headers);
+
// Call QuicStreamSequencer::MarkConsumed() with
// |headers_bytes_to_be_marked_consumed_| if appropriate.
void MaybeMarkHeadersBytesConsumed();
@@ -260,6 +269,10 @@
bool on_body_available_called_because_sequencer_is_closed_;
Visitor* visitor_;
+
+ // True if read side processing is blocked while waiting for callback from
+ // QPACK decoder.
+ bool blocked_on_decoding_headers_;
// True if the headers have been completely decompressed.
bool headers_decompressed_;
// Contains a copy of the decompressed header (name, value) pairs until they
@@ -284,6 +297,9 @@
HttpEncoder encoder_;
// Http decoder for processing raw incoming stream frames.
HttpDecoder decoder_;
+ // TODO(b/112770235): Remove once blocked decoding is implemented
+ // and can be tested with delayed encoder stream data.
+ bool pretend_blocked_decoding_for_tests_;
// Headers accumulator for decoding HEADERS frame payload.
std::unique_ptr<QpackDecodedHeadersAccumulator>
qpack_decoded_headers_accumulator_;
diff --git a/quic/core/http/quic_spdy_stream_test.cc b/quic/core/http/quic_spdy_stream_test.cc
index bab9228..b50c3bb 100644
--- a/quic/core/http/quic_spdy_stream_test.cc
+++ b/quic/core/http/quic_spdy_stream_test.cc
@@ -1810,6 +1810,146 @@
stream_->OnStreamFrame(frame);
}
+TEST_P(QuicSpdyStreamTest, BlockedHeaderDecoding) {
+ if (!VersionUsesQpack(GetParam().transport_version)) {
+ return;
+ }
+
+ Initialize(kShouldProcessData);
+ QuicSpdyStreamPeer::pretend_blocked_decoding(stream_);
+
+ // HEADERS frame with QPACK encoded single header field "foo: bar".
+ std::string headers =
+ HeadersFrame(QuicTextUtils::HexDecode("00002a94e703626172"));
+ stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 0, headers));
+
+ // Even though entire header block is received, it cannot be decoded.
+ EXPECT_FALSE(stream_->headers_decompressed());
+
+ // Signal to QpackDecodedHeadersAccumulator that the header block has been
+ // decoded. (OnDecodingCompleted() has already been called by the QPACK
+ // decoder, so technically quic_header_list_.OnHeaderBlockEnd() is called
+ // twice, which is a no-op.)
+ QuicSpdyStreamPeer::qpack_decoded_headers_accumulator(stream_)
+ ->OnDecodingCompleted();
+
+ EXPECT_TRUE(stream_->headers_decompressed());
+
+ EXPECT_THAT(stream_->header_list(), ElementsAre(Pair("foo", "bar")));
+ stream_->ConsumeHeaderList();
+
+ std::string data = DataFrame(kDataFramePayload);
+ stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, /* offset = */
+ headers.length(), data));
+ EXPECT_EQ(kDataFramePayload, stream_->data());
+
+ // Trailing HEADERS frame with QPACK encoded
+ // single header field "custom-key: custom-value".
+ std::string trailers = HeadersFrame(
+ QuicTextUtils::HexDecode("00002f0125a849e95ba97d7f8925a849e95bb8e8b4bf"));
+ stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, /* offset = */
+ headers.length() + data.length(),
+ trailers));
+
+ // Even though entire header block is received, it cannot be decoded.
+ EXPECT_FALSE(stream_->trailers_decompressed());
+
+ // Signal to QpackDecodedHeadersAccumulator that the header block has been
+ // decoded.
+ QuicSpdyStreamPeer::qpack_decoded_headers_accumulator(stream_)
+ ->OnDecodingCompleted();
+ EXPECT_TRUE(stream_->trailers_decompressed());
+
+ // Verify trailers.
+ EXPECT_THAT(stream_->received_trailers(),
+ ElementsAre(Pair("custom-key", "custom-value")));
+ stream_->MarkTrailersConsumed();
+}
+
+TEST_P(QuicSpdyStreamTest, AsyncErrorDecodingHeaders) {
+ if (!VersionUsesQpack(GetParam().transport_version)) {
+ return;
+ }
+
+ Initialize(kShouldProcessData);
+ QuicSpdyStreamPeer::pretend_blocked_decoding(stream_);
+
+ // HEADERS frame with QPACK encoded single header field "foo: bar".
+ std::string headers =
+ HeadersFrame(QuicTextUtils::HexDecode("00002a94e703626172"));
+ stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 0, headers));
+
+ // Even though entire header block is received, it cannot be decoded.
+ EXPECT_FALSE(stream_->headers_decompressed());
+
+ // Signal a decoding error to QpackDecodedHeadersAccumulator.
+ std::string expected_error_message =
+ QuicStrCat("Error during async decoding of headers on stream ",
+ stream_->id(), ": unexpected error");
+ EXPECT_CALL(
+ *connection_,
+ CloseConnection(QUIC_DECOMPRESSION_FAILURE, expected_error_message,
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET));
+ QuicSpdyStreamPeer::qpack_decoded_headers_accumulator(stream_)
+ ->OnDecodingErrorDetected("unexpected error");
+}
+
+TEST_P(QuicSpdyStreamTest, AsyncErrorDecodingTrailers) {
+ if (!VersionUsesQpack(GetParam().transport_version)) {
+ return;
+ }
+
+ Initialize(kShouldProcessData);
+ QuicSpdyStreamPeer::pretend_blocked_decoding(stream_);
+
+ // HEADERS frame with QPACK encoded single header field "foo: bar".
+ std::string headers =
+ HeadersFrame(QuicTextUtils::HexDecode("00002a94e703626172"));
+ stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 0, headers));
+
+ // Even though entire header block is received, it cannot be decoded.
+ EXPECT_FALSE(stream_->headers_decompressed());
+
+ // Signal to QpackDecodedHeadersAccumulator that the header block has been
+ // decoded. (OnDecodingCompleted() has already been called by the QPACK
+ // decoder, so technically quic_header_list_.OnHeaderBlockEnd() is called
+ // twice, which is a no-op.)
+ QuicSpdyStreamPeer::qpack_decoded_headers_accumulator(stream_)
+ ->OnDecodingCompleted();
+
+ EXPECT_TRUE(stream_->headers_decompressed());
+
+ EXPECT_THAT(stream_->header_list(), ElementsAre(Pair("foo", "bar")));
+ stream_->ConsumeHeaderList();
+
+ std::string data = DataFrame(kDataFramePayload);
+ stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, /* offset = */
+ headers.length(), data));
+ EXPECT_EQ(kDataFramePayload, stream_->data());
+
+ // Trailing HEADERS frame with QPACK encoded
+ // single header field "custom-key: custom-value".
+ std::string trailers = HeadersFrame(
+ QuicTextUtils::HexDecode("00002f0125a849e95ba97d7f8925a849e95bb8e8b4bf"));
+ stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, /* offset = */
+ headers.length() + data.length(),
+ trailers));
+
+ // Even though entire header block is received, it cannot be decoded.
+ EXPECT_FALSE(stream_->trailers_decompressed());
+
+ // Signal a decoding error to QpackDecodedHeadersAccumulator.
+ std::string expected_error_message =
+ QuicStrCat("Error during async decoding of trailers on stream ",
+ stream_->id(), ": unexpected error");
+ EXPECT_CALL(
+ *connection_,
+ CloseConnection(QUIC_DECOMPRESSION_FAILURE, expected_error_message,
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET));
+ QuicSpdyStreamPeer::qpack_decoded_headers_accumulator(stream_)
+ ->OnDecodingErrorDetected("unexpected error");
+}
+
class QuicSpdyStreamIncrementalConsumptionTest : public QuicSpdyStreamTest {
protected:
QuicSpdyStreamIncrementalConsumptionTest() : offset_(0), consumed_bytes_(0) {}