blob: 47c7c9f5608bcc3ce5879d5f31f9d719885ce573 [file] [log] [blame]
QUICHE teama6ef0a62019-03-07 20:34:33 -05001// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/third_party/quiche/src/quic/core/quic_stream_sequencer.h"
6
7#include <algorithm>
8#include <limits>
9#include <utility>
10
11#include "net/third_party/quiche/src/quic/core/quic_packets.h"
12#include "net/third_party/quiche/src/quic/core/quic_stream.h"
13#include "net/third_party/quiche/src/quic/core/quic_stream_sequencer_buffer.h"
14#include "net/third_party/quiche/src/quic/core/quic_utils.h"
15#include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
16#include "net/third_party/quiche/src/quic/platform/api/quic_clock.h"
17#include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
18#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
19#include "net/third_party/quiche/src/quic/platform/api/quic_str_cat.h"
20#include "net/third_party/quiche/src/quic/platform/api/quic_string.h"
21#include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h"
22
23namespace quic {
24
25QuicStreamSequencer::QuicStreamSequencer(StreamInterface* quic_stream)
26 : stream_(quic_stream),
27 buffered_frames_(kStreamReceiveWindowLimit),
28 close_offset_(std::numeric_limits<QuicStreamOffset>::max()),
29 blocked_(false),
30 num_frames_received_(0),
31 num_duplicate_frames_received_(0),
32 ignore_read_data_(false),
33 level_triggered_(false),
34 stop_reading_when_level_triggered_(
35 GetQuicReloadableFlag(quic_stop_reading_when_level_triggered)) {}
36
37QuicStreamSequencer::~QuicStreamSequencer() {}
38
39void QuicStreamSequencer::OnStreamFrame(const QuicStreamFrame& frame) {
40 ++num_frames_received_;
41 const QuicStreamOffset byte_offset = frame.offset;
42 const size_t data_len = frame.data_length;
43
44 if (frame.fin) {
45 CloseStreamAtOffset(frame.offset + data_len);
46 if (data_len == 0) {
47 return;
48 }
49 }
50 OnFrameData(byte_offset, data_len, frame.data_buffer);
51}
52
53void QuicStreamSequencer::OnCryptoFrame(const QuicCryptoFrame& frame) {
54 ++num_frames_received_;
55 OnFrameData(frame.offset, frame.data_length, frame.data_buffer);
56}
57
58void QuicStreamSequencer::OnFrameData(QuicStreamOffset byte_offset,
59 size_t data_len,
60 const char* data_buffer) {
61 const size_t previous_readable_bytes = buffered_frames_.ReadableBytes();
62 size_t bytes_written;
vasilvvc48c8712019-03-11 13:38:16 -070063 std::string error_details;
QUICHE teama6ef0a62019-03-07 20:34:33 -050064 QuicErrorCode result = buffered_frames_.OnStreamData(
65 byte_offset, QuicStringPiece(data_buffer, data_len), &bytes_written,
66 &error_details);
67 if (result != QUIC_NO_ERROR) {
vasilvvc48c8712019-03-11 13:38:16 -070068 std::string details = QuicStrCat(
QUICHE teama6ef0a62019-03-07 20:34:33 -050069 "Stream ", stream_->id(), ": ", QuicErrorCodeToString(result), ": ",
70 error_details,
71 "\nPeer Address: ", stream_->PeerAddressOfLatestPacket().ToString());
72 QUIC_LOG_FIRST_N(WARNING, 50) << QuicErrorCodeToString(result);
73 QUIC_LOG_FIRST_N(WARNING, 50) << details;
74 stream_->CloseConnectionWithDetails(result, details);
75 return;
76 }
77
78 if (bytes_written == 0) {
79 ++num_duplicate_frames_received_;
80 // Silently ignore duplicates.
81 return;
82 }
83
84 if (blocked_) {
85 return;
86 }
87
88 if (level_triggered_) {
89 if (buffered_frames_.ReadableBytes() > previous_readable_bytes) {
90 // Readable bytes has changed, let stream decide if to inform application
91 // or not.
92 if (stop_reading_when_level_triggered_ && ignore_read_data_) {
93 QUIC_RELOADABLE_FLAG_COUNT(quic_stop_reading_when_level_triggered);
94 FlushBufferedFrames();
95 } else {
96 stream_->OnDataAvailable();
97 }
98 }
99 return;
100 }
101 const bool stream_unblocked =
102 previous_readable_bytes == 0 && buffered_frames_.ReadableBytes() > 0;
103 if (stream_unblocked) {
104 if (ignore_read_data_) {
105 FlushBufferedFrames();
106 } else {
107 stream_->OnDataAvailable();
108 }
109 }
110}
111
112void QuicStreamSequencer::CloseStreamAtOffset(QuicStreamOffset offset) {
113 const QuicStreamOffset kMaxOffset =
114 std::numeric_limits<QuicStreamOffset>::max();
115
116 // If there is a scheduled close, the new offset should match it.
117 if (close_offset_ != kMaxOffset && offset != close_offset_) {
118 stream_->Reset(QUIC_MULTIPLE_TERMINATION_OFFSETS);
119 return;
120 }
121
122 close_offset_ = offset;
123
124 MaybeCloseStream();
125}
126
127bool QuicStreamSequencer::MaybeCloseStream() {
128 if (blocked_ || !IsClosed()) {
129 return false;
130 }
131
132 QUIC_DVLOG(1) << "Passing up termination, as we've processed "
133 << buffered_frames_.BytesConsumed() << " of " << close_offset_
134 << " bytes.";
135 // This will cause the stream to consume the FIN.
136 // Technically it's an error if |num_bytes_consumed| isn't exactly
137 // equal to |close_offset|, but error handling seems silly at this point.
138 if (ignore_read_data_) {
139 // The sequencer is discarding stream data and must notify the stream on
140 // receipt of a FIN because the consumer won't.
141 stream_->OnFinRead();
142 } else {
143 stream_->OnDataAvailable();
144 }
145 buffered_frames_.Clear();
146 return true;
147}
148
149int QuicStreamSequencer::GetReadableRegions(iovec* iov, size_t iov_len) const {
150 DCHECK(!blocked_);
151 return buffered_frames_.GetReadableRegions(iov, iov_len);
152}
153
154bool QuicStreamSequencer::GetReadableRegion(iovec* iov) const {
155 DCHECK(!blocked_);
156 return buffered_frames_.GetReadableRegion(iov);
157}
158
159bool QuicStreamSequencer::PrefetchNextRegion(iovec* iov) {
160 DCHECK(!blocked_);
161 return buffered_frames_.PrefetchNextRegion(iov);
162}
163
vasilvvc48c8712019-03-11 13:38:16 -0700164void QuicStreamSequencer::Read(std::string* buffer) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500165 DCHECK(!blocked_);
166 buffer->resize(buffer->size() + ReadableBytes());
167 iovec iov;
168 iov.iov_len = ReadableBytes();
169 iov.iov_base = &(*buffer)[buffer->size() - iov.iov_len];
170 Readv(&iov, 1);
171}
172
173int QuicStreamSequencer::Readv(const struct iovec* iov, size_t iov_len) {
174 DCHECK(!blocked_);
vasilvvc48c8712019-03-11 13:38:16 -0700175 std::string error_details;
QUICHE teama6ef0a62019-03-07 20:34:33 -0500176 size_t bytes_read;
177 QuicErrorCode read_error =
178 buffered_frames_.Readv(iov, iov_len, &bytes_read, &error_details);
179 if (read_error != QUIC_NO_ERROR) {
vasilvvc48c8712019-03-11 13:38:16 -0700180 std::string details =
QUICHE teama6ef0a62019-03-07 20:34:33 -0500181 QuicStrCat("Stream ", stream_->id(), ": ", error_details);
182 stream_->CloseConnectionWithDetails(read_error, details);
183 return static_cast<int>(bytes_read);
184 }
185
186 stream_->AddBytesConsumed(bytes_read);
187 return static_cast<int>(bytes_read);
188}
189
190bool QuicStreamSequencer::HasBytesToRead() const {
191 return buffered_frames_.HasBytesToRead();
192}
193
194size_t QuicStreamSequencer::ReadableBytes() const {
195 return buffered_frames_.ReadableBytes();
196}
197
198bool QuicStreamSequencer::IsClosed() const {
199 return buffered_frames_.BytesConsumed() >= close_offset_;
200}
201
202void QuicStreamSequencer::MarkConsumed(size_t num_bytes_consumed) {
203 DCHECK(!blocked_);
204 bool result = buffered_frames_.MarkConsumed(num_bytes_consumed);
205 if (!result) {
206 QUIC_BUG << "Invalid argument to MarkConsumed."
207 << " expect to consume: " << num_bytes_consumed
208 << ", but not enough bytes available. " << DebugString();
209 stream_->Reset(QUIC_ERROR_PROCESSING_STREAM);
210 return;
211 }
212 stream_->AddBytesConsumed(num_bytes_consumed);
213}
214
215void QuicStreamSequencer::SetBlockedUntilFlush() {
216 blocked_ = true;
217}
218
219void QuicStreamSequencer::SetUnblocked() {
220 blocked_ = false;
221 if (IsClosed() || HasBytesToRead()) {
222 stream_->OnDataAvailable();
223 }
224}
225
226void QuicStreamSequencer::StopReading() {
227 if (ignore_read_data_) {
228 return;
229 }
230 ignore_read_data_ = true;
231 FlushBufferedFrames();
232}
233
234void QuicStreamSequencer::ReleaseBuffer() {
235 buffered_frames_.ReleaseWholeBuffer();
236}
237
238void QuicStreamSequencer::ReleaseBufferIfEmpty() {
239 if (buffered_frames_.Empty()) {
240 buffered_frames_.ReleaseWholeBuffer();
241 }
242}
243
244void QuicStreamSequencer::FlushBufferedFrames() {
245 DCHECK(ignore_read_data_);
246 size_t bytes_flushed = buffered_frames_.FlushBufferedFrames();
247 QUIC_DVLOG(1) << "Flushing buffered data at offset "
248 << buffered_frames_.BytesConsumed() << " length "
249 << bytes_flushed << " for stream " << stream_->id();
250 stream_->AddBytesConsumed(bytes_flushed);
251 MaybeCloseStream();
252}
253
254size_t QuicStreamSequencer::NumBytesBuffered() const {
255 return buffered_frames_.BytesBuffered();
256}
257
258QuicStreamOffset QuicStreamSequencer::NumBytesConsumed() const {
259 return buffered_frames_.BytesConsumed();
260}
261
vasilvvc48c8712019-03-11 13:38:16 -0700262const std::string QuicStreamSequencer::DebugString() const {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500263 // clang-format off
264 return QuicStrCat("QuicStreamSequencer:",
265 "\n bytes buffered: ", NumBytesBuffered(),
266 "\n bytes consumed: ", NumBytesConsumed(),
267 "\n has bytes to read: ", HasBytesToRead() ? "true" : "false",
268 "\n frames received: ", num_frames_received(),
269 "\n close offset bytes: ", close_offset_,
270 "\n is closed: ", IsClosed() ? "true" : "false");
271 // clang-format on
272}
273
274} // namespace quic