blob: ee3f983a174829280457a43b2b8588dbd69e1be5 [file] [log] [blame]
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5#include "net/third_party/quiche/src/quic/core/quic_stream.h"
6
dschinazi7ccff582020-05-13 10:24:39 -07007#include <limits>
vasilvv872e7a32019-03-12 16:42:44 -07008#include <string>
9
renjietang963c2ec2019-09-12 11:46:50 -070010#include "net/third_party/quiche/src/quic/core/quic_error_codes.h"
QUICHE teama6ef0a62019-03-07 20:34:33 -050011#include "net/third_party/quiche/src/quic/core/quic_flow_controller.h"
12#include "net/third_party/quiche/src/quic/core/quic_session.h"
renjietang15afba32019-10-23 14:32:35 -070013#include "net/third_party/quiche/src/quic/core/quic_types.h"
QUICHE teama6ef0a62019-03-07 20:34:33 -050014#include "net/third_party/quiche/src/quic/core/quic_utils.h"
15#include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
16#include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
17#include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
18#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
renjietang41a1b412020-02-27 15:05:14 -080019#include "net/third_party/quiche/src/common/platform/api/quiche_optional.h"
dmcardlecf0bfcf2019-12-13 08:08:21 -080020#include "net/third_party/quiche/src/common/platform/api/quiche_str_cat.h"
21#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
QUICHE teama6ef0a62019-03-07 20:34:33 -050022
dmcardle15db6882020-02-26 12:55:36 -080023using quiche::QuicheOptional;
QUICHE teama6ef0a62019-03-07 20:34:33 -050024using spdy::SpdyPriority;
25
26namespace quic {
27
// Prefix for log lines naming the endpoint that emitted them. The expansion
// reads |perspective_|, so it can only be used where that name is in scope.
#define ENDPOINT \
  (perspective_ != Perspective::IS_SERVER ? "Client: " : "Server: ")
QUICHE teama6ef0a62019-03-07 20:34:33 -050030
31namespace {
32
dschinazif1e7b422020-04-30 12:21:28 -070033QuicByteCount DefaultFlowControlWindow(ParsedQuicVersion version) {
dschinazi18cdf132019-10-09 16:08:18 -070034 if (!version.AllowsLowFlowControlLimits()) {
35 return kDefaultFlowControlSendWindow;
36 }
37 return 0;
38}
39
dschinazif1e7b422020-04-30 12:21:28 -070040QuicByteCount GetInitialStreamFlowControlWindowToSend(QuicSession* session,
41 QuicStreamId stream_id) {
rchb0451852019-09-11 21:17:01 -070042 ParsedQuicVersion version = session->connection()->version();
43 if (version.handshake_protocol != PROTOCOL_TLS1_3) {
44 return session->config()->GetInitialStreamFlowControlWindowToSend();
45 }
46
47 // Unidirectional streams (v99 only).
48 if (VersionHasIetfQuicFrames(version.transport_version) &&
renjietangd262e252020-06-19 15:11:24 -070049 !QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
rchb0451852019-09-11 21:17:01 -070050 return session->config()
51 ->GetInitialMaxStreamDataBytesUnidirectionalToSend();
52 }
53
dschinazi18cdf132019-10-09 16:08:18 -070054 if (QuicUtils::IsOutgoingStreamId(version, stream_id,
55 session->perspective())) {
rchb0451852019-09-11 21:17:01 -070056 return session->config()
57 ->GetInitialMaxStreamDataBytesOutgoingBidirectionalToSend();
58 }
59
60 return session->config()
61 ->GetInitialMaxStreamDataBytesIncomingBidirectionalToSend();
QUICHE teama6ef0a62019-03-07 20:34:33 -050062}
63
dschinazif1e7b422020-04-30 12:21:28 -070064QuicByteCount GetReceivedFlowControlWindow(QuicSession* session,
65 QuicStreamId stream_id) {
rchb0451852019-09-11 21:17:01 -070066 ParsedQuicVersion version = session->connection()->version();
67 if (version.handshake_protocol != PROTOCOL_TLS1_3) {
68 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
69 return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
70 }
71
dschinazi18cdf132019-10-09 16:08:18 -070072 return DefaultFlowControlWindow(version);
rchb0451852019-09-11 21:17:01 -070073 }
74
75 // Unidirectional streams (v99 only).
76 if (VersionHasIetfQuicFrames(version.transport_version) &&
renjietangd262e252020-06-19 15:11:24 -070077 !QuicUtils::IsBidirectionalStreamId(stream_id, version)) {
rchb0451852019-09-11 21:17:01 -070078 if (session->config()
79 ->HasReceivedInitialMaxStreamDataBytesUnidirectional()) {
80 return session->config()
81 ->ReceivedInitialMaxStreamDataBytesUnidirectional();
82 }
dschinazi18cdf132019-10-09 16:08:18 -070083
84 return DefaultFlowControlWindow(version);
rchb0451852019-09-11 21:17:01 -070085 }
86
dschinazi18cdf132019-10-09 16:08:18 -070087 if (QuicUtils::IsOutgoingStreamId(version, stream_id,
88 session->perspective())) {
rchb0451852019-09-11 21:17:01 -070089 if (session->config()
dschinazi18cdf132019-10-09 16:08:18 -070090 ->HasReceivedInitialMaxStreamDataBytesOutgoingBidirectional()) {
rchb0451852019-09-11 21:17:01 -070091 return session->config()
dschinazi18cdf132019-10-09 16:08:18 -070092 ->ReceivedInitialMaxStreamDataBytesOutgoingBidirectional();
rchb0451852019-09-11 21:17:01 -070093 }
dschinazi18cdf132019-10-09 16:08:18 -070094
95 return DefaultFlowControlWindow(version);
rchb0451852019-09-11 21:17:01 -070096 }
97
98 if (session->config()
dschinazi18cdf132019-10-09 16:08:18 -070099 ->HasReceivedInitialMaxStreamDataBytesIncomingBidirectional()) {
rchb0451852019-09-11 21:17:01 -0700100 return session->config()
dschinazi18cdf132019-10-09 16:08:18 -0700101 ->ReceivedInitialMaxStreamDataBytesIncomingBidirectional();
QUICHE teama6ef0a62019-03-07 20:34:33 -0500102 }
103
dschinazi18cdf132019-10-09 16:08:18 -0700104 return DefaultFlowControlWindow(version);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500105}
106
107} // namespace
108
109// static
110const SpdyPriority QuicStream::kDefaultPriority;
111
bnc5f202512020-02-01 18:43:02 -0800112// static
113const int QuicStream::kDefaultUrgency;
114
QUICHE teama6ef0a62019-03-07 20:34:33 -0500115PendingStream::PendingStream(QuicStreamId id, QuicSession* session)
116 : id_(id),
117 session_(session),
renjietangf196f6a2020-02-12 12:34:23 -0800118 stream_delegate_(session),
QUICHE teama6ef0a62019-03-07 20:34:33 -0500119 stream_bytes_read_(0),
120 fin_received_(false),
121 connection_flow_controller_(session->flow_controller()),
122 flow_controller_(session,
123 id,
124 /*is_connection_flow_controller*/ false,
rchb0451852019-09-11 21:17:01 -0700125 GetReceivedFlowControlWindow(session, id),
126 GetInitialStreamFlowControlWindowToSend(session, id),
QUICHE teama6ef0a62019-03-07 20:34:33 -0500127 kStreamReceiveWindowLimit,
128 session_->flow_controller()->auto_tune_receive_window(),
129 session_->flow_controller()),
130 sequencer_(this) {}
131
132void PendingStream::OnDataAvailable() {
bnc092d8212019-08-07 11:53:20 -0700133 // Data should be kept in the sequencer so that
134 // QuicSession::ProcessPendingStream() can read it.
QUICHE teama6ef0a62019-03-07 20:34:33 -0500135}
136
137void PendingStream::OnFinRead() {
bnc092d8212019-08-07 11:53:20 -0700138 DCHECK(sequencer_.IsClosed());
QUICHE teama6ef0a62019-03-07 20:34:33 -0500139}
140
141void PendingStream::AddBytesConsumed(QuicByteCount bytes) {
renjietangbb1c4892019-05-24 15:58:44 -0700142 // It will be called when the metadata of the stream is consumed.
143 flow_controller_.AddBytesConsumed(bytes);
144 connection_flow_controller_->AddBytesConsumed(bytes);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500145}
146
renjietang4f732482019-10-24 14:48:23 -0700147void PendingStream::Reset(QuicRstStreamErrorCode /*error*/) {
148 // Currently PendingStream is only read-unidirectional. It shouldn't send
149 // Reset.
renjietang4f732482019-10-24 14:48:23 -0700150 QUIC_NOTREACHED();
QUICHE teama6ef0a62019-03-07 20:34:33 -0500151}
152
renjietang87df0d02020-02-13 11:53:52 -0800153void PendingStream::OnUnrecoverableError(QuicErrorCode error,
154 const std::string& details) {
renjietangf196f6a2020-02-12 12:34:23 -0800155 stream_delegate_->OnStreamError(error, details);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500156}
157
158QuicStreamId PendingStream::id() const {
159 return id_;
160}
161
QUICHE teama6ef0a62019-03-07 20:34:33 -0500162void PendingStream::OnStreamFrame(const QuicStreamFrame& frame) {
163 DCHECK_EQ(frame.stream_id, id_);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500164
165 bool is_stream_too_long =
166 (frame.offset > kMaxStreamLength) ||
167 (kMaxStreamLength - frame.offset < frame.data_length);
168 if (is_stream_too_long) {
169 // Close connection if stream becomes too long.
170 QUIC_PEER_BUG
171 << "Receive stream frame reaches max stream length. frame offset "
172 << frame.offset << " length " << frame.data_length;
renjietang87df0d02020-02-13 11:53:52 -0800173 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
174 "Peer sends more data than allowed on this stream.");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500175 return;
176 }
177
renjietang89aa73e2019-10-21 15:03:51 -0700178 if (frame.offset + frame.data_length > sequencer_.close_offset()) {
renjietang87df0d02020-02-13 11:53:52 -0800179 OnUnrecoverableError(
renjietang15afba32019-10-23 14:32:35 -0700180 QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
dmcardlecf0bfcf2019-12-13 08:08:21 -0800181 quiche::QuicheStrCat(
renjietang15afba32019-10-23 14:32:35 -0700182 "Stream ", id_,
183 " received data with offset: ", frame.offset + frame.data_length,
184 ", which is beyond close offset: ", sequencer()->close_offset()));
renjietang963c2ec2019-09-12 11:46:50 -0700185 return;
186 }
187
QUICHE teama6ef0a62019-03-07 20:34:33 -0500188 if (frame.fin) {
189 fin_received_ = true;
190 }
191
192 // This count includes duplicate data received.
dschinazif1e7b422020-04-30 12:21:28 -0700193 QuicByteCount frame_payload_size = frame.data_length;
QUICHE teama6ef0a62019-03-07 20:34:33 -0500194 stream_bytes_read_ += frame_payload_size;
195
196 // Flow control is interested in tracking highest received offset.
197 // Only interested in received frames that carry data.
198 if (frame_payload_size > 0 &&
199 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
200 // As the highest received offset has changed, check to see if this is a
201 // violation of flow control.
202 if (flow_controller_.FlowControlViolation() ||
203 connection_flow_controller_->FlowControlViolation()) {
renjietang87df0d02020-02-13 11:53:52 -0800204 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
205 "Flow control violation after increasing offset");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500206 return;
207 }
208 }
209
210 sequencer_.OnStreamFrame(frame);
211}
212
213void PendingStream::OnRstStreamFrame(const QuicRstStreamFrame& frame) {
214 DCHECK_EQ(frame.stream_id, id_);
215
216 if (frame.byte_offset > kMaxStreamLength) {
217 // Peer are not suppose to write bytes more than maxium allowed.
renjietang87df0d02020-02-13 11:53:52 -0800218 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
219 "Reset frame stream offset overflow.");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500220 return;
221 }
renjietang15afba32019-10-23 14:32:35 -0700222
223 const QuicStreamOffset kMaxOffset =
224 std::numeric_limits<QuicStreamOffset>::max();
225 if (sequencer()->close_offset() != kMaxOffset &&
226 frame.byte_offset != sequencer()->close_offset()) {
renjietang87df0d02020-02-13 11:53:52 -0800227 OnUnrecoverableError(
renjietang15afba32019-10-23 14:32:35 -0700228 QUIC_STREAM_MULTIPLE_OFFSET,
dmcardlecf0bfcf2019-12-13 08:08:21 -0800229 quiche::QuicheStrCat("Stream ", id_,
230 " received new final offset: ", frame.byte_offset,
231 ", which is different from close offset: ",
232 sequencer()->close_offset()));
renjietang15afba32019-10-23 14:32:35 -0700233 return;
234 }
235
QUICHE teama6ef0a62019-03-07 20:34:33 -0500236 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
237 if (flow_controller_.FlowControlViolation() ||
238 connection_flow_controller_->FlowControlViolation()) {
renjietang87df0d02020-02-13 11:53:52 -0800239 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
240 "Flow control violation after increasing offset");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500241 return;
242 }
243}
244
245bool PendingStream::MaybeIncreaseHighestReceivedOffset(
246 QuicStreamOffset new_offset) {
247 uint64_t increment =
248 new_offset - flow_controller_.highest_received_byte_offset();
249 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
250 return false;
251 }
252
253 // If |new_offset| increased the stream flow controller's highest received
254 // offset, increase the connection flow controller's value by the incremental
255 // difference.
256 connection_flow_controller_->UpdateHighestReceivedOffset(
257 connection_flow_controller_->highest_received_byte_offset() + increment);
258 return true;
259}
260
dschinazif1e7b422020-04-30 12:21:28 -0700261void PendingStream::MarkConsumed(QuicByteCount num_bytes) {
renjietangbb1c4892019-05-24 15:58:44 -0700262 sequencer_.MarkConsumed(num_bytes);
263}
264
bnc4ff60622019-08-09 18:55:45 -0700265void PendingStream::StopReading() {
266 QUIC_DVLOG(1) << "Stop reading from pending stream " << id();
267 sequencer_.StopReading();
268}
269
renjietangbaea59c2019-05-29 15:08:14 -0700270QuicStream::QuicStream(PendingStream* pending, StreamType type, bool is_static)
271 : QuicStream(pending->id_,
272 pending->session_,
273 std::move(pending->sequencer_),
renjietang35448992019-05-08 17:08:57 -0700274 is_static,
QUICHE teama6ef0a62019-03-07 20:34:33 -0500275 type,
renjietangbaea59c2019-05-29 15:08:14 -0700276 pending->stream_bytes_read_,
277 pending->fin_received_,
278 std::move(pending->flow_controller_),
279 pending->connection_flow_controller_) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500280 sequencer_.set_stream(this);
281}
282
nharperd5c4a932019-05-13 13:58:49 -0700283namespace {
284
dmcardle15db6882020-02-26 12:55:36 -0800285QuicheOptional<QuicFlowController> FlowController(QuicStreamId id,
286 QuicSession* session,
287 StreamType type) {
nharperd5c4a932019-05-13 13:58:49 -0700288 if (type == CRYPTO) {
289 // The only QuicStream with a StreamType of CRYPTO is QuicCryptoStream, when
290 // it is using crypto frames instead of stream frames. The QuicCryptoStream
291 // doesn't have any flow control in that case, so we don't create a
292 // QuicFlowController for it.
dmcardle15db6882020-02-26 12:55:36 -0800293 return QuicheOptional<QuicFlowController>();
nharperd5c4a932019-05-13 13:58:49 -0700294 }
295 return QuicFlowController(
296 session, id,
297 /*is_connection_flow_controller*/ false,
rchb0451852019-09-11 21:17:01 -0700298 GetReceivedFlowControlWindow(session, id),
299 GetInitialStreamFlowControlWindowToSend(session, id),
nharperd5c4a932019-05-13 13:58:49 -0700300 kStreamReceiveWindowLimit,
301 session->flow_controller()->auto_tune_receive_window(),
302 session->flow_controller());
303}
304
305} // namespace
306
QUICHE teama6ef0a62019-03-07 20:34:33 -0500307QuicStream::QuicStream(QuicStreamId id,
308 QuicSession* session,
309 bool is_static,
310 StreamType type)
311 : QuicStream(id,
312 session,
313 QuicStreamSequencer(this),
314 is_static,
315 type,
316 0,
317 false,
nharperd5c4a932019-05-13 13:58:49 -0700318 FlowController(id, session, type),
QUICHE teama6ef0a62019-03-07 20:34:33 -0500319 session->flow_controller()) {}
320
321QuicStream::QuicStream(QuicStreamId id,
322 QuicSession* session,
323 QuicStreamSequencer sequencer,
324 bool is_static,
325 StreamType type,
326 uint64_t stream_bytes_read,
327 bool fin_received,
dmcardle15db6882020-02-26 12:55:36 -0800328 QuicheOptional<QuicFlowController> flow_controller,
QUICHE teama6ef0a62019-03-07 20:34:33 -0500329 QuicFlowController* connection_flow_controller)
330 : sequencer_(std::move(sequencer)),
331 id_(id),
332 session_(session),
renjietangf196f6a2020-02-12 12:34:23 -0800333 stream_delegate_(session),
bnc5f202512020-02-01 18:43:02 -0800334 precedence_(CalculateDefaultPriority(session)),
QUICHE teama6ef0a62019-03-07 20:34:33 -0500335 stream_bytes_read_(stream_bytes_read),
336 stream_error_(QUIC_STREAM_NO_ERROR),
337 connection_error_(QUIC_NO_ERROR),
338 read_side_closed_(false),
339 write_side_closed_(false),
340 fin_buffered_(false),
341 fin_sent_(false),
342 fin_outstanding_(false),
343 fin_lost_(false),
344 fin_received_(fin_received),
345 rst_sent_(false),
346 rst_received_(false),
QUICHE teama6ef0a62019-03-07 20:34:33 -0500347 flow_controller_(std::move(flow_controller)),
348 connection_flow_controller_(connection_flow_controller),
349 stream_contributes_to_connection_flow_control_(true),
350 busy_counter_(0),
351 add_random_padding_after_fin_(false),
352 send_buffer_(
353 session->connection()->helper()->GetStreamSendBufferAllocator()),
354 buffered_data_threshold_(GetQuicFlag(FLAGS_quic_buffered_data_threshold)),
355 is_static_(is_static),
356 deadline_(QuicTime::Zero()),
fayangbe6d6642020-04-16 14:15:34 -0700357 was_draining_(false),
renjietangd1d00852019-09-06 10:43:12 -0700358 type_(VersionHasIetfQuicFrames(session->transport_version()) &&
nharperd5c4a932019-05-13 13:58:49 -0700359 type != CRYPTO
QUICHE teama6ef0a62019-03-07 20:34:33 -0500360 ? QuicUtils::GetStreamType(id_,
renjietangc9e80442019-11-06 17:24:37 -0800361 session->perspective(),
renjietangd262e252020-06-19 15:11:24 -0700362 session->IsIncomingStream(id_),
363 session->version())
renjietangff3d3a32020-02-13 15:13:51 -0800364 : type),
365 perspective_(session->perspective()) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500366 if (type_ == WRITE_UNIDIRECTIONAL) {
fayang3a51d1a2020-04-16 13:42:08 -0700367 fin_received_ = true;
QUICHE teama6ef0a62019-03-07 20:34:33 -0500368 CloseReadSide();
369 } else if (type_ == READ_UNIDIRECTIONAL) {
fayang3a51d1a2020-04-16 13:42:08 -0700370 fin_sent_ = true;
QUICHE teama6ef0a62019-03-07 20:34:33 -0500371 CloseWriteSide();
372 }
nharperd5c4a932019-05-13 13:58:49 -0700373 if (type_ != CRYPTO) {
renjietang35e49ed2020-02-19 10:55:01 -0800374 stream_delegate_->RegisterStreamPriority(id, is_static_, precedence_);
nharperd5c4a932019-05-13 13:58:49 -0700375 }
QUICHE teama6ef0a62019-03-07 20:34:33 -0500376}
377
378QuicStream::~QuicStream() {
379 if (session_ != nullptr && IsWaitingForAcks()) {
380 QUIC_DVLOG(1)
381 << ENDPOINT << "Stream " << id_
382 << " gets destroyed while waiting for acks. stream_bytes_outstanding = "
383 << send_buffer_.stream_bytes_outstanding()
384 << ", fin_outstanding: " << fin_outstanding_;
385 }
renjietang35e49ed2020-02-19 10:55:01 -0800386 if (stream_delegate_ != nullptr && type_ != CRYPTO) {
387 stream_delegate_->UnregisterStreamPriority(id(), is_static_);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500388 }
389}
390
QUICHE teama6ef0a62019-03-07 20:34:33 -0500391void QuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
392 DCHECK_EQ(frame.stream_id, id_);
393
394 DCHECK(!(read_side_closed_ && write_side_closed_));
395
renjietanged49cb92020-03-03 14:30:53 -0800396 if (frame.fin && is_static_) {
397 OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
398 "Attempt to close a static stream");
399 return;
400 }
401
QUICHE teama6ef0a62019-03-07 20:34:33 -0500402 if (type_ == WRITE_UNIDIRECTIONAL) {
renjietang87df0d02020-02-13 11:53:52 -0800403 OnUnrecoverableError(QUIC_DATA_RECEIVED_ON_WRITE_UNIDIRECTIONAL_STREAM,
404 "Data received on write unidirectional stream");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500405 return;
406 }
407
408 bool is_stream_too_long =
409 (frame.offset > kMaxStreamLength) ||
410 (kMaxStreamLength - frame.offset < frame.data_length);
411 if (is_stream_too_long) {
412 // Close connection if stream becomes too long.
413 QUIC_PEER_BUG << "Receive stream frame on stream " << id_
414 << " reaches max stream length. frame offset " << frame.offset
415 << " length " << frame.data_length << ". "
416 << sequencer_.DebugString();
renjietang87df0d02020-02-13 11:53:52 -0800417 OnUnrecoverableError(
QUICHE teama6ef0a62019-03-07 20:34:33 -0500418 QUIC_STREAM_LENGTH_OVERFLOW,
dmcardlecf0bfcf2019-12-13 08:08:21 -0800419 quiche::QuicheStrCat("Peer sends more data than allowed on stream ",
420 id_, ". frame: offset = ", frame.offset,
421 ", length = ", frame.data_length, ". ",
422 sequencer_.DebugString()));
QUICHE teama6ef0a62019-03-07 20:34:33 -0500423 return;
424 }
renjietang963c2ec2019-09-12 11:46:50 -0700425
renjietang89aa73e2019-10-21 15:03:51 -0700426 if (frame.offset + frame.data_length > sequencer_.close_offset()) {
renjietang87df0d02020-02-13 11:53:52 -0800427 OnUnrecoverableError(
renjietang15afba32019-10-23 14:32:35 -0700428 QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
dmcardlecf0bfcf2019-12-13 08:08:21 -0800429 quiche::QuicheStrCat(
renjietang15afba32019-10-23 14:32:35 -0700430 "Stream ", id_,
431 " received data with offset: ", frame.offset + frame.data_length,
432 ", which is beyond close offset: ", sequencer_.close_offset()));
renjietang89aa73e2019-10-21 15:03:51 -0700433 return;
renjietang963c2ec2019-09-12 11:46:50 -0700434 }
435
fayang6dfe58d2020-06-01 08:02:47 -0700436 if (frame.fin && !fin_received_) {
437 fin_received_ = true;
438 if (fin_sent_) {
439 DCHECK(!was_draining_);
440 session_->StreamDraining(id_,
441 /*unidirectional=*/type_ != BIDIRECTIONAL);
442 was_draining_ = true;
QUICHE teama6ef0a62019-03-07 20:34:33 -0500443 }
444 }
445
446 if (read_side_closed_) {
447 QUIC_DLOG(INFO)
448 << ENDPOINT << "Stream " << frame.stream_id
449 << " is closed for reading. Ignoring newly received stream data.";
450 // The subclass does not want to read data: blackhole the data.
451 return;
452 }
453
454 // This count includes duplicate data received.
dschinazif1e7b422020-04-30 12:21:28 -0700455 QuicByteCount frame_payload_size = frame.data_length;
QUICHE teama6ef0a62019-03-07 20:34:33 -0500456 stream_bytes_read_ += frame_payload_size;
457
458 // Flow control is interested in tracking highest received offset.
459 // Only interested in received frames that carry data.
460 if (frame_payload_size > 0 &&
461 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
462 // As the highest received offset has changed, check to see if this is a
463 // violation of flow control.
dschinazi7ccff582020-05-13 10:24:39 -0700464 QUIC_BUG_IF(!flow_controller_.has_value())
465 << ENDPOINT << "OnStreamFrame called on stream without flow control";
466 if ((flow_controller_.has_value() &&
467 flow_controller_->FlowControlViolation()) ||
QUICHE teama6ef0a62019-03-07 20:34:33 -0500468 connection_flow_controller_->FlowControlViolation()) {
renjietang87df0d02020-02-13 11:53:52 -0800469 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
470 "Flow control violation after increasing offset");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500471 return;
472 }
473 }
474
475 sequencer_.OnStreamFrame(frame);
476}
477
renjietanged49cb92020-03-03 14:30:53 -0800478bool QuicStream::OnStopSending(uint16_t code) {
479 // Do not reset the stream if all data has been sent and acknowledged.
480 if (write_side_closed() && !IsWaitingForAcks()) {
481 QUIC_DVLOG(1) << ENDPOINT
482 << "Ignoring STOP_SENDING for a write closed stream, id: "
483 << id_;
484 return false;
485 }
486
487 if (is_static_) {
488 QUIC_DVLOG(1) << ENDPOINT
489 << "Received STOP_SENDING for a static stream, id: " << id_
490 << " Closing connection";
491 OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
492 "Received STOP_SENDING for a static stream");
493 return false;
494 }
495
496 stream_error_ = static_cast<QuicRstStreamErrorCode>(code);
renjietang9946bc02020-07-16 15:14:27 -0700497
498 session()->SendRstStream(id(),
499 static_cast<quic::QuicRstStreamErrorCode>(code),
500 stream_bytes_written(), /*send_rst_only = */ true);
501 rst_sent_ = true;
502 CloseWriteSide();
renjietanged49cb92020-03-03 14:30:53 -0800503 return true;
504}
505
QUICHE teama6ef0a62019-03-07 20:34:33 -0500506int QuicStream::num_frames_received() const {
507 return sequencer_.num_frames_received();
508}
509
510int QuicStream::num_duplicate_frames_received() const {
511 return sequencer_.num_duplicate_frames_received();
512}
513
514void QuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
515 rst_received_ = true;
516 if (frame.byte_offset > kMaxStreamLength) {
517 // Peer are not suppose to write bytes more than maxium allowed.
renjietang87df0d02020-02-13 11:53:52 -0800518 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
519 "Reset frame stream offset overflow.");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500520 return;
521 }
renjietang15afba32019-10-23 14:32:35 -0700522
renjietang7ab48c32019-12-09 15:40:31 -0800523 const QuicStreamOffset kMaxOffset =
524 std::numeric_limits<QuicStreamOffset>::max();
525 if (sequencer()->close_offset() != kMaxOffset &&
526 frame.byte_offset != sequencer()->close_offset()) {
renjietang87df0d02020-02-13 11:53:52 -0800527 OnUnrecoverableError(
renjietang7ab48c32019-12-09 15:40:31 -0800528 QUIC_STREAM_MULTIPLE_OFFSET,
dmcardlecf0bfcf2019-12-13 08:08:21 -0800529 quiche::QuicheStrCat("Stream ", id_,
530 " received new final offset: ", frame.byte_offset,
531 ", which is different from close offset: ",
532 sequencer_.close_offset()));
renjietang7ab48c32019-12-09 15:40:31 -0800533 return;
renjietang15afba32019-10-23 14:32:35 -0700534 }
535
QUICHE teama6ef0a62019-03-07 20:34:33 -0500536 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
dschinazi7ccff582020-05-13 10:24:39 -0700537 QUIC_BUG_IF(!flow_controller_.has_value())
538 << ENDPOINT << "OnStreamReset called on stream without flow control";
539 if ((flow_controller_.has_value() &&
540 flow_controller_->FlowControlViolation()) ||
QUICHE teama6ef0a62019-03-07 20:34:33 -0500541 connection_flow_controller_->FlowControlViolation()) {
renjietang87df0d02020-02-13 11:53:52 -0800542 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
543 "Flow control violation after increasing offset");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500544 return;
545 }
546
547 stream_error_ = frame.error_code;
548 // Google QUIC closes both sides of the stream in response to a
549 // RESET_STREAM, IETF QUIC closes only the read side.
fkastenholz305e1732019-06-18 05:01:22 -0700550 if (!VersionHasIetfQuicFrames(transport_version())) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500551 CloseWriteSide();
552 }
553 CloseReadSide();
554}
555
556void QuicStream::OnConnectionClosed(QuicErrorCode error,
557 ConnectionCloseSource /*source*/) {
558 if (read_side_closed_ && write_side_closed_) {
559 return;
560 }
561 if (error != QUIC_NO_ERROR) {
562 stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
563 connection_error_ = error;
564 }
565
566 CloseWriteSide();
567 CloseReadSide();
568}
569
570void QuicStream::OnFinRead() {
571 DCHECK(sequencer_.IsClosed());
572 // OnFinRead can be called due to a FIN flag in a headers block, so there may
573 // have been no OnStreamFrame call with a FIN in the frame.
574 fin_received_ = true;
575 // If fin_sent_ is true, then CloseWriteSide has already been called, and the
576 // stream will be destroyed by CloseReadSide, so don't need to call
577 // StreamDraining.
578 CloseReadSide();
579}
580
fayang3a51d1a2020-04-16 13:42:08 -0700581void QuicStream::SetFinSent() {
582 DCHECK(!VersionUsesHttp3(transport_version()));
583 fin_sent_ = true;
584}
585
QUICHE teama6ef0a62019-03-07 20:34:33 -0500586void QuicStream::Reset(QuicRstStreamErrorCode error) {
587 stream_error_ = error;
renjietang9946bc02020-07-16 15:14:27 -0700588 session()->SendRstStream(id(), error, stream_bytes_written(),
589 /*send_rst_only = */ false);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500590 rst_sent_ = true;
fayangb8f83442020-06-01 12:09:17 -0700591 if (read_side_closed_ && write_side_closed_ && !IsWaitingForAcks()) {
592 session()->OnStreamDoneWaitingForAcks(id_);
593 return;
fayangd62ea772020-04-17 06:32:16 -0700594 }
fayangb8f83442020-06-01 12:09:17 -0700595 CloseReadSide();
596 CloseWriteSide();
QUICHE teama6ef0a62019-03-07 20:34:33 -0500597}
598
renjietang87df0d02020-02-13 11:53:52 -0800599void QuicStream::OnUnrecoverableError(QuicErrorCode error,
600 const std::string& details) {
renjietangf196f6a2020-02-12 12:34:23 -0800601 stream_delegate_->OnStreamError(error, details);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500602}
603
fayang476683a2019-07-25 12:42:16 -0700604const spdy::SpdyStreamPrecedence& QuicStream::precedence() const {
605 return precedence_;
QUICHE teama6ef0a62019-03-07 20:34:33 -0500606}
607
fayang476683a2019-07-25 12:42:16 -0700608void QuicStream::SetPriority(const spdy::SpdyStreamPrecedence& precedence) {
609 precedence_ = precedence;
bnccf09f952020-01-30 17:35:59 -0800610
611 MaybeSendPriorityUpdateFrame();
612
renjietang35e49ed2020-02-19 10:55:01 -0800613 stream_delegate_->UpdateStreamPriority(id(), precedence);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500614}
615
616void QuicStream::WriteOrBufferData(
dmcardlecf0bfcf2019-12-13 08:08:21 -0800617 quiche::QuicheStringPiece data,
QUICHE teama6ef0a62019-03-07 20:34:33 -0500618 bool fin,
619 QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
620 if (data.empty() && !fin) {
621 QUIC_BUG << "data.empty() && !fin";
622 return;
623 }
624
625 if (fin_buffered_) {
626 QUIC_BUG << "Fin already buffered";
627 return;
628 }
629 if (write_side_closed_) {
630 QUIC_DLOG(ERROR) << ENDPOINT
631 << "Attempt to write when the write side is closed";
632 if (type_ == READ_UNIDIRECTIONAL) {
renjietang87df0d02020-02-13 11:53:52 -0800633 OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
634 "Try to send data on read unidirectional stream");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500635 }
636 return;
637 }
638
QUICHE teama6ef0a62019-03-07 20:34:33 -0500639 fin_buffered_ = fin;
640
641 bool had_buffered_data = HasBufferedData();
642 // Do not respect buffered data upper limit as WriteOrBufferData guarantees
643 // all data to be consumed.
644 if (data.length() > 0) {
645 struct iovec iov(QuicUtils::MakeIovec(data));
646 QuicStreamOffset offset = send_buffer_.stream_offset();
647 if (kMaxStreamLength - offset < data.length()) {
648 QUIC_BUG << "Write too many data via stream " << id_;
renjietang87df0d02020-02-13 11:53:52 -0800649 OnUnrecoverableError(
QUICHE teama6ef0a62019-03-07 20:34:33 -0500650 QUIC_STREAM_LENGTH_OVERFLOW,
dmcardlecf0bfcf2019-12-13 08:08:21 -0800651 quiche::QuicheStrCat("Write too many data via stream ", id_));
QUICHE teama6ef0a62019-03-07 20:34:33 -0500652 return;
653 }
654 send_buffer_.SaveStreamData(&iov, 1, 0, data.length());
655 OnDataBuffered(offset, data.length(), ack_listener);
656 }
657 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
658 // Write data if there is no buffered data before.
659 WriteBufferedData();
660 }
661}
662
663void QuicStream::OnCanWrite() {
664 if (HasDeadlinePassed()) {
665 OnDeadlinePassed();
666 return;
667 }
668 if (HasPendingRetransmission()) {
669 WritePendingRetransmission();
670 // Exit early to allow other streams to write pending retransmissions if
671 // any.
672 return;
673 }
674
675 if (write_side_closed_) {
676 QUIC_DLOG(ERROR)
677 << ENDPOINT << "Stream " << id()
678 << " attempting to write new data when the write side is closed";
679 return;
680 }
681 if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) {
682 WriteBufferedData();
683 }
684 if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) {
685 // Notify upper layer to write new data when buffered data size is below
686 // low water mark.
687 OnCanWriteNewData();
688 }
689}
690
691void QuicStream::MaybeSendBlocked() {
dschinazi7ccff582020-05-13 10:24:39 -0700692 if (!flow_controller_.has_value()) {
693 QUIC_BUG << ENDPOINT
694 << "MaybeSendBlocked called on stream without flow control";
695 return;
696 }
nharperd5c4a932019-05-13 13:58:49 -0700697 if (flow_controller_->ShouldSendBlocked()) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500698 session_->SendBlocked(id_);
699 }
700 if (!stream_contributes_to_connection_flow_control_) {
701 return;
702 }
703 if (connection_flow_controller_->ShouldSendBlocked()) {
704 session_->SendBlocked(QuicUtils::GetInvalidStreamId(transport_version()));
705 }
706 // If the stream is blocked by connection-level flow control but not by
707 // stream-level flow control, add the stream to the write blocked list so that
708 // the stream will be given a chance to write when a connection-level
709 // WINDOW_UPDATE arrives.
710 if (connection_flow_controller_->IsBlocked() &&
nharperd5c4a932019-05-13 13:58:49 -0700711 !flow_controller_->IsBlocked()) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500712 session_->MarkConnectionLevelWriteBlocked(id());
713 }
714}
715
QUICHE teama6ef0a62019-03-07 20:34:33 -0500716QuicConsumedData QuicStream::WriteMemSlices(QuicMemSliceSpan span, bool fin) {
717 QuicConsumedData consumed_data(0, false);
718 if (span.empty() && !fin) {
719 QUIC_BUG << "span.empty() && !fin";
720 return consumed_data;
721 }
722
723 if (fin_buffered_) {
724 QUIC_BUG << "Fin already buffered";
725 return consumed_data;
726 }
727
728 if (write_side_closed_) {
729 QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id()
zhongyi4dc99c02019-05-30 14:11:04 -0700730 << " attempting to write when the write side is closed";
QUICHE teama6ef0a62019-03-07 20:34:33 -0500731 if (type_ == READ_UNIDIRECTIONAL) {
renjietang87df0d02020-02-13 11:53:52 -0800732 OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
733 "Try to send data on read unidirectional stream");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500734 }
735 return consumed_data;
736 }
737
738 bool had_buffered_data = HasBufferedData();
739 if (CanWriteNewData() || span.empty()) {
740 consumed_data.fin_consumed = fin;
741 if (!span.empty()) {
742 // Buffer all data if buffered data size is below limit.
743 QuicStreamOffset offset = send_buffer_.stream_offset();
wub553a9662019-03-28 20:13:23 -0700744 consumed_data.bytes_consumed = send_buffer_.SaveMemSliceSpan(span);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500745 if (offset > send_buffer_.stream_offset() ||
746 kMaxStreamLength < send_buffer_.stream_offset()) {
747 QUIC_BUG << "Write too many data via stream " << id_;
renjietang87df0d02020-02-13 11:53:52 -0800748 OnUnrecoverableError(
QUICHE teama6ef0a62019-03-07 20:34:33 -0500749 QUIC_STREAM_LENGTH_OVERFLOW,
dmcardlecf0bfcf2019-12-13 08:08:21 -0800750 quiche::QuicheStrCat("Write too many data via stream ", id_));
QUICHE teama6ef0a62019-03-07 20:34:33 -0500751 return consumed_data;
752 }
753 OnDataBuffered(offset, consumed_data.bytes_consumed, nullptr);
754 }
755 }
756 fin_buffered_ = consumed_data.fin_consumed;
757
758 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
759 // Write data if there is no buffered data before.
760 WriteBufferedData();
761 }
762
763 return consumed_data;
764}
765
766bool QuicStream::HasPendingRetransmission() const {
767 return send_buffer_.HasPendingRetransmission() || fin_lost_;
768}
769
770bool QuicStream::IsStreamFrameOutstanding(QuicStreamOffset offset,
771 QuicByteCount data_length,
772 bool fin) const {
773 return send_buffer_.IsStreamDataOutstanding(offset, data_length) ||
774 (fin && fin_outstanding_);
775}
776
QUICHE teama6ef0a62019-03-07 20:34:33 -0500777void QuicStream::CloseReadSide() {
778 if (read_side_closed_) {
779 return;
780 }
781 QUIC_DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
782
783 read_side_closed_ = true;
784 sequencer_.ReleaseBuffer();
785
786 if (write_side_closed_) {
787 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
fayangb8f83442020-06-01 12:09:17 -0700788 session_->OnStreamClosed(id());
789 OnClose();
QUICHE teama6ef0a62019-03-07 20:34:33 -0500790 }
791}
792
793void QuicStream::CloseWriteSide() {
794 if (write_side_closed_) {
795 return;
796 }
797 QUIC_DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
798
799 write_side_closed_ = true;
800 if (read_side_closed_) {
801 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
fayangb8f83442020-06-01 12:09:17 -0700802 session_->OnStreamClosed(id());
803 OnClose();
QUICHE teama6ef0a62019-03-07 20:34:33 -0500804 }
805}
806
807bool QuicStream::HasBufferedData() const {
808 DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
809 return send_buffer_.stream_offset() > stream_bytes_written();
810}
811
812QuicTransportVersion QuicStream::transport_version() const {
renjietangd1d00852019-09-06 10:43:12 -0700813 return session_->transport_version();
QUICHE teama6ef0a62019-03-07 20:34:33 -0500814}
815
816HandshakeProtocol QuicStream::handshake_protocol() const {
817 return session_->connection()->version().handshake_protocol;
818}
819
820void QuicStream::StopReading() {
821 QUIC_DVLOG(1) << ENDPOINT << "Stop reading from stream " << id();
822 sequencer_.StopReading();
823}
824
QUICHE teama6ef0a62019-03-07 20:34:33 -0500825void QuicStream::OnClose() {
fayangb8f83442020-06-01 12:09:17 -0700826 DCHECK(read_side_closed_ && write_side_closed_);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500827
828 if (!fin_sent_ && !rst_sent_) {
829 // For flow control accounting, tell the peer how many bytes have been
830 // written on this stream before termination. Done here if needed, using a
831 // RST_STREAM frame.
832 QUIC_DLOG(INFO) << ENDPOINT << "Sending RST_STREAM in OnClose: " << id();
833 session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT,
renjietang9946bc02020-07-16 15:14:27 -0700834 stream_bytes_written(), /*send_rst_only = */ false);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500835 session_->OnStreamDoneWaitingForAcks(id_);
836 rst_sent_ = true;
837 }
838
dschinazi7ccff582020-05-13 10:24:39 -0700839 if (!flow_controller_.has_value() ||
840 flow_controller_->FlowControlViolation() ||
QUICHE teama6ef0a62019-03-07 20:34:33 -0500841 connection_flow_controller_->FlowControlViolation()) {
842 return;
843 }
844 // The stream is being closed and will not process any further incoming bytes.
845 // As there may be more bytes in flight, to ensure that both endpoints have
846 // the same connection level flow control state, mark all unreceived or
847 // buffered bytes as consumed.
848 QuicByteCount bytes_to_consume =
nharperd5c4a932019-05-13 13:58:49 -0700849 flow_controller_->highest_received_byte_offset() -
850 flow_controller_->bytes_consumed();
QUICHE teama6ef0a62019-03-07 20:34:33 -0500851 AddBytesConsumed(bytes_to_consume);
852}
853
854void QuicStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
renjietang30246322019-09-09 11:24:10 -0700855 if (type_ == READ_UNIDIRECTIONAL) {
renjietang87df0d02020-02-13 11:53:52 -0800856 OnUnrecoverableError(
renjietang28c04b72019-07-01 15:08:09 -0700857 QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
858 "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.");
859 return;
860 }
861
dschinazi7ccff582020-05-13 10:24:39 -0700862 if (!flow_controller_.has_value()) {
863 QUIC_BUG << ENDPOINT
864 << "OnWindowUpdateFrame called on stream without flow control";
865 return;
866 }
867
renjietangd088eab2019-11-21 14:54:41 -0800868 if (flow_controller_->UpdateSendWindowOffset(frame.max_data)) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500869 // Let session unblock this stream.
870 session_->MarkConnectionLevelWriteBlocked(id_);
871 }
872}
873
874bool QuicStream::MaybeIncreaseHighestReceivedOffset(
875 QuicStreamOffset new_offset) {
dschinazi7ccff582020-05-13 10:24:39 -0700876 if (!flow_controller_.has_value()) {
877 QUIC_BUG << ENDPOINT
878 << "MaybeIncreaseHighestReceivedOffset called on stream without "
879 "flow control";
880 return false;
881 }
QUICHE teama6ef0a62019-03-07 20:34:33 -0500882 uint64_t increment =
nharperd5c4a932019-05-13 13:58:49 -0700883 new_offset - flow_controller_->highest_received_byte_offset();
884 if (!flow_controller_->UpdateHighestReceivedOffset(new_offset)) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500885 return false;
886 }
887
888 // If |new_offset| increased the stream flow controller's highest received
889 // offset, increase the connection flow controller's value by the incremental
890 // difference.
891 if (stream_contributes_to_connection_flow_control_) {
892 connection_flow_controller_->UpdateHighestReceivedOffset(
893 connection_flow_controller_->highest_received_byte_offset() +
894 increment);
895 }
896 return true;
897}
898
899void QuicStream::AddBytesSent(QuicByteCount bytes) {
dschinazi7ccff582020-05-13 10:24:39 -0700900 if (!flow_controller_.has_value()) {
901 QUIC_BUG << ENDPOINT
902 << "AddBytesSent called on stream without flow control";
903 return;
904 }
nharperd5c4a932019-05-13 13:58:49 -0700905 flow_controller_->AddBytesSent(bytes);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500906 if (stream_contributes_to_connection_flow_control_) {
907 connection_flow_controller_->AddBytesSent(bytes);
908 }
909}
910
911void QuicStream::AddBytesConsumed(QuicByteCount bytes) {
nharperd5c4a932019-05-13 13:58:49 -0700912 if (type_ == CRYPTO) {
913 // A stream with type CRYPTO has no flow control, so there's nothing this
914 // function needs to do. This function still gets called by the
915 // QuicStreamSequencers used by QuicCryptoStream.
916 return;
917 }
dschinazi7ccff582020-05-13 10:24:39 -0700918 if (!flow_controller_.has_value()) {
919 QUIC_BUG
920 << ENDPOINT
921 << "AddBytesConsumed called on non-crypto stream without flow control";
922 return;
923 }
QUICHE teama6ef0a62019-03-07 20:34:33 -0500924 // Only adjust stream level flow controller if still reading.
925 if (!read_side_closed_) {
nharperd5c4a932019-05-13 13:58:49 -0700926 flow_controller_->AddBytesConsumed(bytes);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500927 }
928
929 if (stream_contributes_to_connection_flow_control_) {
930 connection_flow_controller_->AddBytesConsumed(bytes);
931 }
932}
933
renjietange0f96d72020-04-28 11:25:29 -0700934bool QuicStream::ConfigSendWindowOffset(QuicStreamOffset new_offset) {
dschinazi7ccff582020-05-13 10:24:39 -0700935 if (!flow_controller_.has_value()) {
936 QUIC_BUG << ENDPOINT
937 << "ConfigSendWindowOffset called on stream without flow control";
938 return false;
939 }
renjietanga5a2ca92020-06-15 13:19:36 -0700940
941 QUIC_BUG_IF(session()->version().AllowsLowFlowControlLimits() &&
942 new_offset < flow_controller_->send_window_offset())
943 << ENDPOINT << "The new offset " << new_offset
944 << " decreases current offset " << flow_controller_->send_window_offset();
renjietange0f96d72020-04-28 11:25:29 -0700945 if (flow_controller_->UpdateSendWindowOffset(new_offset)) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500946 // Let session unblock this stream.
947 session_->MarkConnectionLevelWriteBlocked(id_);
948 }
renjietange0f96d72020-04-28 11:25:29 -0700949 return true;
QUICHE teama6ef0a62019-03-07 20:34:33 -0500950}
951
952void QuicStream::AddRandomPaddingAfterFin() {
953 add_random_padding_after_fin_ = true;
954}
955
956bool QuicStream::OnStreamFrameAcked(QuicStreamOffset offset,
957 QuicByteCount data_length,
958 bool fin_acked,
dschinazi17d42422019-06-18 16:35:07 -0700959 QuicTime::Delta /*ack_delay_time*/,
QUICHE team2f5f30b2020-02-18 08:52:28 -0800960 QuicTime /*receive_timestamp*/,
QUICHE teama6ef0a62019-03-07 20:34:33 -0500961 QuicByteCount* newly_acked_length) {
962 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Acking "
963 << "[" << offset << ", " << offset + data_length << "]"
964 << " fin = " << fin_acked;
965 *newly_acked_length = 0;
966 if (!send_buffer_.OnStreamDataAcked(offset, data_length,
967 newly_acked_length)) {
renjietang87df0d02020-02-13 11:53:52 -0800968 OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent data.");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500969 return false;
970 }
971 if (!fin_sent_ && fin_acked) {
renjietang87df0d02020-02-13 11:53:52 -0800972 OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent fin.");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500973 return false;
974 }
975 // Indicates whether ack listener's OnPacketAcked should be called.
976 const bool new_data_acked =
977 *newly_acked_length > 0 || (fin_acked && fin_outstanding_);
978 if (fin_acked) {
979 fin_outstanding_ = false;
980 fin_lost_ = false;
981 }
renjietang647b3cf2020-08-04 13:23:12 -0700982 if (!IsWaitingForAcks() && (!session()->remove_zombie_streams() ||
983 (read_side_closed_ && write_side_closed_))) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500984 session_->OnStreamDoneWaitingForAcks(id_);
985 }
986 return new_data_acked;
987}
988
989void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
990 QuicByteCount data_length,
991 bool fin_retransmitted) {
992 send_buffer_.OnStreamDataRetransmitted(offset, data_length);
993 if (fin_retransmitted) {
994 fin_lost_ = false;
995 }
996}
997
998void QuicStream::OnStreamFrameLost(QuicStreamOffset offset,
999 QuicByteCount data_length,
1000 bool fin_lost) {
1001 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Losting "
1002 << "[" << offset << ", " << offset + data_length << "]"
1003 << " fin = " << fin_lost;
1004 if (data_length > 0) {
1005 send_buffer_.OnStreamDataLost(offset, data_length);
1006 }
1007 if (fin_lost && fin_outstanding_) {
1008 fin_lost_ = true;
1009 }
1010}
1011
1012bool QuicStream::RetransmitStreamData(QuicStreamOffset offset,
1013 QuicByteCount data_length,
renjietang4d992bf2020-03-03 13:01:55 -08001014 bool fin,
1015 TransmissionType type) {
1016 DCHECK(type == PTO_RETRANSMISSION || type == RTO_RETRANSMISSION ||
1017 type == TLP_RETRANSMISSION || type == PROBING_RETRANSMISSION);
QUICHE teama6ef0a62019-03-07 20:34:33 -05001018 if (HasDeadlinePassed()) {
1019 OnDeadlinePassed();
1020 return true;
1021 }
1022 QuicIntervalSet<QuicStreamOffset> retransmission(offset,
1023 offset + data_length);
1024 retransmission.Difference(bytes_acked());
1025 bool retransmit_fin = fin && fin_outstanding_;
1026 if (retransmission.Empty() && !retransmit_fin) {
1027 return true;
1028 }
1029 QuicConsumedData consumed(0, false);
1030 for (const auto& interval : retransmission) {
1031 QuicStreamOffset retransmission_offset = interval.min();
1032 QuicByteCount retransmission_length = interval.max() - interval.min();
1033 const bool can_bundle_fin =
1034 retransmit_fin && (retransmission_offset + retransmission_length ==
1035 stream_bytes_written());
renjietang7c239172020-02-21 13:50:39 -08001036 consumed = stream_delegate_->WritevData(
1037 id_, retransmission_length, retransmission_offset,
vasilvv9efbb912020-06-05 08:59:26 -07001038 can_bundle_fin ? FIN : NO_FIN, type, QUICHE_NULLOPT);
QUICHE teama6ef0a62019-03-07 20:34:33 -05001039 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1040 << " is forced to retransmit stream data ["
1041 << retransmission_offset << ", "
1042 << retransmission_offset + retransmission_length
1043 << ") and fin: " << can_bundle_fin
1044 << ", consumed: " << consumed;
1045 OnStreamFrameRetransmitted(retransmission_offset, consumed.bytes_consumed,
1046 consumed.fin_consumed);
1047 if (can_bundle_fin) {
1048 retransmit_fin = !consumed.fin_consumed;
1049 }
1050 if (consumed.bytes_consumed < retransmission_length ||
1051 (can_bundle_fin && !consumed.fin_consumed)) {
1052 // Connection is write blocked.
1053 return false;
1054 }
1055 }
1056 if (retransmit_fin) {
1057 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1058 << " retransmits fin only frame.";
renjietang4d992bf2020-03-03 13:01:55 -08001059 consumed = stream_delegate_->WritevData(id_, 0, stream_bytes_written(), FIN,
vasilvv9efbb912020-06-05 08:59:26 -07001060 type, QUICHE_NULLOPT);
QUICHE teama6ef0a62019-03-07 20:34:33 -05001061 if (!consumed.fin_consumed) {
1062 return false;
1063 }
1064 }
1065 return true;
1066}
1067
1068bool QuicStream::IsWaitingForAcks() const {
1069 return (!rst_sent_ || stream_error_ == QUIC_STREAM_NO_ERROR) &&
1070 (send_buffer_.stream_bytes_outstanding() || fin_outstanding_);
1071}
1072
dschinazif1e7b422020-04-30 12:21:28 -07001073QuicByteCount QuicStream::ReadableBytes() const {
QUICHE teama6ef0a62019-03-07 20:34:33 -05001074 return sequencer_.ReadableBytes();
1075}
1076
1077bool QuicStream::WriteStreamData(QuicStreamOffset offset,
1078 QuicByteCount data_length,
1079 QuicDataWriter* writer) {
1080 DCHECK_LT(0u, data_length);
1081 QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset "
1082 << offset << " length " << data_length;
1083 return send_buffer_.WriteStreamData(offset, data_length, writer);
1084}
1085
1086void QuicStream::WriteBufferedData() {
1087 DCHECK(!write_side_closed_ && (HasBufferedData() || fin_buffered_));
1088
1089 if (session_->ShouldYield(id())) {
1090 session_->MarkConnectionLevelWriteBlocked(id());
1091 return;
1092 }
1093
1094 // Size of buffered data.
dschinazif1e7b422020-04-30 12:21:28 -07001095 QuicByteCount write_length = BufferedDataBytes();
QUICHE teama6ef0a62019-03-07 20:34:33 -05001096
1097 // A FIN with zero data payload should not be flow control blocked.
1098 bool fin_with_zero_data = (fin_buffered_ && write_length == 0);
1099
1100 bool fin = fin_buffered_;
1101
1102 // How much data flow control permits to be written.
dschinazi7ccff582020-05-13 10:24:39 -07001103 QuicByteCount send_window;
1104 if (flow_controller_.has_value()) {
1105 send_window = flow_controller_->SendWindowSize();
1106 } else {
1107 send_window = std::numeric_limits<QuicByteCount>::max();
1108 QUIC_BUG << ENDPOINT
1109 << "WriteBufferedData called on stream without flow control";
1110 }
QUICHE teama6ef0a62019-03-07 20:34:33 -05001111 if (stream_contributes_to_connection_flow_control_) {
1112 send_window =
1113 std::min(send_window, connection_flow_controller_->SendWindowSize());
1114 }
1115
1116 if (send_window == 0 && !fin_with_zero_data) {
1117 // Quick return if nothing can be sent.
1118 MaybeSendBlocked();
1119 return;
1120 }
1121
1122 if (write_length > send_window) {
1123 // Don't send the FIN unless all the data will be sent.
1124 fin = false;
1125
1126 // Writing more data would be a violation of flow control.
dschinazif1e7b422020-04-30 12:21:28 -07001127 write_length = send_window;
QUICHE teama6ef0a62019-03-07 20:34:33 -05001128 QUIC_DVLOG(1) << "stream " << id() << " shortens write length to "
1129 << write_length << " due to flow control";
1130 }
fayang97857352019-07-01 06:15:26 -07001131
1132 StreamSendingState state = fin ? FIN : NO_FIN;
1133 if (fin && add_random_padding_after_fin_) {
1134 state = FIN_AND_PADDING;
1135 }
renjietang4d992bf2020-03-03 13:01:55 -08001136 QuicConsumedData consumed_data =
1137 stream_delegate_->WritevData(id(), write_length, stream_bytes_written(),
vasilvv9efbb912020-06-05 08:59:26 -07001138 state, NOT_RETRANSMISSION, QUICHE_NULLOPT);
QUICHE teama6ef0a62019-03-07 20:34:33 -05001139
1140 OnStreamDataConsumed(consumed_data.bytes_consumed);
1141
1142 AddBytesSent(consumed_data.bytes_consumed);
1143 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " sends "
1144 << stream_bytes_written() << " bytes "
1145 << " and has buffered data " << BufferedDataBytes() << " bytes."
1146 << " fin is sent: " << consumed_data.fin_consumed
1147 << " fin is buffered: " << fin_buffered_;
1148
1149 // The write may have generated a write error causing this stream to be
1150 // closed. If so, simply return without marking the stream write blocked.
1151 if (write_side_closed_) {
1152 return;
1153 }
1154
1155 if (consumed_data.bytes_consumed == write_length) {
1156 if (!fin_with_zero_data) {
1157 MaybeSendBlocked();
1158 }
1159 if (fin && consumed_data.fin_consumed) {
fayangbe6d6642020-04-16 14:15:34 -07001160 DCHECK(!fin_sent_);
QUICHE teama6ef0a62019-03-07 20:34:33 -05001161 fin_sent_ = true;
1162 fin_outstanding_ = true;
1163 if (fin_received_) {
fayangbe6d6642020-04-16 14:15:34 -07001164 DCHECK(!was_draining_);
1165 session_->StreamDraining(id_,
1166 /*unidirectional=*/type_ != BIDIRECTIONAL);
1167 was_draining_ = true;
QUICHE teama6ef0a62019-03-07 20:34:33 -05001168 }
1169 CloseWriteSide();
1170 } else if (fin && !consumed_data.fin_consumed) {
1171 session_->MarkConnectionLevelWriteBlocked(id());
1172 }
1173 } else {
1174 session_->MarkConnectionLevelWriteBlocked(id());
1175 }
1176 if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) {
1177 busy_counter_ = 0;
1178 }
zhongyi1b2f7832019-06-14 13:31:34 -07001179
1180 if (IsWaitingForAcks()) {
1181 session_->OnStreamWaitingForAcks(id_);
1182 }
QUICHE teama6ef0a62019-03-07 20:34:33 -05001183}
1184
1185uint64_t QuicStream::BufferedDataBytes() const {
1186 DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
1187 return send_buffer_.stream_offset() - stream_bytes_written();
1188}
1189
1190bool QuicStream::CanWriteNewData() const {
1191 return BufferedDataBytes() < buffered_data_threshold_;
1192}
1193
1194bool QuicStream::CanWriteNewDataAfterData(QuicByteCount length) const {
1195 return (BufferedDataBytes() + length) < buffered_data_threshold_;
1196}
1197
1198uint64_t QuicStream::stream_bytes_written() const {
1199 return send_buffer_.stream_bytes_written();
1200}
1201
1202const QuicIntervalSet<QuicStreamOffset>& QuicStream::bytes_acked() const {
1203 return send_buffer_.bytes_acked();
1204}
1205
dschinazif1e7b422020-04-30 12:21:28 -07001206void QuicStream::OnStreamDataConsumed(QuicByteCount bytes_consumed) {
QUICHE teama6ef0a62019-03-07 20:34:33 -05001207 send_buffer_.OnStreamDataConsumed(bytes_consumed);
1208}
1209
1210void QuicStream::WritePendingRetransmission() {
1211 while (HasPendingRetransmission()) {
1212 QuicConsumedData consumed(0, false);
1213 if (!send_buffer_.HasPendingRetransmission()) {
1214 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1215 << " retransmits fin only frame.";
renjietang4d992bf2020-03-03 13:01:55 -08001216 consumed =
1217 stream_delegate_->WritevData(id_, 0, stream_bytes_written(), FIN,
vasilvv9efbb912020-06-05 08:59:26 -07001218 LOSS_RETRANSMISSION, QUICHE_NULLOPT);
QUICHE teama6ef0a62019-03-07 20:34:33 -05001219 fin_lost_ = !consumed.fin_consumed;
1220 if (fin_lost_) {
1221 // Connection is write blocked.
1222 return;
1223 }
1224 } else {
1225 StreamPendingRetransmission pending =
1226 send_buffer_.NextPendingRetransmission();
1227 // Determine whether the lost fin can be bundled with the data.
1228 const bool can_bundle_fin =
1229 fin_lost_ &&
1230 (pending.offset + pending.length == stream_bytes_written());
renjietang7c239172020-02-21 13:50:39 -08001231 consumed = stream_delegate_->WritevData(
1232 id_, pending.length, pending.offset, can_bundle_fin ? FIN : NO_FIN,
vasilvv9efbb912020-06-05 08:59:26 -07001233 LOSS_RETRANSMISSION, QUICHE_NULLOPT);
QUICHE teama6ef0a62019-03-07 20:34:33 -05001234 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1235 << " tries to retransmit stream data [" << pending.offset
1236 << ", " << pending.offset + pending.length
1237 << ") and fin: " << can_bundle_fin
1238 << ", consumed: " << consumed;
1239 OnStreamFrameRetransmitted(pending.offset, consumed.bytes_consumed,
1240 consumed.fin_consumed);
1241 if (consumed.bytes_consumed < pending.length ||
1242 (can_bundle_fin && !consumed.fin_consumed)) {
1243 // Connection is write blocked.
1244 return;
1245 }
1246 }
1247 }
1248}
1249
1250bool QuicStream::MaybeSetTtl(QuicTime::Delta ttl) {
1251 if (is_static_) {
1252 QUIC_BUG << "Cannot set TTL of a static stream.";
1253 return false;
1254 }
1255 if (deadline_.IsInitialized()) {
1256 QUIC_DLOG(WARNING) << "Deadline has already been set.";
1257 return false;
1258 }
QUICHE teama6ef0a62019-03-07 20:34:33 -05001259 QuicTime now = session()->connection()->clock()->ApproximateNow();
1260 deadline_ = now + ttl;
1261 return true;
1262}
1263
1264bool QuicStream::HasDeadlinePassed() const {
1265 if (!deadline_.IsInitialized()) {
1266 // No deadline has been set.
1267 return false;
1268 }
QUICHE teama6ef0a62019-03-07 20:34:33 -05001269 QuicTime now = session()->connection()->clock()->ApproximateNow();
1270 if (now < deadline_) {
1271 return false;
1272 }
1273 // TTL expired.
1274 QUIC_DVLOG(1) << "stream " << id() << " deadline has passed";
1275 return true;
1276}
1277
1278void QuicStream::OnDeadlinePassed() {
1279 Reset(QUIC_STREAM_TTL_EXPIRED);
1280}
1281
1282void QuicStream::SendStopSending(uint16_t code) {
fkastenholz305e1732019-06-18 05:01:22 -07001283 if (!VersionHasIetfQuicFrames(transport_version())) {
QUICHE teama6ef0a62019-03-07 20:34:33 -05001284 // If the connection is not version 99, do nothing.
1285 // Do not QUIC_BUG or anything; the application really does not need to know
1286 // what version the connection is in.
1287 return;
1288 }
1289 session_->SendStopSending(code, id_);
1290}
1291
renjietanga5a2ca92020-06-15 13:19:36 -07001292QuicFlowController* QuicStream::flow_controller() {
1293 if (flow_controller_.has_value()) {
1294 return &flow_controller_.value();
1295 }
1296 QUIC_BUG << "Trying to access non-existent flow controller.";
1297 return nullptr;
1298}
1299
1300const QuicFlowController* QuicStream::flow_controller() const {
1301 if (flow_controller_.has_value()) {
1302 return &flow_controller_.value();
1303 }
1304 QUIC_BUG << "Trying to access non-existent flow controller.";
1305 return nullptr;
1306}
1307
bnc5f202512020-02-01 18:43:02 -08001308// static
1309spdy::SpdyStreamPrecedence QuicStream::CalculateDefaultPriority(
1310 const QuicSession* session) {
1311 if (VersionUsesHttp3(session->transport_version())) {
1312 return spdy::SpdyStreamPrecedence(QuicStream::kDefaultUrgency);
1313 }
1314
1315 if (session->use_http2_priority_write_scheduler()) {
1316 return spdy::SpdyStreamPrecedence(0, spdy::kHttp2DefaultStreamWeight,
1317 false);
1318 }
1319
1320 return spdy::SpdyStreamPrecedence(QuicStream::kDefaultPriority);
1321}
1322
QUICHE teama6ef0a62019-03-07 20:34:33 -05001323} // namespace quic