blob: b4878bf5295937c402379ea8b0561bfaf2f8a825 [file] [log] [blame]
Bence Békybac04052022-04-07 15:44:29 -04001// Copyright (c) 2015 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "quiche/quic/core/quic_stream_sequencer_buffer.h"
6
7#include <algorithm>
8#include <cstddef>
9#include <memory>
10#include <string>
QUICHE teamad0dbe62024-05-21 11:27:01 -070011#include <utility>
Bence Békybac04052022-04-07 15:44:29 -040012
13#include "absl/strings/str_cat.h"
14#include "absl/strings/string_view.h"
15#include "quiche/quic/core/quic_constants.h"
16#include "quiche/quic/core/quic_interval.h"
17#include "quiche/quic/platform/api/quic_bug_tracker.h"
18#include "quiche/quic/platform/api/quic_flag_utils.h"
19#include "quiche/quic/platform/api/quic_flags.h"
20#include "quiche/quic/platform/api/quic_logging.h"
21
22namespace quic {
23namespace {
24
25size_t CalculateBlockCount(size_t max_capacity_bytes) {
26 return (max_capacity_bytes + QuicStreamSequencerBuffer::kBlockSizeBytes - 1) /
27 QuicStreamSequencerBuffer::kBlockSizeBytes;
28}
29
30// Upper limit of how many gaps allowed in buffer, which ensures a reasonable
31// number of iterations needed to find the right gap to fill when a frame
32// arrives.
33const size_t kMaxNumDataIntervalsAllowed = 2 * kMaxPacketGap;
34
35// Number of blocks allocated initially.
36constexpr size_t kInitialBlockCount = 8u;
37
38// How fast block pointers container grow in size.
39// Choose 4 to reduce the amount of reallocation.
40constexpr int kBlocksGrowthFactor = 4;
41
42} // namespace
43
44QuicStreamSequencerBuffer::QuicStreamSequencerBuffer(size_t max_capacity_bytes)
45 : max_buffer_capacity_bytes_(max_capacity_bytes),
46 max_blocks_count_(CalculateBlockCount(max_capacity_bytes)),
47 current_blocks_count_(0u),
48 total_bytes_read_(0),
49 blocks_(nullptr) {
50 QUICHE_DCHECK_GE(max_blocks_count_, kInitialBlockCount);
51 Clear();
52}
53
bnc862751f2022-04-13 08:33:42 -070054QuicStreamSequencerBuffer::~QuicStreamSequencerBuffer() { Clear(); }
Bence Békybac04052022-04-07 15:44:29 -040055
56void QuicStreamSequencerBuffer::Clear() {
57 if (blocks_ != nullptr) {
58 for (size_t i = 0; i < current_blocks_count_; ++i) {
59 if (blocks_[i] != nullptr) {
60 RetireBlock(i);
61 }
62 }
63 }
64 num_bytes_buffered_ = 0;
65 bytes_received_.Clear();
66 bytes_received_.Add(0, total_bytes_read_);
67}
68
69bool QuicStreamSequencerBuffer::RetireBlock(size_t index) {
70 if (blocks_[index] == nullptr) {
71 QUIC_BUG(quic_bug_10610_1) << "Try to retire block twice";
72 return false;
73 }
74 delete blocks_[index];
75 blocks_[index] = nullptr;
76 QUIC_DVLOG(1) << "Retired block with index: " << index;
77 return true;
78}
79
80void QuicStreamSequencerBuffer::MaybeAddMoreBlocks(
81 QuicStreamOffset next_expected_byte) {
82 if (current_blocks_count_ == max_blocks_count_) {
83 return;
84 }
85 QuicStreamOffset last_byte = next_expected_byte - 1;
86 size_t num_of_blocks_needed;
87 // As long as last_byte does not wrap around, its index plus one blocks are
88 // needed. Otherwise, block_count_ blocks are needed.
89 if (last_byte < max_buffer_capacity_bytes_) {
90 num_of_blocks_needed =
91 std::max(GetBlockIndex(last_byte) + 1, kInitialBlockCount);
92 } else {
93 num_of_blocks_needed = max_blocks_count_;
94 }
95 if (current_blocks_count_ >= num_of_blocks_needed) {
96 return;
97 }
98 size_t new_block_count = kBlocksGrowthFactor * current_blocks_count_;
99 new_block_count = std::min(std::max(new_block_count, num_of_blocks_needed),
100 max_blocks_count_);
101 auto new_blocks = std::make_unique<BufferBlock*[]>(new_block_count);
102 if (blocks_ != nullptr) {
103 memcpy(new_blocks.get(), blocks_.get(),
104 current_blocks_count_ * sizeof(BufferBlock*));
105 }
106 blocks_ = std::move(new_blocks);
107 current_blocks_count_ = new_block_count;
108}
109
110QuicErrorCode QuicStreamSequencerBuffer::OnStreamData(
bnc862751f2022-04-13 08:33:42 -0700111 QuicStreamOffset starting_offset, absl::string_view data,
112 size_t* const bytes_buffered, std::string* error_details) {
Bence Békybac04052022-04-07 15:44:29 -0400113 *bytes_buffered = 0;
114 size_t size = data.size();
115 if (size == 0) {
116 *error_details = "Received empty stream frame without FIN.";
117 return QUIC_EMPTY_STREAM_FRAME_NO_FIN;
118 }
119 // Write beyond the current range this buffer is covering.
120 if (starting_offset + size > total_bytes_read_ + max_buffer_capacity_bytes_ ||
121 starting_offset + size < starting_offset) {
122 *error_details = "Received data beyond available range.";
123 return QUIC_INTERNAL_ERROR;
124 }
125
126 if (bytes_received_.Empty() ||
127 starting_offset >= bytes_received_.rbegin()->max() ||
128 bytes_received_.IsDisjoint(QuicInterval<QuicStreamOffset>(
129 starting_offset, starting_offset + size))) {
130 // Optimization for the typical case, when all data is newly received.
131 bytes_received_.AddOptimizedForAppend(starting_offset,
132 starting_offset + size);
133 if (bytes_received_.Size() >= kMaxNumDataIntervalsAllowed) {
134 // This frame is going to create more intervals than allowed. Stop
135 // processing.
136 *error_details = "Too many data intervals received for this stream.";
137 return QUIC_TOO_MANY_STREAM_DATA_INTERVALS;
138 }
139 MaybeAddMoreBlocks(starting_offset + size);
140
141 size_t bytes_copy = 0;
142 if (!CopyStreamData(starting_offset, data, &bytes_copy, error_details)) {
143 return QUIC_STREAM_SEQUENCER_INVALID_STATE;
144 }
145 *bytes_buffered += bytes_copy;
146 num_bytes_buffered_ += *bytes_buffered;
147 return QUIC_NO_ERROR;
148 }
149 // Slow path, received data overlaps with received data.
150 QuicIntervalSet<QuicStreamOffset> newly_received(starting_offset,
151 starting_offset + size);
152 newly_received.Difference(bytes_received_);
153 if (newly_received.Empty()) {
154 return QUIC_NO_ERROR;
155 }
156 bytes_received_.Add(starting_offset, starting_offset + size);
157 if (bytes_received_.Size() >= kMaxNumDataIntervalsAllowed) {
158 // This frame is going to create more intervals than allowed. Stop
159 // processing.
160 *error_details = "Too many data intervals received for this stream.";
161 return QUIC_TOO_MANY_STREAM_DATA_INTERVALS;
162 }
163 MaybeAddMoreBlocks(starting_offset + size);
164 for (const auto& interval : newly_received) {
165 const QuicStreamOffset copy_offset = interval.min();
166 const QuicByteCount copy_length = interval.max() - interval.min();
167 size_t bytes_copy = 0;
168 if (!CopyStreamData(copy_offset,
169 data.substr(copy_offset - starting_offset, copy_length),
170 &bytes_copy, error_details)) {
171 return QUIC_STREAM_SEQUENCER_INVALID_STATE;
172 }
173 *bytes_buffered += bytes_copy;
174 }
175 num_bytes_buffered_ += *bytes_buffered;
176 return QUIC_NO_ERROR;
177}
178
179bool QuicStreamSequencerBuffer::CopyStreamData(QuicStreamOffset offset,
180 absl::string_view data,
181 size_t* bytes_copy,
182 std::string* error_details) {
183 *bytes_copy = 0;
184 size_t source_remaining = data.size();
185 if (source_remaining == 0) {
186 return true;
187 }
188 const char* source = data.data();
189 // Write data block by block. If corresponding block has not created yet,
190 // create it first.
191 // Stop when all data are written or reaches the logical end of the buffer.
192 while (source_remaining > 0) {
193 const size_t write_block_num = GetBlockIndex(offset);
194 const size_t write_block_offset = GetInBlockOffset(offset);
195 size_t current_blocks_count = current_blocks_count_;
196 QUICHE_DCHECK_GT(current_blocks_count, write_block_num);
197
198 size_t block_capacity = GetBlockCapacity(write_block_num);
199 size_t bytes_avail = block_capacity - write_block_offset;
200
201 // If this write meets the upper boundary of the buffer,
202 // reduce the available free bytes.
203 if (offset + bytes_avail > total_bytes_read_ + max_buffer_capacity_bytes_) {
204 bytes_avail = total_bytes_read_ + max_buffer_capacity_bytes_ - offset;
205 }
206
207 if (write_block_num >= current_blocks_count) {
208 *error_details = absl::StrCat(
209 "QuicStreamSequencerBuffer error: OnStreamData() exceed array bounds."
210 "write offset = ",
211 offset, " write_block_num = ", write_block_num,
212 " current_blocks_count_ = ", current_blocks_count);
213 return false;
214 }
215 if (blocks_ == nullptr) {
216 *error_details =
217 "QuicStreamSequencerBuffer error: OnStreamData() blocks_ is null";
218 return false;
219 }
220 if (blocks_[write_block_num] == nullptr) {
221 // TODO(danzh): Investigate if using a freelist would improve performance.
222 // Same as RetireBlock().
223 blocks_[write_block_num] = new BufferBlock();
224 }
225
226 const size_t bytes_to_copy =
227 std::min<size_t>(bytes_avail, source_remaining);
228 char* dest = blocks_[write_block_num]->buffer + write_block_offset;
229 QUIC_DVLOG(1) << "Write at offset: " << offset
230 << " length: " << bytes_to_copy;
231
232 if (dest == nullptr || source == nullptr) {
233 *error_details = absl::StrCat(
234 "QuicStreamSequencerBuffer error: OnStreamData()"
235 " dest == nullptr: ",
236 (dest == nullptr), " source == nullptr: ", (source == nullptr),
237 " Writing at offset ", offset,
238 " Received frames: ", ReceivedFramesDebugString(),
239 " total_bytes_read_ = ", total_bytes_read_);
240 return false;
241 }
242 memcpy(dest, source, bytes_to_copy);
243 source += bytes_to_copy;
244 source_remaining -= bytes_to_copy;
245 offset += bytes_to_copy;
246 *bytes_copy += bytes_to_copy;
247 }
248 return true;
249}
250
251QuicErrorCode QuicStreamSequencerBuffer::Readv(const iovec* dest_iov,
252 size_t dest_count,
253 size_t* bytes_read,
254 std::string* error_details) {
255 *bytes_read = 0;
256 for (size_t i = 0; i < dest_count && ReadableBytes() > 0; ++i) {
257 char* dest = reinterpret_cast<char*>(dest_iov[i].iov_base);
258 QUICHE_DCHECK(dest != nullptr);
259 size_t dest_remaining = dest_iov[i].iov_len;
260 while (dest_remaining > 0 && ReadableBytes() > 0) {
261 size_t block_idx = NextBlockToRead();
262 size_t start_offset_in_block = ReadOffset();
263 size_t block_capacity = GetBlockCapacity(block_idx);
264 size_t bytes_available_in_block = std::min<size_t>(
265 ReadableBytes(), block_capacity - start_offset_in_block);
266 size_t bytes_to_copy =
267 std::min<size_t>(bytes_available_in_block, dest_remaining);
268 QUICHE_DCHECK_GT(bytes_to_copy, 0u);
269 if (blocks_[block_idx] == nullptr || dest == nullptr) {
270 *error_details = absl::StrCat(
271 "QuicStreamSequencerBuffer error:"
272 " Readv() dest == nullptr: ",
273 (dest == nullptr), " blocks_[", block_idx,
274 "] == nullptr: ", (blocks_[block_idx] == nullptr),
275 " Received frames: ", ReceivedFramesDebugString(),
276 " total_bytes_read_ = ", total_bytes_read_);
277 return QUIC_STREAM_SEQUENCER_INVALID_STATE;
278 }
279 memcpy(dest, blocks_[block_idx]->buffer + start_offset_in_block,
280 bytes_to_copy);
281 dest += bytes_to_copy;
282 dest_remaining -= bytes_to_copy;
283 num_bytes_buffered_ -= bytes_to_copy;
284 total_bytes_read_ += bytes_to_copy;
285 *bytes_read += bytes_to_copy;
286
287 // Retire the block if all the data is read out and no other data is
288 // stored in this block.
289 // In case of failing to retire a block which is ready to retire, return
290 // immediately.
291 if (bytes_to_copy == bytes_available_in_block) {
292 bool retire_successfully = RetireBlockIfEmpty(block_idx);
293 if (!retire_successfully) {
294 *error_details = absl::StrCat(
295 "QuicStreamSequencerBuffer error: fail to retire block ",
296 block_idx,
297 " as the block is already released, total_bytes_read_ = ",
298 total_bytes_read_,
299 " Received frames: ", ReceivedFramesDebugString());
300 return QUIC_STREAM_SEQUENCER_INVALID_STATE;
301 }
302 }
303 }
304 }
305
306 return QUIC_NO_ERROR;
307}
308
309int QuicStreamSequencerBuffer::GetReadableRegions(struct iovec* iov,
310 int iov_len) const {
311 QUICHE_DCHECK(iov != nullptr);
312 QUICHE_DCHECK_GT(iov_len, 0);
313
314 if (ReadableBytes() == 0) {
315 iov[0].iov_base = nullptr;
316 iov[0].iov_len = 0;
317 return 0;
318 }
319
320 size_t start_block_idx = NextBlockToRead();
321 QuicStreamOffset readable_offset_end = FirstMissingByte() - 1;
322 QUICHE_DCHECK_GE(readable_offset_end + 1, total_bytes_read_);
323 size_t end_block_offset = GetInBlockOffset(readable_offset_end);
324 size_t end_block_idx = GetBlockIndex(readable_offset_end);
325
326 // If readable region is within one block, deal with it seperately.
327 if (start_block_idx == end_block_idx && ReadOffset() <= end_block_offset) {
328 iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
329 iov[0].iov_len = ReadableBytes();
330 QUIC_DVLOG(1) << "Got only a single block with index: " << start_block_idx;
331 return 1;
332 }
333
334 // Get first block
335 iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
336 iov[0].iov_len = GetBlockCapacity(start_block_idx) - ReadOffset();
337 QUIC_DVLOG(1) << "Got first block " << start_block_idx << " with len "
338 << iov[0].iov_len;
339 QUICHE_DCHECK_GT(readable_offset_end + 1, total_bytes_read_ + iov[0].iov_len)
340 << "there should be more available data";
341
342 // Get readable regions of the rest blocks till either 2nd to last block
343 // before gap is met or |iov| is filled. For these blocks, one whole block is
344 // a region.
345 int iov_used = 1;
346 size_t block_idx = (start_block_idx + iov_used) % max_blocks_count_;
347 while (block_idx != end_block_idx && iov_used < iov_len) {
348 QUICHE_DCHECK(nullptr != blocks_[block_idx]);
349 iov[iov_used].iov_base = blocks_[block_idx]->buffer;
350 iov[iov_used].iov_len = GetBlockCapacity(block_idx);
351 QUIC_DVLOG(1) << "Got block with index: " << block_idx;
352 ++iov_used;
353 block_idx = (start_block_idx + iov_used) % max_blocks_count_;
354 }
355
356 // Deal with last block if |iov| can hold more.
357 if (iov_used < iov_len) {
358 QUICHE_DCHECK(nullptr != blocks_[block_idx]);
359 iov[iov_used].iov_base = blocks_[end_block_idx]->buffer;
360 iov[iov_used].iov_len = end_block_offset + 1;
361 QUIC_DVLOG(1) << "Got last block with index: " << end_block_idx;
362 ++iov_used;
363 }
364 return iov_used;
365}
366
367bool QuicStreamSequencerBuffer::GetReadableRegion(iovec* iov) const {
368 return GetReadableRegions(iov, 1) == 1;
369}
370
371bool QuicStreamSequencerBuffer::PeekRegion(QuicStreamOffset offset,
372 iovec* iov) const {
373 QUICHE_DCHECK(iov);
374
375 if (offset < total_bytes_read_) {
376 // Data at |offset| has already been consumed.
377 return false;
378 }
379
380 if (offset >= FirstMissingByte()) {
381 // Data at |offset| has not been received yet.
382 return false;
383 }
384
385 // Beginning of region.
386 size_t block_idx = GetBlockIndex(offset);
387 size_t block_offset = GetInBlockOffset(offset);
388 iov->iov_base = blocks_[block_idx]->buffer + block_offset;
389
390 // Determine if entire block has been received.
391 size_t end_block_idx = GetBlockIndex(FirstMissingByte());
danzh6158f312024-07-18 10:57:22 -0700392 if (block_idx == end_block_idx &&
393 block_offset < GetInBlockOffset(FirstMissingByte())) {
394 // If these 2 indexes point to the same block and the fist missing byte
395 // offset is larger than the starting offset, this means data available
396 // hasn't expanded to the next block yet.
Bence Békybac04052022-04-07 15:44:29 -0400397 // Only read part of block before FirstMissingByte().
398 iov->iov_len = GetInBlockOffset(FirstMissingByte()) - block_offset;
399 } else {
400 // Read entire block.
401 iov->iov_len = GetBlockCapacity(block_idx) - block_offset;
402 }
403
danzh6158f312024-07-18 10:57:22 -0700404 QUIC_BUG_IF(quic_invalid_peek_region, iov->iov_len > kBlockSizeBytes)
405 << "PeekRegion() at " << offset << " gets bad iov with length "
406 << iov->iov_len;
Bence Békybac04052022-04-07 15:44:29 -0400407 return true;
408}
409
410bool QuicStreamSequencerBuffer::MarkConsumed(size_t bytes_consumed) {
411 if (bytes_consumed > ReadableBytes()) {
412 return false;
413 }
414 size_t bytes_to_consume = bytes_consumed;
415 while (bytes_to_consume > 0) {
416 size_t block_idx = NextBlockToRead();
417 size_t offset_in_block = ReadOffset();
418 size_t bytes_available = std::min<size_t>(
419 ReadableBytes(), GetBlockCapacity(block_idx) - offset_in_block);
420 size_t bytes_read = std::min<size_t>(bytes_to_consume, bytes_available);
421 total_bytes_read_ += bytes_read;
422 num_bytes_buffered_ -= bytes_read;
423 bytes_to_consume -= bytes_read;
424 // If advanced to the end of current block and end of buffer hasn't wrapped
425 // to this block yet.
426 if (bytes_available == bytes_read) {
427 RetireBlockIfEmpty(block_idx);
428 }
429 }
430
431 return true;
432}
433
434size_t QuicStreamSequencerBuffer::FlushBufferedFrames() {
435 size_t prev_total_bytes_read = total_bytes_read_;
436 total_bytes_read_ = NextExpectedByte();
437 Clear();
438 return total_bytes_read_ - prev_total_bytes_read;
439}
440
441void QuicStreamSequencerBuffer::ReleaseWholeBuffer() {
442 Clear();
443 current_blocks_count_ = 0;
444 blocks_.reset(nullptr);
445}
446
447size_t QuicStreamSequencerBuffer::ReadableBytes() const {
448 return FirstMissingByte() - total_bytes_read_;
449}
450
451bool QuicStreamSequencerBuffer::HasBytesToRead() const {
452 return ReadableBytes() > 0;
453}
454
455QuicStreamOffset QuicStreamSequencerBuffer::BytesConsumed() const {
456 return total_bytes_read_;
457}
458
459size_t QuicStreamSequencerBuffer::BytesBuffered() const {
460 return num_bytes_buffered_;
461}
462
463size_t QuicStreamSequencerBuffer::GetBlockIndex(QuicStreamOffset offset) const {
464 return (offset % max_buffer_capacity_bytes_) / kBlockSizeBytes;
465}
466
467size_t QuicStreamSequencerBuffer::GetInBlockOffset(
468 QuicStreamOffset offset) const {
469 return (offset % max_buffer_capacity_bytes_) % kBlockSizeBytes;
470}
471
472size_t QuicStreamSequencerBuffer::ReadOffset() const {
473 return GetInBlockOffset(total_bytes_read_);
474}
475
476size_t QuicStreamSequencerBuffer::NextBlockToRead() const {
477 return GetBlockIndex(total_bytes_read_);
478}
479
480bool QuicStreamSequencerBuffer::RetireBlockIfEmpty(size_t block_index) {
481 QUICHE_DCHECK(ReadableBytes() == 0 ||
482 GetInBlockOffset(total_bytes_read_) == 0)
483 << "RetireBlockIfEmpty() should only be called when advancing to next "
484 << "block or a gap has been reached.";
485 // If the whole buffer becomes empty, the last piece of data has been read.
486 if (Empty()) {
487 return RetireBlock(block_index);
488 }
489
490 // Check where the logical end of this buffer is.
491 // Not empty if the end of circular buffer has been wrapped to this block.
492 if (GetBlockIndex(NextExpectedByte() - 1) == block_index) {
493 return true;
494 }
495
496 // Read index remains in this block, which means a gap has been reached.
497 if (NextBlockToRead() == block_index) {
498 if (bytes_received_.Size() > 1) {
499 auto it = bytes_received_.begin();
500 ++it;
501 if (GetBlockIndex(it->min()) == block_index) {
502 // Do not retire the block if next data interval is in this block.
503 return true;
504 }
505 } else {
506 QUIC_BUG(quic_bug_10610_2) << "Read stopped at where it shouldn't.";
507 return false;
508 }
509 }
510 return RetireBlock(block_index);
511}
512
513bool QuicStreamSequencerBuffer::Empty() const {
514 return bytes_received_.Empty() ||
515 (bytes_received_.Size() == 1 && total_bytes_read_ > 0 &&
516 bytes_received_.begin()->max() == total_bytes_read_);
517}
518
519size_t QuicStreamSequencerBuffer::GetBlockCapacity(size_t block_index) const {
520 if ((block_index + 1) == max_blocks_count_) {
521 size_t result = max_buffer_capacity_bytes_ % kBlockSizeBytes;
522 if (result == 0) { // whole block
523 result = kBlockSizeBytes;
524 }
525 return result;
526 } else {
527 return kBlockSizeBytes;
528 }
529}
530
531std::string QuicStreamSequencerBuffer::ReceivedFramesDebugString() const {
532 return bytes_received_.ToString();
533}
534
535QuicStreamOffset QuicStreamSequencerBuffer::FirstMissingByte() const {
536 if (bytes_received_.Empty() || bytes_received_.begin()->min() > 0) {
537 // Offset 0 is not received yet.
538 return 0;
539 }
540 return bytes_received_.begin()->max();
541}
542
543QuicStreamOffset QuicStreamSequencerBuffer::NextExpectedByte() const {
544 if (bytes_received_.Empty()) {
545 return 0;
546 }
547 return bytes_received_.rbegin()->max();
548}
549
550} // namespace quic