blob: fa7684106abb6ae6e3056988bdc3f25a5fc8615e [file] [log] [blame]
Bence Békybac04052022-04-07 15:44:29 -04001// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "quiche/quic/core/quic_stream_sequencer.h"
6
7#include <algorithm>
8#include <cstddef>
9#include <limits>
10#include <string>
11#include <utility>
12
13#include "absl/strings/str_cat.h"
14#include "absl/strings/string_view.h"
15#include "quiche/quic/core/quic_clock.h"
16#include "quiche/quic/core/quic_error_codes.h"
17#include "quiche/quic/core/quic_packets.h"
18#include "quiche/quic/core/quic_stream.h"
19#include "quiche/quic/core/quic_stream_sequencer_buffer.h"
20#include "quiche/quic/core/quic_types.h"
21#include "quiche/quic/core/quic_utils.h"
22#include "quiche/quic/platform/api/quic_bug_tracker.h"
23#include "quiche/quic/platform/api/quic_flag_utils.h"
24#include "quiche/quic/platform/api/quic_flags.h"
25#include "quiche/quic/platform/api/quic_logging.h"
26#include "quiche/quic/platform/api/quic_stack_trace.h"
27
28namespace quic {
29
30QuicStreamSequencer::QuicStreamSequencer(StreamInterface* quic_stream)
31 : stream_(quic_stream),
32 buffered_frames_(kStreamReceiveWindowLimit),
33 highest_offset_(0),
34 close_offset_(std::numeric_limits<QuicStreamOffset>::max()),
35 blocked_(false),
36 num_frames_received_(0),
37 num_duplicate_frames_received_(0),
38 ignore_read_data_(false),
39 level_triggered_(false) {}
40
41QuicStreamSequencer::~QuicStreamSequencer() {
42 if (stream_ == nullptr) {
43 QUIC_BUG(quic_bug_10858_1) << "Double free'ing QuicStreamSequencer at "
44 << this << ". " << QuicStackTrace();
45 }
46 stream_ = nullptr;
47}
48
49void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
50 QUICHE_DCHECK_LE(frame.offset + frame.data_length, close_offset_);
51 ++num_frames_received_;
52 const QuicStreamOffset byte_offset = frame.offset;
53 const size_t data_len = frame.data_length;
54
55 if (frame.fin &&
56 (!CloseStreamAtOffset(frame.offset + data_len) || data_len == 0)) {
57 return;
58 }
59 if (stream_->version().HasIetfQuicFrames() && data_len == 0) {
60 QUICHE_DCHECK(!frame.fin);
61 // Ignore empty frame with no fin.
62 return;
63 }
64 OnFrameData(byte_offset, data_len, frame.data_buffer);
65}
66
67void QuicStreamSequencer::OnCryptoFrame(const QuicCryptoFrame& frame) {
68 ++num_frames_received_;
69 if (frame.data_length == 0) {
70 // Ignore empty crypto frame.
71 return;
72 }
73 OnFrameData(frame.offset, frame.data_length, frame.data_buffer);
74}
75
76void QuicStreamSequencer::OnFrameData(QuicStreamOffset byte_offset,
77 size_t data_len,
78 const char* data_buffer) {
79 highest_offset_ = std::max(highest_offset_, byte_offset + data_len);
80 const size_t previous_readable_bytes = buffered_frames_.ReadableBytes();
81 size_t bytes_written;
82 std::string error_details;
83 QuicErrorCode result = buffered_frames_.OnStreamData(
84 byte_offset, absl::string_view(data_buffer, data_len), &bytes_written,
85 &error_details);
86 if (result != QUIC_NO_ERROR) {
87 std::string details =
88 absl::StrCat("Stream ", stream_->id(), ": ",
89 QuicErrorCodeToString(result), ": ", error_details);
90 QUIC_LOG_FIRST_N(WARNING, 50) << QuicErrorCodeToString(result);
91 QUIC_LOG_FIRST_N(WARNING, 50) << details;
92 stream_->OnUnrecoverableError(result, details);
93 return;
94 }
95
96 if (bytes_written == 0) {
97 ++num_duplicate_frames_received_;
98 // Silently ignore duplicates.
99 return;
100 }
101
102 if (blocked_) {
103 return;
104 }
105
106 if (level_triggered_) {
107 if (buffered_frames_.ReadableBytes() > previous_readable_bytes) {
108 // Readable bytes has changed, let stream decide if to inform application
109 // or not.
110 if (ignore_read_data_) {
111 FlushBufferedFrames();
112 } else {
113 stream_->OnDataAvailable();
114 }
115 }
116 return;
117 }
118 const bool stream_unblocked =
119 previous_readable_bytes == 0 && buffered_frames_.ReadableBytes() > 0;
120 if (stream_unblocked) {
121 if (ignore_read_data_) {
122 FlushBufferedFrames();
123 } else {
124 stream_->OnDataAvailable();
125 }
126 }
127}
128
129bool QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
130 const QuicStreamOffset kMaxOffset =
131 std::numeric_limits<QuicStreamOffset>::max();
132
133 // If there is a scheduled close, the new offset should match it.
134 if (close_offset_ != kMaxOffset && offset != close_offset_) {
135 stream_->OnUnrecoverableError(
136 QUIC_STREAM_SEQUENCER_INVALID_STATE,
137 absl::StrCat(
138 "Stream ", stream_->id(), " received new final offset: ", offset,
139 ", which is different from close offset: ", close_offset_));
140 return false;
141 }
142
143 // The final offset should be no less than the highest offset that is
144 // received.
145 if (offset < highest_offset_) {
146 stream_->OnUnrecoverableError(
147 QUIC_STREAM_SEQUENCER_INVALID_STATE,
148 absl::StrCat(
149 "Stream ", stream_->id(), " received fin with offset: ", offset,
150 ", which reduces current highest offset: ", highest_offset_));
151 return false;
152 }
153
154 close_offset_ = offset;
155
156 MaybeCloseStream();
157 return true;
158}
159
160void QuicStreamSequencer::MaybeCloseStream() {
161 if (blocked_ || !IsClosed()) {
162 return;
163 }
164
165 QUIC_DVLOG(1) << "Passing up termination, as we've processed "
166 << buffered_frames_.BytesConsumed() << " of " << close_offset_
167 << " bytes.";
168 // This will cause the stream to consume the FIN.
169 // Technically it's an error if |num_bytes_consumed| isn't exactly
170 // equal to |close_offset|, but error handling seems silly at this point.
171 if (ignore_read_data_) {
172 // The sequencer is discarding stream data and must notify the stream on
173 // receipt of a FIN because the consumer won't.
174 stream_->OnFinRead();
175 } else {
176 stream_->OnDataAvailable();
177 }
178 buffered_frames_.Clear();
179}
180
181int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) const {
182 QUICHE_DCHECK(!blocked_);
183 return buffered_frames_.GetReadableRegions(iov, iov_len);
184}
185
186bool QuicStreamSequencer::GetReadableRegion(iovec* iov) const {
187 QUICHE_DCHECK(!blocked_);
188 return buffered_frames_.GetReadableRegion(iov);
189}
190
191bool QuicStreamSequencer::PeekRegion(QuicStreamOffset offset,
192 iovec* iov) const {
193 QUICHE_DCHECK(!blocked_);
194 return buffered_frames_.PeekRegion(offset, iov);
195}
196
197void QuicStreamSequencer::Read(std::string* buffer) {
198 QUICHE_DCHECK(!blocked_);
199 buffer->resize(buffer->size() + ReadableBytes());
200 iovec iov;
201 iov.iov_len = ReadableBytes();
202 iov.iov_base = &(*buffer)[buffer->size() - iov.iov_len];
203 Readv(&iov, 1);
204}
205
206size_t QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
207 QUICHE_DCHECK(!blocked_);
208 std::string error_details;
209 size_t bytes_read;
210 QuicErrorCode read_error =
211 buffered_frames_.Readv(iov, iov_len, &bytes_read, &error_details);
212 if (read_error != QUIC_NO_ERROR) {
213 std::string details =
214 absl::StrCat("Stream ", stream_->id(), ": ", error_details);
215 stream_->OnUnrecoverableError(read_error, details);
216 return bytes_read;
217 }
218
219 stream_->AddBytesConsumed(bytes_read);
220 return bytes_read;
221}
222
223bool QuicStreamSequencer::HasBytesToRead() const {
224 return buffered_frames_.HasBytesToRead();
225}
226
227size_t QuicStreamSequencer::ReadableBytes() const {
228 return buffered_frames_.ReadableBytes();
229}
230
231bool QuicStreamSequencer::IsClosed() const {
232 return buffered_frames_.BytesConsumed() >= close_offset_;
233}
234
235void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) {
236 QUICHE_DCHECK(!blocked_);
237 bool result = buffered_frames_.MarkConsumed(num_bytes_consumed);
238 if (!result) {
239 QUIC_BUG(quic_bug_10858_2)
240 << "Invalid argument to MarkConsumed."
241 << " expect to consume: " << num_bytes_consumed
242 << ", but not enough bytes available. " << DebugString();
243 stream_->ResetWithError(
244 QuicResetStreamError::FromInternal(QUIC_ERROR_PROCESSING_STREAM));
245 return;
246 }
247 stream_->AddBytesConsumed(num_bytes_consumed);
248}
249
bnc862751f2022-04-13 08:33:42 -0700250void QuicStreamSequencer::SetBlockedUntilFlush() { blocked_ = true; }
Bence Békybac04052022-04-07 15:44:29 -0400251
252void QuicStreamSequencer::SetUnblocked() {
253 blocked_ = false;
254 if (IsClosed() || HasBytesToRead()) {
255 stream_->OnDataAvailable();
256 }
257}
258
259void QuicStreamSequencer::StopReading() {
260 if (ignore_read_data_) {
261 return;
262 }
263 ignore_read_data_ = true;
264 FlushBufferedFrames();
265}
266
267void QuicStreamSequencer::ReleaseBuffer() {
268 buffered_frames_.ReleaseWholeBuffer();
269}
270
271void QuicStreamSequencer::ReleaseBufferIfEmpty() {
272 if (buffered_frames_.Empty()) {
273 buffered_frames_.ReleaseWholeBuffer();
274 }
275}
276
277void QuicStreamSequencer::FlushBufferedFrames() {
278 QUICHE_DCHECK(ignore_read_data_);
279 size_t bytes_flushed = buffered_frames_.FlushBufferedFrames();
280 QUIC_DVLOG(1) << "Flushing buffered data at offset "
281 << buffered_frames_.BytesConsumed() << " length "
282 << bytes_flushed << " for stream " << stream_->id();
283 stream_->AddBytesConsumed(bytes_flushed);
284 MaybeCloseStream();
285}
286
287size_t QuicStreamSequencer::NumBytesBuffered() const {
288 return buffered_frames_.BytesBuffered();
289}
290
291QuicStreamOffset QuicStreamSequencer::NumBytesConsumed() const {
292 return buffered_frames_.BytesConsumed();
293}
294
vasilvv07d94482022-06-02 12:37:18 -0700295bool QuicStreamSequencer::IsAllDataAvailable() const {
296 QUICHE_DCHECK_LE(NumBytesConsumed() + NumBytesBuffered(), close_offset_);
297 return NumBytesConsumed() + NumBytesBuffered() >= close_offset_;
298}
299
Bence Békybac04052022-04-07 15:44:29 -0400300const std::string QuicStreamSequencer::DebugString() const {
301 // clang-format off
302 return absl::StrCat("QuicStreamSequencer:",
303 "\n bytes buffered: ", NumBytesBuffered(),
304 "\n bytes consumed: ", NumBytesConsumed(),
305 "\n has bytes to read: ", HasBytesToRead() ? "true" : "false",
306 "\n frames received: ", num_frames_received(),
307 "\n close offset bytes: ", close_offset_,
308 "\n is closed: ", IsClosed() ? "true" : "false");
309 // clang-format on
310}
311
312} // namespace quic