blob: 75000bd3efb6a82f5b8ad8df0ad41b2f90113490 [file] [log] [blame]
QUICHE teama6ef0a62019-03-07 20:34:33 -05001// Copyright (c) 2015 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/third_party/quiche/src/quic/core/quic_stream_sequencer_buffer.h"
6
7#include "net/third_party/quiche/src/quic/core/quic_constants.h"
8#include "net/third_party/quiche/src/quic/core/quic_interval.h"
9#include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
10#include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
11#include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
12#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
13#include "net/third_party/quiche/src/quic/platform/api/quic_str_cat.h"
14#include "net/third_party/quiche/src/quic/platform/api/quic_string.h"
15
16namespace quic {
17namespace {
18
19size_t CalculateBlockCount(size_t max_capacity_bytes) {
20 return (max_capacity_bytes + QuicStreamSequencerBuffer::kBlockSizeBytes - 1) /
21 QuicStreamSequencerBuffer::kBlockSizeBytes;
22}
23
24// Upper limit of how many gaps allowed in buffer, which ensures a reasonable
25// number of iterations needed to find the right gap to fill when a frame
26// arrives.
27const size_t kMaxNumDataIntervalsAllowed = 2 * kMaxPacketGap;
28
29} // namespace
30
31QuicStreamSequencerBuffer::QuicStreamSequencerBuffer(size_t max_capacity_bytes)
32 : max_buffer_capacity_bytes_(max_capacity_bytes),
33 blocks_count_(CalculateBlockCount(max_capacity_bytes)),
34 total_bytes_read_(0),
35 blocks_(nullptr),
36 total_bytes_prefetched_(0),
37 faster_interval_add_in_sequence_buffer_(
38 GetQuicReloadableFlag(quic_faster_interval_add_in_sequence_buffer)) {
39 Clear();
40}
41
42QuicStreamSequencerBuffer::~QuicStreamSequencerBuffer() {
43 Clear();
44}
45
46void QuicStreamSequencerBuffer::Clear() {
47 if (blocks_ != nullptr) {
48 for (size_t i = 0; i < blocks_count_; ++i) {
49 if (blocks_[i] != nullptr) {
50 RetireBlock(i);
51 }
52 }
53 }
54 num_bytes_buffered_ = 0;
55 bytes_received_.Clear();
56 bytes_received_.Add(0, total_bytes_read_);
57}
58
59bool QuicStreamSequencerBuffer::RetireBlock(size_t idx) {
60 if (blocks_[idx] == nullptr) {
61 QUIC_BUG << "Try to retire block twice";
62 return false;
63 }
64 delete blocks_[idx];
65 blocks_[idx] = nullptr;
66 QUIC_DVLOG(1) << "Retired block with index: " << idx;
67 return true;
68}
69
70QuicErrorCode QuicStreamSequencerBuffer::OnStreamData(
71 QuicStreamOffset starting_offset,
72 QuicStringPiece data,
73 size_t* const bytes_buffered,
vasilvvc48c8712019-03-11 13:38:16 -070074 std::string* error_details) {
QUICHE teama6ef0a62019-03-07 20:34:33 -050075 *bytes_buffered = 0;
76 size_t size = data.size();
77 if (size == 0) {
78 *error_details = "Received empty stream frame without FIN.";
79 return QUIC_EMPTY_STREAM_FRAME_NO_FIN;
80 }
81 // Write beyond the current range this buffer is covering.
82 if (starting_offset + size > total_bytes_read_ + max_buffer_capacity_bytes_ ||
83 starting_offset + size < starting_offset) {
84 *error_details = "Received data beyond available range.";
85 return QUIC_INTERNAL_ERROR;
86 }
87 if (bytes_received_.Empty() ||
88 starting_offset >= bytes_received_.rbegin()->max() ||
89 bytes_received_.IsDisjoint(QuicInterval<QuicStreamOffset>(
90 starting_offset, starting_offset + size))) {
91 // Optimization for the typical case, when all data is newly received.
92 if (faster_interval_add_in_sequence_buffer_) {
93 QUIC_RELOADABLE_FLAG_COUNT(quic_faster_interval_add_in_sequence_buffer);
94 bytes_received_.AddOptimizedForAppend(starting_offset,
95 starting_offset + size);
96 if (bytes_received_.Size() >= kMaxNumDataIntervalsAllowed) {
97 // This frame is going to create more intervals than allowed. Stop
98 // processing.
99 *error_details = "Too many data intervals received for this stream.";
100 return QUIC_TOO_MANY_STREAM_DATA_INTERVALS;
101 }
102 } else {
103 if (!bytes_received_.Empty() &&
104 starting_offset == bytes_received_.rbegin()->max()) {
105 // Extend the right edge of last interval.
106 // TODO(fayang): Encapsulate this into a future version of
107 // QuicIntervalSet if this is more efficient than Add.
108 const_cast<QuicInterval<QuicStreamOffset>*>(
109 &(*bytes_received_.rbegin()))
110 ->SetMax(starting_offset + size);
111 } else {
112 bytes_received_.Add(starting_offset, starting_offset + size);
113 if (bytes_received_.Size() >= kMaxNumDataIntervalsAllowed) {
114 // This frame is going to create more intervals than allowed. Stop
115 // processing.
116 *error_details = "Too many data intervals received for this stream.";
117 return QUIC_TOO_MANY_STREAM_DATA_INTERVALS;
118 }
119 }
120 }
121 size_t bytes_copy = 0;
122 if (!CopyStreamData(starting_offset, data, &bytes_copy, error_details)) {
123 return QUIC_STREAM_SEQUENCER_INVALID_STATE;
124 }
125 *bytes_buffered += bytes_copy;
126 num_bytes_buffered_ += *bytes_buffered;
127 return QUIC_NO_ERROR;
128 }
129 // Slow path, received data overlaps with received data.
130 QuicIntervalSet<QuicStreamOffset> newly_received(starting_offset,
131 starting_offset + size);
132 newly_received.Difference(bytes_received_);
133 if (newly_received.Empty()) {
134 return QUIC_NO_ERROR;
135 }
136 bytes_received_.Add(starting_offset, starting_offset + size);
137 if (bytes_received_.Size() >= kMaxNumDataIntervalsAllowed) {
138 // This frame is going to create more intervals than allowed. Stop
139 // processing.
140 *error_details = "Too many data intervals received for this stream.";
141 return QUIC_TOO_MANY_STREAM_DATA_INTERVALS;
142 }
143 for (const auto& interval : newly_received) {
144 const QuicStreamOffset copy_offset = interval.min();
145 const QuicByteCount copy_length = interval.max() - interval.min();
146 size_t bytes_copy = 0;
147 if (!CopyStreamData(copy_offset,
148 data.substr(copy_offset - starting_offset, copy_length),
149 &bytes_copy, error_details)) {
150 return QUIC_STREAM_SEQUENCER_INVALID_STATE;
151 }
152 *bytes_buffered += bytes_copy;
153 }
154 num_bytes_buffered_ += *bytes_buffered;
155 return QUIC_NO_ERROR;
156}
157
158bool QuicStreamSequencerBuffer::CopyStreamData(QuicStreamOffset offset,
159 QuicStringPiece data,
160 size_t* bytes_copy,
vasilvvc48c8712019-03-11 13:38:16 -0700161 std::string* error_details) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500162 *bytes_copy = 0;
163 size_t source_remaining = data.size();
164 if (source_remaining == 0) {
165 return true;
166 }
167 const char* source = data.data();
168 // Write data block by block. If corresponding block has not created yet,
169 // create it first.
170 // Stop when all data are written or reaches the logical end of the buffer.
171 while (source_remaining > 0) {
172 const size_t write_block_num = GetBlockIndex(offset);
173 const size_t write_block_offset = GetInBlockOffset(offset);
174 DCHECK_GT(blocks_count_, write_block_num);
175
176 size_t block_capacity = GetBlockCapacity(write_block_num);
177 size_t bytes_avail = block_capacity - write_block_offset;
178
179 // If this write meets the upper boundary of the buffer,
180 // reduce the available free bytes.
181 if (offset + bytes_avail > total_bytes_read_ + max_buffer_capacity_bytes_) {
182 bytes_avail = total_bytes_read_ + max_buffer_capacity_bytes_ - offset;
183 }
184
185 if (blocks_ == nullptr) {
186 blocks_.reset(new BufferBlock*[blocks_count_]());
187 for (size_t i = 0; i < blocks_count_; ++i) {
188 blocks_[i] = nullptr;
189 }
190 }
191
192 if (write_block_num >= blocks_count_) {
193 *error_details = QuicStrCat(
194 "QuicStreamSequencerBuffer error: OnStreamData() exceed array bounds."
195 "write offset = ",
196 offset, " write_block_num = ", write_block_num,
197 " blocks_count_ = ", blocks_count_);
198 return false;
199 }
200 if (blocks_ == nullptr) {
201 *error_details =
202 "QuicStreamSequencerBuffer error: OnStreamData() blocks_ is null";
203 return false;
204 }
205 if (blocks_[write_block_num] == nullptr) {
206 // TODO(danzh): Investigate if using a freelist would improve performance.
207 // Same as RetireBlock().
208 blocks_[write_block_num] = new BufferBlock();
209 }
210
211 const size_t bytes_to_copy =
212 std::min<size_t>(bytes_avail, source_remaining);
213 char* dest = blocks_[write_block_num]->buffer + write_block_offset;
214 QUIC_DVLOG(1) << "Write at offset: " << offset
215 << " length: " << bytes_to_copy;
216
217 if (dest == nullptr || source == nullptr) {
218 *error_details = QuicStrCat(
219 "QuicStreamSequencerBuffer error: OnStreamData()"
220 " dest == nullptr: ",
221 (dest == nullptr), " source == nullptr: ", (source == nullptr),
222 " Writing at offset ", offset, " Gaps: ", GapsDebugString(),
223 " Remaining frames: ", ReceivedFramesDebugString(),
224 " total_bytes_read_ = ", total_bytes_read_);
225 return false;
226 }
227 memcpy(dest, source, bytes_to_copy);
228 source += bytes_to_copy;
229 source_remaining -= bytes_to_copy;
230 offset += bytes_to_copy;
231 *bytes_copy += bytes_to_copy;
232 }
233 return true;
234}
235
236QuicErrorCode QuicStreamSequencerBuffer::Readv(const iovec* dest_iov,
237 size_t dest_count,
238 size_t* bytes_read,
vasilvvc48c8712019-03-11 13:38:16 -0700239 std::string* error_details) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500240 *bytes_read = 0;
241 for (size_t i = 0; i < dest_count && ReadableBytes() > 0; ++i) {
242 char* dest = reinterpret_cast<char*>(dest_iov[i].iov_base);
243 DCHECK(dest != nullptr);
244 size_t dest_remaining = dest_iov[i].iov_len;
245 while (dest_remaining > 0 && ReadableBytes() > 0) {
246 size_t block_idx = NextBlockToRead();
247 size_t start_offset_in_block = ReadOffset();
248 size_t block_capacity = GetBlockCapacity(block_idx);
249 size_t bytes_available_in_block = std::min<size_t>(
250 ReadableBytes(), block_capacity - start_offset_in_block);
251 size_t bytes_to_copy =
252 std::min<size_t>(bytes_available_in_block, dest_remaining);
253 DCHECK_GT(bytes_to_copy, 0u);
254 if (blocks_[block_idx] == nullptr || dest == nullptr) {
255 *error_details = QuicStrCat(
256 "QuicStreamSequencerBuffer error:"
257 " Readv() dest == nullptr: ",
258 (dest == nullptr), " blocks_[", block_idx,
259 "] == nullptr: ", (blocks_[block_idx] == nullptr),
260 " Gaps: ", GapsDebugString(),
261 " Remaining frames: ", ReceivedFramesDebugString(),
262 " total_bytes_read_ = ", total_bytes_read_);
263 return QUIC_STREAM_SEQUENCER_INVALID_STATE;
264 }
265 memcpy(dest, blocks_[block_idx]->buffer + start_offset_in_block,
266 bytes_to_copy);
267 dest += bytes_to_copy;
268 dest_remaining -= bytes_to_copy;
269 num_bytes_buffered_ -= bytes_to_copy;
270 total_bytes_read_ += bytes_to_copy;
271 *bytes_read += bytes_to_copy;
272
273 // Retire the block if all the data is read out and no other data is
274 // stored in this block.
275 // In case of failing to retire a block which is ready to retire, return
276 // immediately.
277 if (bytes_to_copy == bytes_available_in_block) {
278 bool retire_successfully = RetireBlockIfEmpty(block_idx);
279 if (!retire_successfully) {
280 *error_details = QuicStrCat(
281 "QuicStreamSequencerBuffer error: fail to retire block ",
282 block_idx,
283 " as the block is already released, total_bytes_read_ = ",
284 total_bytes_read_, " Gaps: ", GapsDebugString());
285 return QUIC_STREAM_SEQUENCER_INVALID_STATE;
286 }
287 }
288 }
289 }
290 total_bytes_prefetched_ =
291 std::max(total_bytes_prefetched_, total_bytes_read_);
292
293 return QUIC_NO_ERROR;
294}
295
296int QuicStreamSequencerBuffer::GetReadableRegions(struct iovec* iov,
297 int iov_count) const {
298 DCHECK(iov != nullptr);
299 DCHECK_GT(iov_count, 0);
300
301 if (ReadableBytes() == 0) {
302 iov[0].iov_base = nullptr;
303 iov[0].iov_len = 0;
304 return 0;
305 }
306
307 size_t start_block_idx = NextBlockToRead();
308 QuicStreamOffset readable_offset_end = FirstMissingByte() - 1;
309 DCHECK_GE(readable_offset_end + 1, total_bytes_read_);
310 size_t end_block_offset = GetInBlockOffset(readable_offset_end);
311 size_t end_block_idx = GetBlockIndex(readable_offset_end);
312
313 // If readable region is within one block, deal with it seperately.
314 if (start_block_idx == end_block_idx && ReadOffset() <= end_block_offset) {
315 iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
316 iov[0].iov_len = ReadableBytes();
317 QUIC_DVLOG(1) << "Got only a single block with index: " << start_block_idx;
318 return 1;
319 }
320
321 // Get first block
322 iov[0].iov_base = blocks_[start_block_idx]->buffer + ReadOffset();
323 iov[0].iov_len = GetBlockCapacity(start_block_idx) - ReadOffset();
324 QUIC_DVLOG(1) << "Got first block " << start_block_idx << " with len "
325 << iov[0].iov_len;
326 DCHECK_GT(readable_offset_end + 1, total_bytes_read_ + iov[0].iov_len)
327 << "there should be more available data";
328
329 // Get readable regions of the rest blocks till either 2nd to last block
330 // before gap is met or |iov| is filled. For these blocks, one whole block is
331 // a region.
332 int iov_used = 1;
333 size_t block_idx = (start_block_idx + iov_used) % blocks_count_;
334 while (block_idx != end_block_idx && iov_used < iov_count) {
335 DCHECK(nullptr != blocks_[block_idx]);
336 iov[iov_used].iov_base = blocks_[block_idx]->buffer;
337 iov[iov_used].iov_len = GetBlockCapacity(block_idx);
338 QUIC_DVLOG(1) << "Got block with index: " << block_idx;
339 ++iov_used;
340 block_idx = (start_block_idx + iov_used) % blocks_count_;
341 }
342
343 // Deal with last block if |iov| can hold more.
344 if (iov_used < iov_count) {
345 DCHECK(nullptr != blocks_[block_idx]);
346 iov[iov_used].iov_base = blocks_[end_block_idx]->buffer;
347 iov[iov_used].iov_len = end_block_offset + 1;
348 QUIC_DVLOG(1) << "Got last block with index: " << end_block_idx;
349 ++iov_used;
350 }
351 return iov_used;
352}
353
354bool QuicStreamSequencerBuffer::GetReadableRegion(iovec* iov) const {
355 return GetReadableRegions(iov, 1) == 1;
356}
357
358bool QuicStreamSequencerBuffer::PrefetchNextRegion(iovec* iov) {
359 DCHECK(iov != nullptr);
360
361 if (total_bytes_prefetched_ == FirstMissingByte()) {
362 return false;
363 }
364
365 size_t start_block_idx = GetBlockIndex(total_bytes_prefetched_);
366 size_t start_block_offset = GetInBlockOffset(total_bytes_prefetched_);
367 QuicStreamOffset readable_offset_end = FirstMissingByte() - 1;
368 size_t end_block_offset = GetInBlockOffset(readable_offset_end);
369 size_t end_block_idx = GetBlockIndex(readable_offset_end);
370
371 if (start_block_idx != end_block_idx) {
372 iov->iov_base = blocks_[start_block_idx]->buffer + start_block_offset;
373 iov->iov_len = GetBlockCapacity(start_block_idx) - start_block_offset;
374 total_bytes_prefetched_ += iov->iov_len;
375 return true;
376 }
377 iov->iov_base = blocks_[end_block_idx]->buffer + start_block_offset;
378 iov->iov_len = end_block_offset - start_block_offset + 1;
379 total_bytes_prefetched_ += iov->iov_len;
380 return true;
381}
382
383bool QuicStreamSequencerBuffer::MarkConsumed(size_t bytes_used) {
384 if (bytes_used > ReadableBytes()) {
385 return false;
386 }
387 size_t bytes_to_consume = bytes_used;
388 while (bytes_to_consume > 0) {
389 size_t block_idx = NextBlockToRead();
390 size_t offset_in_block = ReadOffset();
391 size_t bytes_available = std::min<size_t>(
392 ReadableBytes(), GetBlockCapacity(block_idx) - offset_in_block);
393 size_t bytes_read = std::min<size_t>(bytes_to_consume, bytes_available);
394 total_bytes_read_ += bytes_read;
395 num_bytes_buffered_ -= bytes_read;
396 bytes_to_consume -= bytes_read;
397 // If advanced to the end of current block and end of buffer hasn't wrapped
398 // to this block yet.
399 if (bytes_available == bytes_read) {
400 RetireBlockIfEmpty(block_idx);
401 }
402 }
403 total_bytes_prefetched_ =
404 std::max(total_bytes_read_, total_bytes_prefetched_);
405 return true;
406}
407
408size_t QuicStreamSequencerBuffer::FlushBufferedFrames() {
409 size_t prev_total_bytes_read = total_bytes_read_;
410 total_bytes_read_ = NextExpectedByte();
411 Clear();
412 return total_bytes_read_ - prev_total_bytes_read;
413}
414
415void QuicStreamSequencerBuffer::ReleaseWholeBuffer() {
416 Clear();
417 blocks_.reset(nullptr);
418}
419
420size_t QuicStreamSequencerBuffer::ReadableBytes() const {
421 return FirstMissingByte() - total_bytes_read_;
422}
423
424bool QuicStreamSequencerBuffer::HasBytesToRead() const {
425 return ReadableBytes() > 0;
426}
427
428QuicStreamOffset QuicStreamSequencerBuffer::BytesConsumed() const {
429 return total_bytes_read_;
430}
431
432size_t QuicStreamSequencerBuffer::BytesBuffered() const {
433 return num_bytes_buffered_;
434}
435
436size_t QuicStreamSequencerBuffer::GetBlockIndex(QuicStreamOffset offset) const {
437 return (offset % max_buffer_capacity_bytes_) / kBlockSizeBytes;
438}
439
440size_t QuicStreamSequencerBuffer::GetInBlockOffset(
441 QuicStreamOffset offset) const {
442 return (offset % max_buffer_capacity_bytes_) % kBlockSizeBytes;
443}
444
445size_t QuicStreamSequencerBuffer::ReadOffset() const {
446 return GetInBlockOffset(total_bytes_read_);
447}
448
449size_t QuicStreamSequencerBuffer::NextBlockToRead() const {
450 return GetBlockIndex(total_bytes_read_);
451}
452
453bool QuicStreamSequencerBuffer::RetireBlockIfEmpty(size_t block_index) {
454 DCHECK(ReadableBytes() == 0 || GetInBlockOffset(total_bytes_read_) == 0)
455 << "RetireBlockIfEmpty() should only be called when advancing to next "
456 << "block or a gap has been reached.";
457 // If the whole buffer becomes empty, the last piece of data has been read.
458 if (Empty()) {
459 return RetireBlock(block_index);
460 }
461
462 // Check where the logical end of this buffer is.
463 // Not empty if the end of circular buffer has been wrapped to this block.
464 if (GetBlockIndex(NextExpectedByte() - 1) == block_index) {
465 return true;
466 }
467
468 // Read index remains in this block, which means a gap has been reached.
469 if (NextBlockToRead() == block_index) {
470 if (bytes_received_.Size() > 1) {
471 auto it = bytes_received_.begin();
472 ++it;
473 if (GetBlockIndex(it->min()) == block_index) {
474 // Do not retire the block if next data interval is in this block.
475 return true;
476 }
477 } else {
478 QUIC_BUG << "Read stopped at where it shouldn't.";
479 return false;
480 }
481 }
482 return RetireBlock(block_index);
483}
484
485bool QuicStreamSequencerBuffer::Empty() const {
486 return bytes_received_.Empty() ||
487 (bytes_received_.Size() == 1 && total_bytes_read_ > 0 &&
488 bytes_received_.begin()->max() == total_bytes_read_);
489}
490
491size_t QuicStreamSequencerBuffer::GetBlockCapacity(size_t block_index) const {
492 if ((block_index + 1) == blocks_count_) {
493 size_t result = max_buffer_capacity_bytes_ % kBlockSizeBytes;
494 if (result == 0) { // whole block
495 result = kBlockSizeBytes;
496 }
497 return result;
498 } else {
499 return kBlockSizeBytes;
500 }
501}
502
vasilvvc48c8712019-03-11 13:38:16 -0700503std::string QuicStreamSequencerBuffer::GapsDebugString() {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500504 // TODO(vasilvv): this should return the complement of |bytes_received_|.
505 return bytes_received_.ToString();
506}
507
vasilvvc48c8712019-03-11 13:38:16 -0700508std::string QuicStreamSequencerBuffer::ReceivedFramesDebugString() {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500509 return bytes_received_.ToString();
510}
511
512QuicStreamOffset QuicStreamSequencerBuffer::FirstMissingByte() const {
513 if (bytes_received_.Empty() || bytes_received_.begin()->min() > 0) {
514 // Offset 0 is not received yet.
515 return 0;
516 }
517 return bytes_received_.begin()->max();
518}
519
520QuicStreamOffset QuicStreamSequencerBuffer::NextExpectedByte() const {
521 if (bytes_received_.Empty()) {
522 return 0;
523 }
524 return bytes_received_.rbegin()->max();
525}
526
527} // namespace quic