Support blocked decoding in QuicSpdyStream. Modify QpackDecodedHeadersAccumulator interface and QuicSpdyStream implementation to support blocked decoding of QPACK headers. gfe-relnote: n/a, QUIC v99-only change. PiperOrigin-RevId: 256201744 Change-Id: I17a4b6c0c6d4f30d8526c8b645c55da3ce01a076
diff --git a/quic/core/http/quic_spdy_stream.cc b/quic/core/http/quic_spdy_stream.cc index e3d9357..9966f81 100644 --- a/quic/core/http/quic_spdy_stream.cc +++ b/quic/core/http/quic_spdy_stream.cc
@@ -11,7 +11,6 @@ #include "net/third_party/quiche/src/quic/core/http/http_decoder.h" #include "net/third_party/quiche/src/quic/core/http/quic_spdy_session.h" #include "net/third_party/quiche/src/quic/core/http/spdy_utils.h" -#include "net/third_party/quiche/src/quic/core/qpack/qpack_decoded_headers_accumulator.h" #include "net/third_party/quiche/src/quic/core/qpack/qpack_decoder.h" #include "net/third_party/quiche/src/quic/core/qpack/qpack_encoder.h" #include "net/third_party/quiche/src/quic/core/quic_utils.h" @@ -163,12 +162,14 @@ spdy_session_(spdy_session), on_body_available_called_because_sequencer_is_closed_(false), visitor_(nullptr), + blocked_on_decoding_headers_(false), headers_decompressed_(false), headers_length_(0, 0), trailers_length_(0, 0), trailers_decompressed_(false), trailers_consumed_(false), headers_bytes_to_be_marked_consumed_(0), + pretend_blocked_decoding_for_tests_(false), http_decoder_visitor_(new HttpDecoderVisitor(this)), body_buffer_(sequencer()), sequencer_offset_(0), @@ -197,12 +198,14 @@ spdy_session_(spdy_session), on_body_available_called_because_sequencer_is_closed_(false), visitor_(nullptr), + blocked_on_decoding_headers_(false), headers_decompressed_(false), headers_length_(0, 0), trailers_length_(0, 0), trailers_decompressed_(false), trailers_consumed_(false), headers_bytes_to_be_marked_consumed_(0), + pretend_blocked_decoding_for_tests_(false), http_decoder_visitor_(new HttpDecoderVisitor(this)), body_buffer_(sequencer()), sequencer_offset_(sequencer()->NumBytesConsumed()), @@ -497,6 +500,24 @@ } } +void QuicSpdyStream::OnHeadersDecoded(QuicHeaderList headers) { + blocked_on_decoding_headers_ = false; + ProcessDecodedHeaders(headers); + // Continue decoding HTTP/3 frames. + OnDataAvailable(); +} + +void QuicSpdyStream::OnHeaderDecodingError() { + // TODO(b/124216424): Use HTTP_EXCESSIVE_LOAD or + // HTTP_QPACK_DECOMPRESSION_FAILED error code as indicated by + // |qpack_decoded_headers_accumulator_|. + std::string error_message = QuicStrCat( + "Error during async decoding of ", + headers_decompressed_ ? "trailers" : "headers", " on stream ", id(), ": ", + qpack_decoded_headers_accumulator_->error_message()); + CloseConnectionWithDetails(QUIC_DECOMPRESSION_FAILURE, error_message); +} + void QuicSpdyStream::OnHeadersTooLarge() { if (VersionUsesQpack(spdy_session_->connection()->transport_version())) { // TODO(124216424): Use HTTP_EXCESSIVE_LOAD error code. @@ -632,6 +653,10 @@ return; } + if (blocked_on_decoding_headers_) { + return; + } + iovec iov; while (!reading_stopped() && decoder_.error() == QUIC_NO_ERROR) { DCHECK_GE(sequencer_offset_, sequencer()->NumBytesConsumed()); @@ -645,6 +670,9 @@ reinterpret_cast<const char*>(iov.iov_base), iov.iov_len); is_decoder_processing_input_ = false; sequencer_offset_ += processed_bytes; + if (blocked_on_decoding_headers_) { + return; + } } // Do not call OnBodyAvailable() until headers are consumed. @@ -819,8 +847,9 @@ qpack_decoded_headers_accumulator_ = QuicMakeUnique<QpackDecodedHeadersAccumulator>( - id(), spdy_session_->qpack_decoder(), - spdy_session_->max_inbound_header_list_size()); + id(), spdy_session_->qpack_decoder(), this, + spdy_session_->max_inbound_header_list_size(), + pretend_blocked_decoding_for_tests_); // Do not call MaybeMarkHeadersBytesConsumed() yet, because // HEADERS frame header bytes might not have been parsed completely. @@ -851,7 +880,9 @@ bool QuicSpdyStream::OnHeadersFrameEnd() { DCHECK(VersionUsesQpack(spdy_session_->connection()->transport_version())); - if (!qpack_decoded_headers_accumulator_->EndHeaderBlock()) { + auto result = qpack_decoded_headers_accumulator_->EndHeaderBlock(); + + if (result == QpackDecodedHeadersAccumulator::Status::kError) { // TODO(124216424): Use HTTP_QPACK_DECOMPRESSION_FAILED error code. std::string error_message = QuicStrCat("Error decompressing header block on stream ", id(), ": ", @@ -860,13 +891,23 @@ return false; } + if (result == QpackDecodedHeadersAccumulator::Status::kBlocked) { + blocked_on_decoding_headers_ = true; + return false; + } + + DCHECK(result == QpackDecodedHeadersAccumulator::Status::kSuccess); + + ProcessDecodedHeaders(qpack_decoded_headers_accumulator_->quic_header_list()); + return !sequencer()->IsClosed() && !reading_stopped(); +} + +void QuicSpdyStream::ProcessDecodedHeaders(const QuicHeaderList& headers) { const QuicByteCount frame_length = headers_decompressed_ ? trailers_length_.payload_length : headers_length_.payload_length; - OnStreamHeaderList(/* fin = */ false, frame_length, - qpack_decoded_headers_accumulator_->quic_header_list()); + OnStreamHeaderList(/* fin = */ false, frame_length, headers); qpack_decoded_headers_accumulator_.reset(); - return !sequencer()->IsClosed() && !reading_stopped(); } size_t QuicSpdyStream::WriteHeadersImpl(
diff --git a/quic/core/http/quic_spdy_stream.h b/quic/core/http/quic_spdy_stream.h index 1a5c1b6..9a59ddc 100644 --- a/quic/core/http/quic_spdy_stream.h +++ b/quic/core/http/quic_spdy_stream.h
@@ -19,6 +19,7 @@ #include "net/third_party/quiche/src/quic/core/http/http_encoder.h" #include "net/third_party/quiche/src/quic/core/http/quic_header_list.h" #include "net/third_party/quiche/src/quic/core/http/quic_spdy_stream_body_buffer.h" +#include "net/third_party/quiche/src/quic/core/qpack/qpack_decoded_headers_accumulator.h" #include "net/third_party/quiche/src/quic/core/quic_packets.h" #include "net/third_party/quiche/src/quic/core/quic_stream.h" #include "net/third_party/quiche/src/quic/core/quic_stream_sequencer.h" @@ -34,11 +35,12 @@ class QuicStreamPeer; } // namespace test -class QpackDecodedHeadersAccumulator; class QuicSpdySession; // A QUIC stream that can send and receive HTTP2 (SPDY) headers. -class QUIC_EXPORT_PRIVATE QuicSpdyStream : public QuicStream { +class QUIC_EXPORT_PRIVATE QuicSpdyStream + : public QuicStream, + public QpackDecodedHeadersAccumulator::Visitor { public: // Visitor receives callbacks from the stream. class QUIC_EXPORT_PRIVATE Visitor { @@ -206,6 +208,10 @@ using QuicStream::CloseWriteSide; + // QpackDecodedHeadersAccumulator::Visitor implementation. + void OnHeadersDecoded(QuicHeaderList headers) override; + void OnHeaderDecodingError() override; + protected: // Called when the received headers are too large. By default this will // reset the stream. @@ -246,6 +252,9 @@ bool OnHeadersFramePayload(QuicStringPiece payload); bool OnHeadersFrameEnd(); + // Called internally when headers are decoded. + void ProcessDecodedHeaders(const QuicHeaderList& headers); + // Call QuicStreamSequencer::MarkConsumed() with // |headers_bytes_to_be_marked_consumed_| if appropriate. void MaybeMarkHeadersBytesConsumed(); @@ -260,6 +269,10 @@ bool on_body_available_called_because_sequencer_is_closed_; Visitor* visitor_; + + // True if read side processing is blocked while waiting for callback from + // QPACK decoder. + bool blocked_on_decoding_headers_; // True if the headers have been completely decompressed. bool headers_decompressed_; // Contains a copy of the decompressed header (name, value) pairs until they @@ -284,6 +297,9 @@ HttpEncoder encoder_; // Http decoder for processing raw incoming stream frames. HttpDecoder decoder_; + // TODO(b/112770235): Remove once blocked decoding is implemented + // and can be tested with delayed encoder stream data. + bool pretend_blocked_decoding_for_tests_; // Headers accumulator for decoding HEADERS frame payload. std::unique_ptr<QpackDecodedHeadersAccumulator> qpack_decoded_headers_accumulator_;
diff --git a/quic/core/http/quic_spdy_stream_test.cc b/quic/core/http/quic_spdy_stream_test.cc index bab9228..b50c3bb 100644 --- a/quic/core/http/quic_spdy_stream_test.cc +++ b/quic/core/http/quic_spdy_stream_test.cc
@@ -1810,6 +1810,146 @@ stream_->OnStreamFrame(frame); } +TEST_P(QuicSpdyStreamTest, BlockedHeaderDecoding) { + if (!VersionUsesQpack(GetParam().transport_version)) { + return; + } + + Initialize(kShouldProcessData); + QuicSpdyStreamPeer::pretend_blocked_decoding(stream_); + + // HEADERS frame with QPACK encoded single header field "foo: bar". + std::string headers = + HeadersFrame(QuicTextUtils::HexDecode("00002a94e703626172")); + stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 0, headers)); + + // Even though entire header block is received, it cannot be decoded. + EXPECT_FALSE(stream_->headers_decompressed()); + + // Signal to QpackDecodedHeadersAccumulator that the header block has been + // decoded. (OnDecodingCompleted() has already been called by the QPACK + // decoder, so technically quic_header_list_.OnHeaderBlockEnd() is called + // twice, which is a no-op.) + QuicSpdyStreamPeer::qpack_decoded_headers_accumulator(stream_) + ->OnDecodingCompleted(); + + EXPECT_TRUE(stream_->headers_decompressed()); + + EXPECT_THAT(stream_->header_list(), ElementsAre(Pair("foo", "bar"))); + stream_->ConsumeHeaderList(); + + std::string data = DataFrame(kDataFramePayload); + stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, /* offset = */ + headers.length(), data)); + EXPECT_EQ(kDataFramePayload, stream_->data()); + + // Trailing HEADERS frame with QPACK encoded + // single header field "custom-key: custom-value". + std::string trailers = HeadersFrame( + QuicTextUtils::HexDecode("00002f0125a849e95ba97d7f8925a849e95bb8e8b4bf")); + stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, /* offset = */ + headers.length() + data.length(), + trailers)); + + // Even though entire header block is received, it cannot be decoded. + EXPECT_FALSE(stream_->trailers_decompressed()); + + // Signal to QpackDecodedHeadersAccumulator that the header block has been + // decoded. + QuicSpdyStreamPeer::qpack_decoded_headers_accumulator(stream_) + ->OnDecodingCompleted(); + EXPECT_TRUE(stream_->trailers_decompressed()); + + // Verify trailers. + EXPECT_THAT(stream_->received_trailers(), + ElementsAre(Pair("custom-key", "custom-value"))); + stream_->MarkTrailersConsumed(); +} + +TEST_P(QuicSpdyStreamTest, AsyncErrorDecodingHeaders) { + if (!VersionUsesQpack(GetParam().transport_version)) { + return; + } + + Initialize(kShouldProcessData); + QuicSpdyStreamPeer::pretend_blocked_decoding(stream_); + + // HEADERS frame with QPACK encoded single header field "foo: bar". + std::string headers = + HeadersFrame(QuicTextUtils::HexDecode("00002a94e703626172")); + stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 0, headers)); + + // Even though entire header block is received, it cannot be decoded. + EXPECT_FALSE(stream_->headers_decompressed()); + + // Signal a decoding error to QpackDecodedHeadersAccumulator. + std::string expected_error_message = + QuicStrCat("Error during async decoding of headers on stream ", + stream_->id(), ": unexpected error"); + EXPECT_CALL( + *connection_, + CloseConnection(QUIC_DECOMPRESSION_FAILURE, expected_error_message, + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET)); + QuicSpdyStreamPeer::qpack_decoded_headers_accumulator(stream_) + ->OnDecodingErrorDetected("unexpected error"); +} + +TEST_P(QuicSpdyStreamTest, AsyncErrorDecodingTrailers) { + if (!VersionUsesQpack(GetParam().transport_version)) { + return; + } + + Initialize(kShouldProcessData); + QuicSpdyStreamPeer::pretend_blocked_decoding(stream_); + + // HEADERS frame with QPACK encoded single header field "foo: bar". + std::string headers = + HeadersFrame(QuicTextUtils::HexDecode("00002a94e703626172")); + stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, 0, headers)); + + // Even though entire header block is received, it cannot be decoded. + EXPECT_FALSE(stream_->headers_decompressed()); + + // Signal to QpackDecodedHeadersAccumulator that the header block has been + // decoded. (OnDecodingCompleted() has already been called by the QPACK + // decoder, so technically quic_header_list_.OnHeaderBlockEnd() is called + // twice, which is a no-op.) + QuicSpdyStreamPeer::qpack_decoded_headers_accumulator(stream_) + ->OnDecodingCompleted(); + + EXPECT_TRUE(stream_->headers_decompressed()); + + EXPECT_THAT(stream_->header_list(), ElementsAre(Pair("foo", "bar"))); + stream_->ConsumeHeaderList(); + + std::string data = DataFrame(kDataFramePayload); + stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), false, /* offset = */ + headers.length(), data)); + EXPECT_EQ(kDataFramePayload, stream_->data()); + + // Trailing HEADERS frame with QPACK encoded + // single header field "custom-key: custom-value". + std::string trailers = HeadersFrame( + QuicTextUtils::HexDecode("00002f0125a849e95ba97d7f8925a849e95bb8e8b4bf")); + stream_->OnStreamFrame(QuicStreamFrame(stream_->id(), true, /* offset = */ + headers.length() + data.length(), + trailers)); + + // Even though entire header block is received, it cannot be decoded. + EXPECT_FALSE(stream_->trailers_decompressed()); + + // Signal a decoding error to QpackDecodedHeadersAccumulator. + std::string expected_error_message = + QuicStrCat("Error during async decoding of trailers on stream ", + stream_->id(), ": unexpected error"); + EXPECT_CALL( + *connection_, + CloseConnection(QUIC_DECOMPRESSION_FAILURE, expected_error_message, + ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET)); + QuicSpdyStreamPeer::qpack_decoded_headers_accumulator(stream_) + ->OnDecodingErrorDetected("unexpected error"); +} + class QuicSpdyStreamIncrementalConsumptionTest : public QuicSpdyStreamTest { protected: QuicSpdyStreamIncrementalConsumptionTest() : offset_(0), consumed_bytes_(0) {}