Call MarkConsumed() on QPACK bytes incrementally, as soon as possible.
QPACK data is already processed incrementally, but calling
QuicStreamSequencer::MarkConsumed() is postponed until headers or trailers are
consumed by the higher layer, as signalled by a ConsumeHeaderList() or
MarkTrailersConsumed() call. This conflicts with the test
ResponseProcessingTest.CookieRequestHeaderExcessivelyLong, which had to be
disabled at b/249121660.
This CL makes sure MarkConsumed() is called immediately for every fragment of a
HEADERS frame: always for headers, and for trailers only if all data bytes have
already been consumed (otherwise the sequencer would start freeing up data bytes
that have not been read yet). If not all data bytes have been consumed, then
MarkConsumed() is called for the trailer bytes as soon as the remaining data
bytes are consumed.
gfe-relnote: n/a; QUIC v99 only.
PiperOrigin-RevId: 252430032
Change-Id: I1aa005971a3de72ff536bb54a35817b8fd56a68e
diff --git a/quic/core/http/quic_spdy_stream_test.cc b/quic/core/http/quic_spdy_stream_test.cc
index 00a5dba..9ca1df2 100644
--- a/quic/core/http/quic_spdy_stream_test.cc
+++ b/quic/core/http/quic_spdy_stream_test.cc
@@ -35,7 +35,9 @@
using spdy::SpdyPriority;
using testing::_;
using testing::AtLeast;
+using testing::ElementsAre;
using testing::Invoke;
+using testing::Pair;
using testing::Return;
using testing::StrictMock;
@@ -90,6 +92,11 @@
const std::string& data() const { return data_; }
const spdy::SpdyHeaderBlock& saved_headers() const { return saved_headers_; }
+ // Expose QuicStream's protected sequencer() accessor to the tests.
+ const QuicStreamSequencer* sequencer() const {
+ return QuicStream::sequencer();
+ }
+
private:
bool should_process_data_;
spdy::SpdyHeaderBlock saved_headers_;
@@ -149,6 +156,8 @@
"JBCScs_ejbKaqBDoB7ZGxTvqlrB__2ZmnHHjCr8RgMRtKNtIeuZAo ";
}
+ ~QuicSpdyStreamTest() override = default;
+
std::string EncodeQpackHeaders(QuicStreamId id, SpdyHeaderBlock* header) {
auto qpack_encoder =
QuicMakeUnique<QpackEncoder>(session_.get(), session_.get());
@@ -1098,6 +1107,13 @@
// If headers are received with a FIN, no trailers should then arrive.
Initialize(kShouldProcessData);
+ // If HEADERS frames are sent on the request/response stream, then the
+ // sequencer will signal an error if any stream data arrives after a FIN,
+ // so QuicSpdyStream does not need to.
+ if (VersionUsesQpack(GetParam().transport_version)) {
+ return;
+ }
+
// Receive initial headers with FIN set.
ProcessHeaders(true, headers_);
stream_->ConsumeHeaderList();
@@ -1733,12 +1749,7 @@
QuicStreamFrame frame(stream_->id(), false, 0, stream_frame_payload);
stream_->OnStreamFrame(frame);
- auto it = stream_->header_list().begin();
- ASSERT_TRUE(it != stream_->header_list().end());
- EXPECT_EQ("foo", it->first);
- EXPECT_EQ("bar", it->second);
- ++it;
- EXPECT_TRUE(it == stream_->header_list().end());
+ EXPECT_THAT(stream_->header_list(), ElementsAre(Pair("foo", "bar")));
// QuicSpdyStream only calls OnBodyAvailable()
// after the header list has been consumed.
@@ -1746,8 +1757,8 @@
stream_->ConsumeHeaderList();
EXPECT_EQ(kDataFramePayload, stream_->data());
- EXPECT_THAT(stream_->received_trailers(), testing::ElementsAre(testing::Pair(
- "custom-key", "custom-value")));
+ EXPECT_THAT(stream_->received_trailers(),
+ ElementsAre(Pair("custom-key", "custom-value")));
}
TEST_P(QuicSpdyStreamTest, ProcessBodyAfterTrailers) {
@@ -1829,6 +1840,116 @@
stream_->OnStreamFrame(frame);
}
+class QuicSpdyStreamIncrementalConsumptionTest : public QuicSpdyStreamTest {
+ protected:
+ QuicSpdyStreamIncrementalConsumptionTest() : offset_(0), consumed_bytes_(0) {}
+ ~QuicSpdyStreamIncrementalConsumptionTest() override = default;
+
+ // Create a non-FIN QuicStreamFrame carrying |payload| at the current offset,
+ // pass it to stream_->OnStreamFrame(), and advance the offset past |payload|.
+ void OnStreamFrame(QuicStringPiece payload) {
+ QuicStreamFrame frame(stream_->id(), /* fin = */ false, offset_, payload);
+ stream_->OnStreamFrame(frame);
+ offset_ += payload.size();
+ }
+
+ // Return number of bytes newly reported as consumed by the sequencer since
+ // last NewlyConsumedBytes() call (first call returns the total so far).
+ QuicStreamOffset NewlyConsumedBytes() {
+ QuicStreamOffset previously_consumed_bytes = consumed_bytes_;
+ consumed_bytes_ = stream_->sequencer()->NumBytesConsumed();
+ return consumed_bytes_ - previously_consumed_bytes;
+ }
+
+ // Read |size| bytes from the stream with Readv(); expects a full read.
+ std::string ReadFromStream(QuicByteCount size) {
+ std::string buffer;
+ buffer.resize(size);
+
+ struct iovec vec;
+ vec.iov_base = &buffer[0];  // Writable pointer; data() is const before C++17.
+ vec.iov_len = size;
+
+ size_t bytes_read = stream_->Readv(&vec, 1);
+ EXPECT_EQ(bytes_read, size);
+
+ return buffer;
+ }
+
+ private:
+ QuicStreamOffset offset_;
+ QuicStreamOffset consumed_bytes_;
+};
+
+INSTANTIATE_TEST_SUITE_P(Tests,
+ QuicSpdyStreamIncrementalConsumptionTest,
+ ::testing::Values(ParsedQuicVersion{PROTOCOL_TLS1_3,
+ QUIC_VERSION_99}));
+
+// Test that stream bytes are consumed (by calling
+// sequencer()->MarkConsumed()) incrementally, as soon as possible.
+TEST_P(QuicSpdyStreamIncrementalConsumptionTest, IncrementalConsumptionTest) {
+ if (!VersionUsesQpack(GetParam().transport_version)) {
+ return;
+ }
+
+ Initialize(!kShouldProcessData);
+
+ // HEADERS frame with QPACK encoded single header field "foo: bar".
+ std::string headers =
+ HeadersFrame(QuicTextUtils::HexDecode("00002a94e703626172"));
+
+ // All HEADERS frame bytes are consumed even if the frame is not received
+ // completely (as long as at least some of the payload is received, which is
+ // an implementation detail that should not be tested).
+ OnStreamFrame(QuicStringPiece(headers).substr(0, headers.size() - 1));
+ EXPECT_EQ(headers.size() - 1, NewlyConsumedBytes());
+
+ // The rest of the HEADERS frame is also consumed immediately.
+ OnStreamFrame(QuicStringPiece(headers).substr(headers.size() - 1));
+ EXPECT_EQ(1u, NewlyConsumedBytes());
+
+ // Verify headers.
+ EXPECT_THAT(stream_->header_list(), ElementsAre(Pair("foo", "bar")));
+ stream_->ConsumeHeaderList();
+
+ // DATA frame.
+ QuicStringPiece data_payload(kDataFramePayload);
+ std::string data_frame = DataFrame(data_payload);
+
+ // DATA frame is not consumed because payload has to be buffered.
+ // TODO(bnc): Consume frame header as soon as possible.
+ OnStreamFrame(data_frame);
+ EXPECT_EQ(0u, NewlyConsumedBytes());
+
+ // Read all but last payload byte: all but one DATA frame byte gets consumed.
+ EXPECT_EQ(data_payload.substr(0, data_payload.size() - 1),
+ ReadFromStream(data_payload.size() - 1));
+ EXPECT_EQ(data_frame.size() - 1, NewlyConsumedBytes());
+
+ // Trailing HEADERS frame with QPACK encoded
+ // single header field "custom-key: custom-value".
+ std::string trailers = HeadersFrame(
+ QuicTextUtils::HexDecode("00002f0125a849e95ba97d7f8925a849e95bb8e8b4bf"));
+
+ // No bytes are consumed, because last byte of DATA payload is still buffered.
+ OnStreamFrame(QuicStringPiece(trailers).substr(0, trailers.size() - 1));
+ EXPECT_EQ(0u, NewlyConsumedBytes());
+
+ // Reading last byte of DATA payload triggers consumption of that byte and of
+ // the partial trailers, even though last HEADERS frame is not yet complete.
+ EXPECT_EQ(data_payload.substr(data_payload.size() - 1), ReadFromStream(1));
+ EXPECT_EQ(1 + trailers.size() - 1, NewlyConsumedBytes());
+
+ // Last byte of trailers is immediately consumed.
+ OnStreamFrame(QuicStringPiece(trailers).substr(trailers.size() - 1));
+ EXPECT_EQ(1u, NewlyConsumedBytes());
+
+ // Verify trailers.
+ EXPECT_THAT(stream_->received_trailers(),
+ ElementsAre(Pair("custom-key", "custom-value")));
+}
+
} // namespace
} // namespace test
} // namespace quic