blob: 86beaef0f7b2753f186bf3fcac302c5c33d11114 [file] [log] [blame]
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5#include "net/third_party/quiche/src/quic/core/quic_stream.h"
6
vasilvv872e7a32019-03-12 16:42:44 -07007#include <string>
8
renjietang963c2ec2019-09-12 11:46:50 -07009#include "net/third_party/quiche/src/quic/core/quic_error_codes.h"
QUICHE teama6ef0a62019-03-07 20:34:33 -050010#include "net/third_party/quiche/src/quic/core/quic_flow_controller.h"
11#include "net/third_party/quiche/src/quic/core/quic_session.h"
renjietang15afba32019-10-23 14:32:35 -070012#include "net/third_party/quiche/src/quic/core/quic_types.h"
QUICHE teama6ef0a62019-03-07 20:34:33 -050013#include "net/third_party/quiche/src/quic/core/quic_utils.h"
14#include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
15#include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
16#include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
17#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
renjietang41a1b412020-02-27 15:05:14 -080018#include "net/third_party/quiche/src/common/platform/api/quiche_optional.h"
dmcardlecf0bfcf2019-12-13 08:08:21 -080019#include "net/third_party/quiche/src/common/platform/api/quiche_str_cat.h"
20#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
QUICHE teama6ef0a62019-03-07 20:34:33 -050021
dmcardle15db6882020-02-26 12:55:36 -080022using quiche::QuicheOptional;
QUICHE teama6ef0a62019-03-07 20:34:33 -050023using spdy::SpdyPriority;
24
25namespace quic {
26
// Log-line prefix naming the endpoint role. Expands to an expression that
// reads |perspective_|, so it can only be used where that name is in scope.
#define ENDPOINT \
  (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ")
QUICHE teama6ef0a62019-03-07 20:34:33 -050029
30namespace {
31
dschinazi18cdf132019-10-09 16:08:18 -070032size_t DefaultFlowControlWindow(ParsedQuicVersion version) {
33 if (!version.AllowsLowFlowControlLimits()) {
34 return kDefaultFlowControlSendWindow;
35 }
36 return 0;
37}
38
rchb0451852019-09-11 21:17:01 -070039size_t GetInitialStreamFlowControlWindowToSend(QuicSession* session,
40 QuicStreamId stream_id) {
41 ParsedQuicVersion version = session->connection()->version();
42 if (version.handshake_protocol != PROTOCOL_TLS1_3) {
43 return session->config()->GetInitialStreamFlowControlWindowToSend();
44 }
45
46 // Unidirectional streams (v99 only).
47 if (VersionHasIetfQuicFrames(version.transport_version) &&
48 !QuicUtils::IsBidirectionalStreamId(stream_id)) {
49 return session->config()
50 ->GetInitialMaxStreamDataBytesUnidirectionalToSend();
51 }
52
dschinazi18cdf132019-10-09 16:08:18 -070053 if (QuicUtils::IsOutgoingStreamId(version, stream_id,
54 session->perspective())) {
rchb0451852019-09-11 21:17:01 -070055 return session->config()
56 ->GetInitialMaxStreamDataBytesOutgoingBidirectionalToSend();
57 }
58
59 return session->config()
60 ->GetInitialMaxStreamDataBytesIncomingBidirectionalToSend();
QUICHE teama6ef0a62019-03-07 20:34:33 -050061}
62
rchb0451852019-09-11 21:17:01 -070063size_t GetReceivedFlowControlWindow(QuicSession* session,
64 QuicStreamId stream_id) {
65 ParsedQuicVersion version = session->connection()->version();
66 if (version.handshake_protocol != PROTOCOL_TLS1_3) {
67 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
68 return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
69 }
70
dschinazi18cdf132019-10-09 16:08:18 -070071 return DefaultFlowControlWindow(version);
rchb0451852019-09-11 21:17:01 -070072 }
73
74 // Unidirectional streams (v99 only).
75 if (VersionHasIetfQuicFrames(version.transport_version) &&
76 !QuicUtils::IsBidirectionalStreamId(stream_id)) {
77 if (session->config()
78 ->HasReceivedInitialMaxStreamDataBytesUnidirectional()) {
79 return session->config()
80 ->ReceivedInitialMaxStreamDataBytesUnidirectional();
81 }
dschinazi18cdf132019-10-09 16:08:18 -070082
83 return DefaultFlowControlWindow(version);
rchb0451852019-09-11 21:17:01 -070084 }
85
dschinazi18cdf132019-10-09 16:08:18 -070086 if (QuicUtils::IsOutgoingStreamId(version, stream_id,
87 session->perspective())) {
rchb0451852019-09-11 21:17:01 -070088 if (session->config()
dschinazi18cdf132019-10-09 16:08:18 -070089 ->HasReceivedInitialMaxStreamDataBytesOutgoingBidirectional()) {
rchb0451852019-09-11 21:17:01 -070090 return session->config()
dschinazi18cdf132019-10-09 16:08:18 -070091 ->ReceivedInitialMaxStreamDataBytesOutgoingBidirectional();
rchb0451852019-09-11 21:17:01 -070092 }
dschinazi18cdf132019-10-09 16:08:18 -070093
94 return DefaultFlowControlWindow(version);
rchb0451852019-09-11 21:17:01 -070095 }
96
97 if (session->config()
dschinazi18cdf132019-10-09 16:08:18 -070098 ->HasReceivedInitialMaxStreamDataBytesIncomingBidirectional()) {
rchb0451852019-09-11 21:17:01 -070099 return session->config()
dschinazi18cdf132019-10-09 16:08:18 -0700100 ->ReceivedInitialMaxStreamDataBytesIncomingBidirectional();
QUICHE teama6ef0a62019-03-07 20:34:33 -0500101 }
102
dschinazi18cdf132019-10-09 16:08:18 -0700103 return DefaultFlowControlWindow(version);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500104}
105
106} // namespace
107
108// static
109const SpdyPriority QuicStream::kDefaultPriority;
110
bnc5f202512020-02-01 18:43:02 -0800111// static
112const int QuicStream::kDefaultUrgency;
113
QUICHE teama6ef0a62019-03-07 20:34:33 -0500114PendingStream::PendingStream(QuicStreamId id, QuicSession* session)
115 : id_(id),
116 session_(session),
renjietangf196f6a2020-02-12 12:34:23 -0800117 stream_delegate_(session),
QUICHE teama6ef0a62019-03-07 20:34:33 -0500118 stream_bytes_read_(0),
119 fin_received_(false),
120 connection_flow_controller_(session->flow_controller()),
121 flow_controller_(session,
122 id,
123 /*is_connection_flow_controller*/ false,
rchb0451852019-09-11 21:17:01 -0700124 GetReceivedFlowControlWindow(session, id),
125 GetInitialStreamFlowControlWindowToSend(session, id),
QUICHE teama6ef0a62019-03-07 20:34:33 -0500126 kStreamReceiveWindowLimit,
127 session_->flow_controller()->auto_tune_receive_window(),
128 session_->flow_controller()),
129 sequencer_(this) {}
130
131void PendingStream::OnDataAvailable() {
bnc092d8212019-08-07 11:53:20 -0700132 // Data should be kept in the sequencer so that
133 // QuicSession::ProcessPendingStream() can read it.
QUICHE teama6ef0a62019-03-07 20:34:33 -0500134}
135
136void PendingStream::OnFinRead() {
bnc092d8212019-08-07 11:53:20 -0700137 DCHECK(sequencer_.IsClosed());
QUICHE teama6ef0a62019-03-07 20:34:33 -0500138}
139
140void PendingStream::AddBytesConsumed(QuicByteCount bytes) {
renjietangbb1c4892019-05-24 15:58:44 -0700141 // It will be called when the metadata of the stream is consumed.
142 flow_controller_.AddBytesConsumed(bytes);
143 connection_flow_controller_->AddBytesConsumed(bytes);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500144}
145
renjietang4f732482019-10-24 14:48:23 -0700146void PendingStream::Reset(QuicRstStreamErrorCode /*error*/) {
147 // Currently PendingStream is only read-unidirectional. It shouldn't send
148 // Reset.
renjietang4f732482019-10-24 14:48:23 -0700149 QUIC_NOTREACHED();
QUICHE teama6ef0a62019-03-07 20:34:33 -0500150}
151
renjietang87df0d02020-02-13 11:53:52 -0800152void PendingStream::OnUnrecoverableError(QuicErrorCode error,
153 const std::string& details) {
renjietangf196f6a2020-02-12 12:34:23 -0800154 stream_delegate_->OnStreamError(error, details);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500155}
156
157QuicStreamId PendingStream::id() const {
158 return id_;
159}
160
QUICHE teama6ef0a62019-03-07 20:34:33 -0500161void PendingStream::OnStreamFrame(const QuicStreamFrame& frame) {
162 DCHECK_EQ(frame.stream_id, id_);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500163
164 bool is_stream_too_long =
165 (frame.offset > kMaxStreamLength) ||
166 (kMaxStreamLength - frame.offset < frame.data_length);
167 if (is_stream_too_long) {
168 // Close connection if stream becomes too long.
169 QUIC_PEER_BUG
170 << "Receive stream frame reaches max stream length. frame offset "
171 << frame.offset << " length " << frame.data_length;
renjietang87df0d02020-02-13 11:53:52 -0800172 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
173 "Peer sends more data than allowed on this stream.");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500174 return;
175 }
176
renjietang89aa73e2019-10-21 15:03:51 -0700177 if (frame.offset + frame.data_length > sequencer_.close_offset()) {
renjietang87df0d02020-02-13 11:53:52 -0800178 OnUnrecoverableError(
renjietang15afba32019-10-23 14:32:35 -0700179 QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
dmcardlecf0bfcf2019-12-13 08:08:21 -0800180 quiche::QuicheStrCat(
renjietang15afba32019-10-23 14:32:35 -0700181 "Stream ", id_,
182 " received data with offset: ", frame.offset + frame.data_length,
183 ", which is beyond close offset: ", sequencer()->close_offset()));
renjietang963c2ec2019-09-12 11:46:50 -0700184 return;
185 }
186
QUICHE teama6ef0a62019-03-07 20:34:33 -0500187 if (frame.fin) {
188 fin_received_ = true;
189 }
190
191 // This count includes duplicate data received.
192 size_t frame_payload_size = frame.data_length;
193 stream_bytes_read_ += frame_payload_size;
194
195 // Flow control is interested in tracking highest received offset.
196 // Only interested in received frames that carry data.
197 if (frame_payload_size > 0 &&
198 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
199 // As the highest received offset has changed, check to see if this is a
200 // violation of flow control.
201 if (flow_controller_.FlowControlViolation() ||
202 connection_flow_controller_->FlowControlViolation()) {
renjietang87df0d02020-02-13 11:53:52 -0800203 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
204 "Flow control violation after increasing offset");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500205 return;
206 }
207 }
208
209 sequencer_.OnStreamFrame(frame);
210}
211
212void PendingStream::OnRstStreamFrame(const QuicRstStreamFrame& frame) {
213 DCHECK_EQ(frame.stream_id, id_);
214
215 if (frame.byte_offset > kMaxStreamLength) {
216 // Peer are not suppose to write bytes more than maxium allowed.
renjietang87df0d02020-02-13 11:53:52 -0800217 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
218 "Reset frame stream offset overflow.");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500219 return;
220 }
renjietang15afba32019-10-23 14:32:35 -0700221
222 const QuicStreamOffset kMaxOffset =
223 std::numeric_limits<QuicStreamOffset>::max();
224 if (sequencer()->close_offset() != kMaxOffset &&
225 frame.byte_offset != sequencer()->close_offset()) {
renjietang87df0d02020-02-13 11:53:52 -0800226 OnUnrecoverableError(
renjietang15afba32019-10-23 14:32:35 -0700227 QUIC_STREAM_MULTIPLE_OFFSET,
dmcardlecf0bfcf2019-12-13 08:08:21 -0800228 quiche::QuicheStrCat("Stream ", id_,
229 " received new final offset: ", frame.byte_offset,
230 ", which is different from close offset: ",
231 sequencer()->close_offset()));
renjietang15afba32019-10-23 14:32:35 -0700232 return;
233 }
234
QUICHE teama6ef0a62019-03-07 20:34:33 -0500235 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
236 if (flow_controller_.FlowControlViolation() ||
237 connection_flow_controller_->FlowControlViolation()) {
renjietang87df0d02020-02-13 11:53:52 -0800238 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
239 "Flow control violation after increasing offset");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500240 return;
241 }
242}
243
244bool PendingStream::MaybeIncreaseHighestReceivedOffset(
245 QuicStreamOffset new_offset) {
246 uint64_t increment =
247 new_offset - flow_controller_.highest_received_byte_offset();
248 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
249 return false;
250 }
251
252 // If |new_offset| increased the stream flow controller's highest received
253 // offset, increase the connection flow controller's value by the incremental
254 // difference.
255 connection_flow_controller_->UpdateHighestReceivedOffset(
256 connection_flow_controller_->highest_received_byte_offset() + increment);
257 return true;
258}
259
renjietangbb1c4892019-05-24 15:58:44 -0700260void PendingStream::MarkConsumed(size_t num_bytes) {
261 sequencer_.MarkConsumed(num_bytes);
262}
263
bnc4ff60622019-08-09 18:55:45 -0700264void PendingStream::StopReading() {
265 QUIC_DVLOG(1) << "Stop reading from pending stream " << id();
266 sequencer_.StopReading();
267}
268
renjietangbaea59c2019-05-29 15:08:14 -0700269QuicStream::QuicStream(PendingStream* pending, StreamType type, bool is_static)
270 : QuicStream(pending->id_,
271 pending->session_,
272 std::move(pending->sequencer_),
renjietang35448992019-05-08 17:08:57 -0700273 is_static,
QUICHE teama6ef0a62019-03-07 20:34:33 -0500274 type,
renjietangbaea59c2019-05-29 15:08:14 -0700275 pending->stream_bytes_read_,
276 pending->fin_received_,
277 std::move(pending->flow_controller_),
278 pending->connection_flow_controller_) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500279 sequencer_.set_stream(this);
280}
281
nharperd5c4a932019-05-13 13:58:49 -0700282namespace {
283
dmcardle15db6882020-02-26 12:55:36 -0800284QuicheOptional<QuicFlowController> FlowController(QuicStreamId id,
285 QuicSession* session,
286 StreamType type) {
nharperd5c4a932019-05-13 13:58:49 -0700287 if (type == CRYPTO) {
288 // The only QuicStream with a StreamType of CRYPTO is QuicCryptoStream, when
289 // it is using crypto frames instead of stream frames. The QuicCryptoStream
290 // doesn't have any flow control in that case, so we don't create a
291 // QuicFlowController for it.
dmcardle15db6882020-02-26 12:55:36 -0800292 return QuicheOptional<QuicFlowController>();
nharperd5c4a932019-05-13 13:58:49 -0700293 }
294 return QuicFlowController(
295 session, id,
296 /*is_connection_flow_controller*/ false,
rchb0451852019-09-11 21:17:01 -0700297 GetReceivedFlowControlWindow(session, id),
298 GetInitialStreamFlowControlWindowToSend(session, id),
nharperd5c4a932019-05-13 13:58:49 -0700299 kStreamReceiveWindowLimit,
300 session->flow_controller()->auto_tune_receive_window(),
301 session->flow_controller());
302}
303
304} // namespace
305
QUICHE teama6ef0a62019-03-07 20:34:33 -0500306QuicStream::QuicStream(QuicStreamId id,
307 QuicSession* session,
308 bool is_static,
309 StreamType type)
310 : QuicStream(id,
311 session,
312 QuicStreamSequencer(this),
313 is_static,
314 type,
315 0,
316 false,
nharperd5c4a932019-05-13 13:58:49 -0700317 FlowController(id, session, type),
QUICHE teama6ef0a62019-03-07 20:34:33 -0500318 session->flow_controller()) {}
319
320QuicStream::QuicStream(QuicStreamId id,
321 QuicSession* session,
322 QuicStreamSequencer sequencer,
323 bool is_static,
324 StreamType type,
325 uint64_t stream_bytes_read,
326 bool fin_received,
dmcardle15db6882020-02-26 12:55:36 -0800327 QuicheOptional<QuicFlowController> flow_controller,
QUICHE teama6ef0a62019-03-07 20:34:33 -0500328 QuicFlowController* connection_flow_controller)
329 : sequencer_(std::move(sequencer)),
330 id_(id),
331 session_(session),
renjietangf196f6a2020-02-12 12:34:23 -0800332 stream_delegate_(session),
bnc5f202512020-02-01 18:43:02 -0800333 precedence_(CalculateDefaultPriority(session)),
QUICHE teama6ef0a62019-03-07 20:34:33 -0500334 stream_bytes_read_(stream_bytes_read),
335 stream_error_(QUIC_STREAM_NO_ERROR),
336 connection_error_(QUIC_NO_ERROR),
337 read_side_closed_(false),
338 write_side_closed_(false),
339 fin_buffered_(false),
340 fin_sent_(false),
341 fin_outstanding_(false),
342 fin_lost_(false),
343 fin_received_(fin_received),
344 rst_sent_(false),
345 rst_received_(false),
QUICHE teama6ef0a62019-03-07 20:34:33 -0500346 flow_controller_(std::move(flow_controller)),
347 connection_flow_controller_(connection_flow_controller),
348 stream_contributes_to_connection_flow_control_(true),
349 busy_counter_(0),
350 add_random_padding_after_fin_(false),
351 send_buffer_(
352 session->connection()->helper()->GetStreamSendBufferAllocator()),
353 buffered_data_threshold_(GetQuicFlag(FLAGS_quic_buffered_data_threshold)),
354 is_static_(is_static),
355 deadline_(QuicTime::Zero()),
renjietangd1d00852019-09-06 10:43:12 -0700356 type_(VersionHasIetfQuicFrames(session->transport_version()) &&
nharperd5c4a932019-05-13 13:58:49 -0700357 type != CRYPTO
QUICHE teama6ef0a62019-03-07 20:34:33 -0500358 ? QuicUtils::GetStreamType(id_,
renjietangc9e80442019-11-06 17:24:37 -0800359 session->perspective(),
QUICHE teama6ef0a62019-03-07 20:34:33 -0500360 session->IsIncomingStream(id_))
renjietangff3d3a32020-02-13 15:13:51 -0800361 : type),
362 perspective_(session->perspective()) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500363 if (type_ == WRITE_UNIDIRECTIONAL) {
364 set_fin_received(true);
365 CloseReadSide();
366 } else if (type_ == READ_UNIDIRECTIONAL) {
367 set_fin_sent(true);
368 CloseWriteSide();
369 }
nharperd5c4a932019-05-13 13:58:49 -0700370 if (type_ != CRYPTO) {
renjietang35e49ed2020-02-19 10:55:01 -0800371 stream_delegate_->RegisterStreamPriority(id, is_static_, precedence_);
nharperd5c4a932019-05-13 13:58:49 -0700372 }
QUICHE teama6ef0a62019-03-07 20:34:33 -0500373}
374
375QuicStream::~QuicStream() {
376 if (session_ != nullptr && IsWaitingForAcks()) {
377 QUIC_DVLOG(1)
378 << ENDPOINT << "Stream " << id_
379 << " gets destroyed while waiting for acks. stream_bytes_outstanding = "
380 << send_buffer_.stream_bytes_outstanding()
381 << ", fin_outstanding: " << fin_outstanding_;
382 }
renjietang35e49ed2020-02-19 10:55:01 -0800383 if (stream_delegate_ != nullptr && type_ != CRYPTO) {
384 stream_delegate_->UnregisterStreamPriority(id(), is_static_);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500385 }
386}
387
QUICHE teama6ef0a62019-03-07 20:34:33 -0500388void QuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
389 DCHECK_EQ(frame.stream_id, id_);
390
391 DCHECK(!(read_side_closed_ && write_side_closed_));
392
renjietanged49cb92020-03-03 14:30:53 -0800393 if (frame.fin && is_static_) {
394 OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
395 "Attempt to close a static stream");
396 return;
397 }
398
QUICHE teama6ef0a62019-03-07 20:34:33 -0500399 if (type_ == WRITE_UNIDIRECTIONAL) {
renjietang87df0d02020-02-13 11:53:52 -0800400 OnUnrecoverableError(QUIC_DATA_RECEIVED_ON_WRITE_UNIDIRECTIONAL_STREAM,
401 "Data received on write unidirectional stream");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500402 return;
403 }
404
405 bool is_stream_too_long =
406 (frame.offset > kMaxStreamLength) ||
407 (kMaxStreamLength - frame.offset < frame.data_length);
408 if (is_stream_too_long) {
409 // Close connection if stream becomes too long.
410 QUIC_PEER_BUG << "Receive stream frame on stream " << id_
411 << " reaches max stream length. frame offset " << frame.offset
412 << " length " << frame.data_length << ". "
413 << sequencer_.DebugString();
renjietang87df0d02020-02-13 11:53:52 -0800414 OnUnrecoverableError(
QUICHE teama6ef0a62019-03-07 20:34:33 -0500415 QUIC_STREAM_LENGTH_OVERFLOW,
dmcardlecf0bfcf2019-12-13 08:08:21 -0800416 quiche::QuicheStrCat("Peer sends more data than allowed on stream ",
417 id_, ". frame: offset = ", frame.offset,
418 ", length = ", frame.data_length, ". ",
419 sequencer_.DebugString()));
QUICHE teama6ef0a62019-03-07 20:34:33 -0500420 return;
421 }
renjietang963c2ec2019-09-12 11:46:50 -0700422
renjietang89aa73e2019-10-21 15:03:51 -0700423 if (frame.offset + frame.data_length > sequencer_.close_offset()) {
renjietang87df0d02020-02-13 11:53:52 -0800424 OnUnrecoverableError(
renjietang15afba32019-10-23 14:32:35 -0700425 QUIC_STREAM_DATA_BEYOND_CLOSE_OFFSET,
dmcardlecf0bfcf2019-12-13 08:08:21 -0800426 quiche::QuicheStrCat(
renjietang15afba32019-10-23 14:32:35 -0700427 "Stream ", id_,
428 " received data with offset: ", frame.offset + frame.data_length,
429 ", which is beyond close offset: ", sequencer_.close_offset()));
renjietang89aa73e2019-10-21 15:03:51 -0700430 return;
renjietang963c2ec2019-09-12 11:46:50 -0700431 }
432
QUICHE teama6ef0a62019-03-07 20:34:33 -0500433 if (frame.fin) {
434 fin_received_ = true;
435 if (fin_sent_) {
436 session_->StreamDraining(id_);
437 }
438 }
439
440 if (read_side_closed_) {
441 QUIC_DLOG(INFO)
442 << ENDPOINT << "Stream " << frame.stream_id
443 << " is closed for reading. Ignoring newly received stream data.";
444 // The subclass does not want to read data: blackhole the data.
445 return;
446 }
447
448 // This count includes duplicate data received.
449 size_t frame_payload_size = frame.data_length;
450 stream_bytes_read_ += frame_payload_size;
451
452 // Flow control is interested in tracking highest received offset.
453 // Only interested in received frames that carry data.
454 if (frame_payload_size > 0 &&
455 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
456 // As the highest received offset has changed, check to see if this is a
457 // violation of flow control.
nharperd5c4a932019-05-13 13:58:49 -0700458 if (flow_controller_->FlowControlViolation() ||
QUICHE teama6ef0a62019-03-07 20:34:33 -0500459 connection_flow_controller_->FlowControlViolation()) {
renjietang87df0d02020-02-13 11:53:52 -0800460 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
461 "Flow control violation after increasing offset");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500462 return;
463 }
464 }
465
466 sequencer_.OnStreamFrame(frame);
467}
468
renjietanged49cb92020-03-03 14:30:53 -0800469bool QuicStream::OnStopSending(uint16_t code) {
470 // Do not reset the stream if all data has been sent and acknowledged.
471 if (write_side_closed() && !IsWaitingForAcks()) {
472 QUIC_DVLOG(1) << ENDPOINT
473 << "Ignoring STOP_SENDING for a write closed stream, id: "
474 << id_;
475 return false;
476 }
477
478 if (is_static_) {
479 QUIC_DVLOG(1) << ENDPOINT
480 << "Received STOP_SENDING for a static stream, id: " << id_
481 << " Closing connection";
482 OnUnrecoverableError(QUIC_INVALID_STREAM_ID,
483 "Received STOP_SENDING for a static stream");
484 return false;
485 }
486
487 stream_error_ = static_cast<QuicRstStreamErrorCode>(code);
488 return true;
489}
490
QUICHE teama6ef0a62019-03-07 20:34:33 -0500491int QuicStream::num_frames_received() const {
492 return sequencer_.num_frames_received();
493}
494
495int QuicStream::num_duplicate_frames_received() const {
496 return sequencer_.num_duplicate_frames_received();
497}
498
499void QuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
500 rst_received_ = true;
501 if (frame.byte_offset > kMaxStreamLength) {
502 // Peer are not suppose to write bytes more than maxium allowed.
renjietang87df0d02020-02-13 11:53:52 -0800503 OnUnrecoverableError(QUIC_STREAM_LENGTH_OVERFLOW,
504 "Reset frame stream offset overflow.");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500505 return;
506 }
renjietang15afba32019-10-23 14:32:35 -0700507
renjietang7ab48c32019-12-09 15:40:31 -0800508 const QuicStreamOffset kMaxOffset =
509 std::numeric_limits<QuicStreamOffset>::max();
510 if (sequencer()->close_offset() != kMaxOffset &&
511 frame.byte_offset != sequencer()->close_offset()) {
renjietang87df0d02020-02-13 11:53:52 -0800512 OnUnrecoverableError(
renjietang7ab48c32019-12-09 15:40:31 -0800513 QUIC_STREAM_MULTIPLE_OFFSET,
dmcardlecf0bfcf2019-12-13 08:08:21 -0800514 quiche::QuicheStrCat("Stream ", id_,
515 " received new final offset: ", frame.byte_offset,
516 ", which is different from close offset: ",
517 sequencer_.close_offset()));
renjietang7ab48c32019-12-09 15:40:31 -0800518 return;
renjietang15afba32019-10-23 14:32:35 -0700519 }
520
QUICHE teama6ef0a62019-03-07 20:34:33 -0500521 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
nharperd5c4a932019-05-13 13:58:49 -0700522 if (flow_controller_->FlowControlViolation() ||
QUICHE teama6ef0a62019-03-07 20:34:33 -0500523 connection_flow_controller_->FlowControlViolation()) {
renjietang87df0d02020-02-13 11:53:52 -0800524 OnUnrecoverableError(QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
525 "Flow control violation after increasing offset");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500526 return;
527 }
528
529 stream_error_ = frame.error_code;
530 // Google QUIC closes both sides of the stream in response to a
531 // RESET_STREAM, IETF QUIC closes only the read side.
fkastenholz305e1732019-06-18 05:01:22 -0700532 if (!VersionHasIetfQuicFrames(transport_version())) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500533 CloseWriteSide();
534 }
535 CloseReadSide();
536}
537
538void QuicStream::OnConnectionClosed(QuicErrorCode error,
539 ConnectionCloseSource /*source*/) {
540 if (read_side_closed_ && write_side_closed_) {
541 return;
542 }
543 if (error != QUIC_NO_ERROR) {
544 stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
545 connection_error_ = error;
546 }
547
548 CloseWriteSide();
549 CloseReadSide();
550}
551
552void QuicStream::OnFinRead() {
553 DCHECK(sequencer_.IsClosed());
554 // OnFinRead can be called due to a FIN flag in a headers block, so there may
555 // have been no OnStreamFrame call with a FIN in the frame.
556 fin_received_ = true;
557 // If fin_sent_ is true, then CloseWriteSide has already been called, and the
558 // stream will be destroyed by CloseReadSide, so don't need to call
559 // StreamDraining.
560 CloseReadSide();
561}
562
563void QuicStream::Reset(QuicRstStreamErrorCode error) {
564 stream_error_ = error;
565 // Sending a RstStream results in calling CloseStream.
566 session()->SendRstStream(id(), error, stream_bytes_written());
567 rst_sent_ = true;
568}
569
renjietang87df0d02020-02-13 11:53:52 -0800570void QuicStream::OnUnrecoverableError(QuicErrorCode error,
571 const std::string& details) {
renjietangf196f6a2020-02-12 12:34:23 -0800572 stream_delegate_->OnStreamError(error, details);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500573}
574
fayang476683a2019-07-25 12:42:16 -0700575const spdy::SpdyStreamPrecedence& QuicStream::precedence() const {
576 return precedence_;
QUICHE teama6ef0a62019-03-07 20:34:33 -0500577}
578
fayang476683a2019-07-25 12:42:16 -0700579void QuicStream::SetPriority(const spdy::SpdyStreamPrecedence& precedence) {
580 precedence_ = precedence;
bnccf09f952020-01-30 17:35:59 -0800581
582 MaybeSendPriorityUpdateFrame();
583
renjietang35e49ed2020-02-19 10:55:01 -0800584 stream_delegate_->UpdateStreamPriority(id(), precedence);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500585}
586
587void QuicStream::WriteOrBufferData(
dmcardlecf0bfcf2019-12-13 08:08:21 -0800588 quiche::QuicheStringPiece data,
QUICHE teama6ef0a62019-03-07 20:34:33 -0500589 bool fin,
590 QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
591 if (data.empty() && !fin) {
592 QUIC_BUG << "data.empty() && !fin";
593 return;
594 }
595
596 if (fin_buffered_) {
597 QUIC_BUG << "Fin already buffered";
598 return;
599 }
600 if (write_side_closed_) {
601 QUIC_DLOG(ERROR) << ENDPOINT
602 << "Attempt to write when the write side is closed";
603 if (type_ == READ_UNIDIRECTIONAL) {
renjietang87df0d02020-02-13 11:53:52 -0800604 OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
605 "Try to send data on read unidirectional stream");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500606 }
607 return;
608 }
609
QUICHE teama6ef0a62019-03-07 20:34:33 -0500610 fin_buffered_ = fin;
611
612 bool had_buffered_data = HasBufferedData();
613 // Do not respect buffered data upper limit as WriteOrBufferData guarantees
614 // all data to be consumed.
615 if (data.length() > 0) {
616 struct iovec iov(QuicUtils::MakeIovec(data));
617 QuicStreamOffset offset = send_buffer_.stream_offset();
618 if (kMaxStreamLength - offset < data.length()) {
619 QUIC_BUG << "Write too many data via stream " << id_;
renjietang87df0d02020-02-13 11:53:52 -0800620 OnUnrecoverableError(
QUICHE teama6ef0a62019-03-07 20:34:33 -0500621 QUIC_STREAM_LENGTH_OVERFLOW,
dmcardlecf0bfcf2019-12-13 08:08:21 -0800622 quiche::QuicheStrCat("Write too many data via stream ", id_));
QUICHE teama6ef0a62019-03-07 20:34:33 -0500623 return;
624 }
625 send_buffer_.SaveStreamData(&iov, 1, 0, data.length());
626 OnDataBuffered(offset, data.length(), ack_listener);
627 }
628 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
629 // Write data if there is no buffered data before.
630 WriteBufferedData();
631 }
632}
633
634void QuicStream::OnCanWrite() {
635 if (HasDeadlinePassed()) {
636 OnDeadlinePassed();
637 return;
638 }
639 if (HasPendingRetransmission()) {
640 WritePendingRetransmission();
641 // Exit early to allow other streams to write pending retransmissions if
642 // any.
643 return;
644 }
645
646 if (write_side_closed_) {
647 QUIC_DLOG(ERROR)
648 << ENDPOINT << "Stream " << id()
649 << " attempting to write new data when the write side is closed";
650 return;
651 }
652 if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) {
653 WriteBufferedData();
654 }
655 if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) {
656 // Notify upper layer to write new data when buffered data size is below
657 // low water mark.
658 OnCanWriteNewData();
659 }
660}
661
662void QuicStream::MaybeSendBlocked() {
nharperd5c4a932019-05-13 13:58:49 -0700663 if (flow_controller_->ShouldSendBlocked()) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500664 session_->SendBlocked(id_);
665 }
666 if (!stream_contributes_to_connection_flow_control_) {
667 return;
668 }
669 if (connection_flow_controller_->ShouldSendBlocked()) {
670 session_->SendBlocked(QuicUtils::GetInvalidStreamId(transport_version()));
671 }
672 // If the stream is blocked by connection-level flow control but not by
673 // stream-level flow control, add the stream to the write blocked list so that
674 // the stream will be given a chance to write when a connection-level
675 // WINDOW_UPDATE arrives.
676 if (connection_flow_controller_->IsBlocked() &&
nharperd5c4a932019-05-13 13:58:49 -0700677 !flow_controller_->IsBlocked()) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500678 session_->MarkConnectionLevelWriteBlocked(id());
679 }
680}
681
QUICHE teama6ef0a62019-03-07 20:34:33 -0500682QuicConsumedData QuicStream::WriteMemSlices(QuicMemSliceSpan span, bool fin) {
683 QuicConsumedData consumed_data(0, false);
684 if (span.empty() && !fin) {
685 QUIC_BUG << "span.empty() && !fin";
686 return consumed_data;
687 }
688
689 if (fin_buffered_) {
690 QUIC_BUG << "Fin already buffered";
691 return consumed_data;
692 }
693
694 if (write_side_closed_) {
695 QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id()
zhongyi4dc99c02019-05-30 14:11:04 -0700696 << " attempting to write when the write side is closed";
QUICHE teama6ef0a62019-03-07 20:34:33 -0500697 if (type_ == READ_UNIDIRECTIONAL) {
renjietang87df0d02020-02-13 11:53:52 -0800698 OnUnrecoverableError(QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
699 "Try to send data on read unidirectional stream");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500700 }
701 return consumed_data;
702 }
703
704 bool had_buffered_data = HasBufferedData();
705 if (CanWriteNewData() || span.empty()) {
706 consumed_data.fin_consumed = fin;
707 if (!span.empty()) {
708 // Buffer all data if buffered data size is below limit.
709 QuicStreamOffset offset = send_buffer_.stream_offset();
wub553a9662019-03-28 20:13:23 -0700710 consumed_data.bytes_consumed = send_buffer_.SaveMemSliceSpan(span);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500711 if (offset > send_buffer_.stream_offset() ||
712 kMaxStreamLength < send_buffer_.stream_offset()) {
713 QUIC_BUG << "Write too many data via stream " << id_;
renjietang87df0d02020-02-13 11:53:52 -0800714 OnUnrecoverableError(
QUICHE teama6ef0a62019-03-07 20:34:33 -0500715 QUIC_STREAM_LENGTH_OVERFLOW,
dmcardlecf0bfcf2019-12-13 08:08:21 -0800716 quiche::QuicheStrCat("Write too many data via stream ", id_));
QUICHE teama6ef0a62019-03-07 20:34:33 -0500717 return consumed_data;
718 }
719 OnDataBuffered(offset, consumed_data.bytes_consumed, nullptr);
720 }
721 }
722 fin_buffered_ = consumed_data.fin_consumed;
723
724 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
725 // Write data if there is no buffered data before.
726 WriteBufferedData();
727 }
728
729 return consumed_data;
730}
731
732bool QuicStream::HasPendingRetransmission() const {
733 return send_buffer_.HasPendingRetransmission() || fin_lost_;
734}
735
736bool QuicStream::IsStreamFrameOutstanding(QuicStreamOffset offset,
737 QuicByteCount data_length,
738 bool fin) const {
739 return send_buffer_.IsStreamDataOutstanding(offset, data_length) ||
740 (fin && fin_outstanding_);
741}
742
QUICHE teama6ef0a62019-03-07 20:34:33 -0500743void QuicStream::CloseReadSide() {
744 if (read_side_closed_) {
745 return;
746 }
747 QUIC_DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
748
749 read_side_closed_ = true;
750 sequencer_.ReleaseBuffer();
751
752 if (write_side_closed_) {
753 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
754 session_->CloseStream(id());
755 }
756}
757
758void QuicStream::CloseWriteSide() {
759 if (write_side_closed_) {
760 return;
761 }
762 QUIC_DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
763
764 write_side_closed_ = true;
765 if (read_side_closed_) {
766 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
767 session_->CloseStream(id());
768 }
769}
770
771bool QuicStream::HasBufferedData() const {
772 DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
773 return send_buffer_.stream_offset() > stream_bytes_written();
774}
775
776QuicTransportVersion QuicStream::transport_version() const {
renjietangd1d00852019-09-06 10:43:12 -0700777 return session_->transport_version();
QUICHE teama6ef0a62019-03-07 20:34:33 -0500778}
779
780HandshakeProtocol QuicStream::handshake_protocol() const {
781 return session_->connection()->version().handshake_protocol;
782}
783
784void QuicStream::StopReading() {
785 QUIC_DVLOG(1) << ENDPOINT << "Stop reading from stream " << id();
786 sequencer_.StopReading();
787}
788
QUICHE teama6ef0a62019-03-07 20:34:33 -0500789void QuicStream::OnClose() {
790 CloseReadSide();
791 CloseWriteSide();
792
793 if (!fin_sent_ && !rst_sent_) {
794 // For flow control accounting, tell the peer how many bytes have been
795 // written on this stream before termination. Done here if needed, using a
796 // RST_STREAM frame.
797 QUIC_DLOG(INFO) << ENDPOINT << "Sending RST_STREAM in OnClose: " << id();
798 session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT,
799 stream_bytes_written());
800 session_->OnStreamDoneWaitingForAcks(id_);
801 rst_sent_ = true;
802 }
803
nharperd5c4a932019-05-13 13:58:49 -0700804 if (flow_controller_->FlowControlViolation() ||
QUICHE teama6ef0a62019-03-07 20:34:33 -0500805 connection_flow_controller_->FlowControlViolation()) {
806 return;
807 }
808 // The stream is being closed and will not process any further incoming bytes.
809 // As there may be more bytes in flight, to ensure that both endpoints have
810 // the same connection level flow control state, mark all unreceived or
811 // buffered bytes as consumed.
812 QuicByteCount bytes_to_consume =
nharperd5c4a932019-05-13 13:58:49 -0700813 flow_controller_->highest_received_byte_offset() -
814 flow_controller_->bytes_consumed();
QUICHE teama6ef0a62019-03-07 20:34:33 -0500815 AddBytesConsumed(bytes_to_consume);
816}
817
818void QuicStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
renjietang30246322019-09-09 11:24:10 -0700819 if (type_ == READ_UNIDIRECTIONAL) {
renjietang87df0d02020-02-13 11:53:52 -0800820 OnUnrecoverableError(
renjietang28c04b72019-07-01 15:08:09 -0700821 QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
822 "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.");
823 return;
824 }
825
renjietangd088eab2019-11-21 14:54:41 -0800826 if (flow_controller_->UpdateSendWindowOffset(frame.max_data)) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500827 // Let session unblock this stream.
828 session_->MarkConnectionLevelWriteBlocked(id_);
829 }
830}
831
832bool QuicStream::MaybeIncreaseHighestReceivedOffset(
833 QuicStreamOffset new_offset) {
834 uint64_t increment =
nharperd5c4a932019-05-13 13:58:49 -0700835 new_offset - flow_controller_->highest_received_byte_offset();
836 if (!flow_controller_->UpdateHighestReceivedOffset(new_offset)) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500837 return false;
838 }
839
840 // If |new_offset| increased the stream flow controller's highest received
841 // offset, increase the connection flow controller's value by the incremental
842 // difference.
843 if (stream_contributes_to_connection_flow_control_) {
844 connection_flow_controller_->UpdateHighestReceivedOffset(
845 connection_flow_controller_->highest_received_byte_offset() +
846 increment);
847 }
848 return true;
849}
850
851void QuicStream::AddBytesSent(QuicByteCount bytes) {
nharperd5c4a932019-05-13 13:58:49 -0700852 flow_controller_->AddBytesSent(bytes);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500853 if (stream_contributes_to_connection_flow_control_) {
854 connection_flow_controller_->AddBytesSent(bytes);
855 }
856}
857
858void QuicStream::AddBytesConsumed(QuicByteCount bytes) {
nharperd5c4a932019-05-13 13:58:49 -0700859 if (type_ == CRYPTO) {
860 // A stream with type CRYPTO has no flow control, so there's nothing this
861 // function needs to do. This function still gets called by the
862 // QuicStreamSequencers used by QuicCryptoStream.
863 return;
864 }
QUICHE teama6ef0a62019-03-07 20:34:33 -0500865 // Only adjust stream level flow controller if still reading.
866 if (!read_side_closed_) {
nharperd5c4a932019-05-13 13:58:49 -0700867 flow_controller_->AddBytesConsumed(bytes);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500868 }
869
870 if (stream_contributes_to_connection_flow_control_) {
871 connection_flow_controller_->AddBytesConsumed(bytes);
872 }
873}
874
875void QuicStream::UpdateSendWindowOffset(QuicStreamOffset new_window) {
nharperd5c4a932019-05-13 13:58:49 -0700876 if (flow_controller_->UpdateSendWindowOffset(new_window)) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500877 // Let session unblock this stream.
878 session_->MarkConnectionLevelWriteBlocked(id_);
879 }
880}
881
882void QuicStream::AddRandomPaddingAfterFin() {
883 add_random_padding_after_fin_ = true;
884}
885
886bool QuicStream::OnStreamFrameAcked(QuicStreamOffset offset,
887 QuicByteCount data_length,
888 bool fin_acked,
dschinazi17d42422019-06-18 16:35:07 -0700889 QuicTime::Delta /*ack_delay_time*/,
QUICHE team2f5f30b2020-02-18 08:52:28 -0800890 QuicTime /*receive_timestamp*/,
QUICHE teama6ef0a62019-03-07 20:34:33 -0500891 QuicByteCount* newly_acked_length) {
892 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Acking "
893 << "[" << offset << ", " << offset + data_length << "]"
894 << " fin = " << fin_acked;
895 *newly_acked_length = 0;
896 if (!send_buffer_.OnStreamDataAcked(offset, data_length,
897 newly_acked_length)) {
renjietang87df0d02020-02-13 11:53:52 -0800898 OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent data.");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500899 return false;
900 }
901 if (!fin_sent_ && fin_acked) {
renjietang87df0d02020-02-13 11:53:52 -0800902 OnUnrecoverableError(QUIC_INTERNAL_ERROR, "Trying to ack unsent fin.");
QUICHE teama6ef0a62019-03-07 20:34:33 -0500903 return false;
904 }
905 // Indicates whether ack listener's OnPacketAcked should be called.
906 const bool new_data_acked =
907 *newly_acked_length > 0 || (fin_acked && fin_outstanding_);
908 if (fin_acked) {
909 fin_outstanding_ = false;
910 fin_lost_ = false;
911 }
912 if (!IsWaitingForAcks()) {
913 session_->OnStreamDoneWaitingForAcks(id_);
914 }
915 return new_data_acked;
916}
917
918void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
919 QuicByteCount data_length,
920 bool fin_retransmitted) {
921 send_buffer_.OnStreamDataRetransmitted(offset, data_length);
922 if (fin_retransmitted) {
923 fin_lost_ = false;
924 }
925}
926
927void QuicStream::OnStreamFrameLost(QuicStreamOffset offset,
928 QuicByteCount data_length,
929 bool fin_lost) {
930 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Losting "
931 << "[" << offset << ", " << offset + data_length << "]"
932 << " fin = " << fin_lost;
933 if (data_length > 0) {
934 send_buffer_.OnStreamDataLost(offset, data_length);
935 }
936 if (fin_lost && fin_outstanding_) {
937 fin_lost_ = true;
938 }
939}
940
941bool QuicStream::RetransmitStreamData(QuicStreamOffset offset,
942 QuicByteCount data_length,
renjietang4d992bf2020-03-03 13:01:55 -0800943 bool fin,
944 TransmissionType type) {
945 DCHECK(type == PTO_RETRANSMISSION || type == RTO_RETRANSMISSION ||
946 type == TLP_RETRANSMISSION || type == PROBING_RETRANSMISSION);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500947 if (HasDeadlinePassed()) {
948 OnDeadlinePassed();
949 return true;
950 }
951 QuicIntervalSet<QuicStreamOffset> retransmission(offset,
952 offset + data_length);
953 retransmission.Difference(bytes_acked());
954 bool retransmit_fin = fin && fin_outstanding_;
955 if (retransmission.Empty() && !retransmit_fin) {
956 return true;
957 }
958 QuicConsumedData consumed(0, false);
959 for (const auto& interval : retransmission) {
960 QuicStreamOffset retransmission_offset = interval.min();
961 QuicByteCount retransmission_length = interval.max() - interval.min();
962 const bool can_bundle_fin =
963 retransmit_fin && (retransmission_offset + retransmission_length ==
964 stream_bytes_written());
renjietang7c239172020-02-21 13:50:39 -0800965 consumed = stream_delegate_->WritevData(
966 id_, retransmission_length, retransmission_offset,
renjietang4d992bf2020-03-03 13:01:55 -0800967 can_bundle_fin ? FIN : NO_FIN, type, QuicheNullOpt);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500968 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
969 << " is forced to retransmit stream data ["
970 << retransmission_offset << ", "
971 << retransmission_offset + retransmission_length
972 << ") and fin: " << can_bundle_fin
973 << ", consumed: " << consumed;
974 OnStreamFrameRetransmitted(retransmission_offset, consumed.bytes_consumed,
975 consumed.fin_consumed);
976 if (can_bundle_fin) {
977 retransmit_fin = !consumed.fin_consumed;
978 }
979 if (consumed.bytes_consumed < retransmission_length ||
980 (can_bundle_fin && !consumed.fin_consumed)) {
981 // Connection is write blocked.
982 return false;
983 }
984 }
985 if (retransmit_fin) {
986 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
987 << " retransmits fin only frame.";
renjietang4d992bf2020-03-03 13:01:55 -0800988 consumed = stream_delegate_->WritevData(id_, 0, stream_bytes_written(), FIN,
989 type, QuicheNullOpt);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500990 if (!consumed.fin_consumed) {
991 return false;
992 }
993 }
994 return true;
995}
996
997bool QuicStream::IsWaitingForAcks() const {
998 return (!rst_sent_ || stream_error_ == QUIC_STREAM_NO_ERROR) &&
999 (send_buffer_.stream_bytes_outstanding() || fin_outstanding_);
1000}
1001
1002size_t QuicStream::ReadableBytes() const {
1003 return sequencer_.ReadableBytes();
1004}
1005
1006bool QuicStream::WriteStreamData(QuicStreamOffset offset,
1007 QuicByteCount data_length,
1008 QuicDataWriter* writer) {
1009 DCHECK_LT(0u, data_length);
1010 QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset "
1011 << offset << " length " << data_length;
1012 return send_buffer_.WriteStreamData(offset, data_length, writer);
1013}
1014
1015void QuicStream::WriteBufferedData() {
1016 DCHECK(!write_side_closed_ && (HasBufferedData() || fin_buffered_));
1017
1018 if (session_->ShouldYield(id())) {
1019 session_->MarkConnectionLevelWriteBlocked(id());
1020 return;
1021 }
1022
1023 // Size of buffered data.
1024 size_t write_length = BufferedDataBytes();
1025
1026 // A FIN with zero data payload should not be flow control blocked.
1027 bool fin_with_zero_data = (fin_buffered_ && write_length == 0);
1028
1029 bool fin = fin_buffered_;
1030
1031 // How much data flow control permits to be written.
nharperd5c4a932019-05-13 13:58:49 -07001032 QuicByteCount send_window = flow_controller_->SendWindowSize();
QUICHE teama6ef0a62019-03-07 20:34:33 -05001033 if (stream_contributes_to_connection_flow_control_) {
1034 send_window =
1035 std::min(send_window, connection_flow_controller_->SendWindowSize());
1036 }
1037
1038 if (send_window == 0 && !fin_with_zero_data) {
1039 // Quick return if nothing can be sent.
1040 MaybeSendBlocked();
1041 return;
1042 }
1043
1044 if (write_length > send_window) {
1045 // Don't send the FIN unless all the data will be sent.
1046 fin = false;
1047
1048 // Writing more data would be a violation of flow control.
1049 write_length = static_cast<size_t>(send_window);
1050 QUIC_DVLOG(1) << "stream " << id() << " shortens write length to "
1051 << write_length << " due to flow control";
1052 }
renjietang73be08f2020-03-11 15:20:08 -07001053 if (!session_->write_with_transmission()) {
1054 session_->SetTransmissionType(NOT_RETRANSMISSION);
1055 }
fayang97857352019-07-01 06:15:26 -07001056
1057 StreamSendingState state = fin ? FIN : NO_FIN;
1058 if (fin && add_random_padding_after_fin_) {
1059 state = FIN_AND_PADDING;
1060 }
renjietang4d992bf2020-03-03 13:01:55 -08001061 QuicConsumedData consumed_data =
1062 stream_delegate_->WritevData(id(), write_length, stream_bytes_written(),
1063 state, NOT_RETRANSMISSION, QuicheNullOpt);
QUICHE teama6ef0a62019-03-07 20:34:33 -05001064
1065 OnStreamDataConsumed(consumed_data.bytes_consumed);
1066
1067 AddBytesSent(consumed_data.bytes_consumed);
1068 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " sends "
1069 << stream_bytes_written() << " bytes "
1070 << " and has buffered data " << BufferedDataBytes() << " bytes."
1071 << " fin is sent: " << consumed_data.fin_consumed
1072 << " fin is buffered: " << fin_buffered_;
1073
1074 // The write may have generated a write error causing this stream to be
1075 // closed. If so, simply return without marking the stream write blocked.
1076 if (write_side_closed_) {
1077 return;
1078 }
1079
1080 if (consumed_data.bytes_consumed == write_length) {
1081 if (!fin_with_zero_data) {
1082 MaybeSendBlocked();
1083 }
1084 if (fin && consumed_data.fin_consumed) {
1085 fin_sent_ = true;
1086 fin_outstanding_ = true;
1087 if (fin_received_) {
1088 session_->StreamDraining(id_);
1089 }
1090 CloseWriteSide();
1091 } else if (fin && !consumed_data.fin_consumed) {
1092 session_->MarkConnectionLevelWriteBlocked(id());
1093 }
1094 } else {
1095 session_->MarkConnectionLevelWriteBlocked(id());
1096 }
1097 if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) {
1098 busy_counter_ = 0;
1099 }
zhongyi1b2f7832019-06-14 13:31:34 -07001100
1101 if (IsWaitingForAcks()) {
1102 session_->OnStreamWaitingForAcks(id_);
1103 }
QUICHE teama6ef0a62019-03-07 20:34:33 -05001104}
1105
1106uint64_t QuicStream::BufferedDataBytes() const {
1107 DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
1108 return send_buffer_.stream_offset() - stream_bytes_written();
1109}
1110
1111bool QuicStream::CanWriteNewData() const {
1112 return BufferedDataBytes() < buffered_data_threshold_;
1113}
1114
1115bool QuicStream::CanWriteNewDataAfterData(QuicByteCount length) const {
1116 return (BufferedDataBytes() + length) < buffered_data_threshold_;
1117}
1118
1119uint64_t QuicStream::stream_bytes_written() const {
1120 return send_buffer_.stream_bytes_written();
1121}
1122
1123const QuicIntervalSet<QuicStreamOffset>& QuicStream::bytes_acked() const {
1124 return send_buffer_.bytes_acked();
1125}
1126
1127void QuicStream::OnStreamDataConsumed(size_t bytes_consumed) {
1128 send_buffer_.OnStreamDataConsumed(bytes_consumed);
1129}
1130
1131void QuicStream::WritePendingRetransmission() {
1132 while (HasPendingRetransmission()) {
1133 QuicConsumedData consumed(0, false);
1134 if (!send_buffer_.HasPendingRetransmission()) {
1135 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1136 << " retransmits fin only frame.";
renjietang4d992bf2020-03-03 13:01:55 -08001137 consumed =
1138 stream_delegate_->WritevData(id_, 0, stream_bytes_written(), FIN,
1139 LOSS_RETRANSMISSION, QuicheNullOpt);
QUICHE teama6ef0a62019-03-07 20:34:33 -05001140 fin_lost_ = !consumed.fin_consumed;
1141 if (fin_lost_) {
1142 // Connection is write blocked.
1143 return;
1144 }
1145 } else {
1146 StreamPendingRetransmission pending =
1147 send_buffer_.NextPendingRetransmission();
1148 // Determine whether the lost fin can be bundled with the data.
1149 const bool can_bundle_fin =
1150 fin_lost_ &&
1151 (pending.offset + pending.length == stream_bytes_written());
renjietang7c239172020-02-21 13:50:39 -08001152 consumed = stream_delegate_->WritevData(
1153 id_, pending.length, pending.offset, can_bundle_fin ? FIN : NO_FIN,
renjietang4d992bf2020-03-03 13:01:55 -08001154 LOSS_RETRANSMISSION, QuicheNullOpt);
QUICHE teama6ef0a62019-03-07 20:34:33 -05001155 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1156 << " tries to retransmit stream data [" << pending.offset
1157 << ", " << pending.offset + pending.length
1158 << ") and fin: " << can_bundle_fin
1159 << ", consumed: " << consumed;
1160 OnStreamFrameRetransmitted(pending.offset, consumed.bytes_consumed,
1161 consumed.fin_consumed);
1162 if (consumed.bytes_consumed < pending.length ||
1163 (can_bundle_fin && !consumed.fin_consumed)) {
1164 // Connection is write blocked.
1165 return;
1166 }
1167 }
1168 }
1169}
1170
1171bool QuicStream::MaybeSetTtl(QuicTime::Delta ttl) {
1172 if (is_static_) {
1173 QUIC_BUG << "Cannot set TTL of a static stream.";
1174 return false;
1175 }
1176 if (deadline_.IsInitialized()) {
1177 QUIC_DLOG(WARNING) << "Deadline has already been set.";
1178 return false;
1179 }
QUICHE teama6ef0a62019-03-07 20:34:33 -05001180 QuicTime now = session()->connection()->clock()->ApproximateNow();
1181 deadline_ = now + ttl;
1182 return true;
1183}
1184
1185bool QuicStream::HasDeadlinePassed() const {
1186 if (!deadline_.IsInitialized()) {
1187 // No deadline has been set.
1188 return false;
1189 }
QUICHE teama6ef0a62019-03-07 20:34:33 -05001190 QuicTime now = session()->connection()->clock()->ApproximateNow();
1191 if (now < deadline_) {
1192 return false;
1193 }
1194 // TTL expired.
1195 QUIC_DVLOG(1) << "stream " << id() << " deadline has passed";
1196 return true;
1197}
1198
1199void QuicStream::OnDeadlinePassed() {
1200 Reset(QUIC_STREAM_TTL_EXPIRED);
1201}
1202
1203void QuicStream::SendStopSending(uint16_t code) {
fkastenholz305e1732019-06-18 05:01:22 -07001204 if (!VersionHasIetfQuicFrames(transport_version())) {
QUICHE teama6ef0a62019-03-07 20:34:33 -05001205 // If the connection is not version 99, do nothing.
1206 // Do not QUIC_BUG or anything; the application really does not need to know
1207 // what version the connection is in.
1208 return;
1209 }
1210 session_->SendStopSending(code, id_);
1211}
1212
bnc5f202512020-02-01 18:43:02 -08001213// static
1214spdy::SpdyStreamPrecedence QuicStream::CalculateDefaultPriority(
1215 const QuicSession* session) {
1216 if (VersionUsesHttp3(session->transport_version())) {
1217 return spdy::SpdyStreamPrecedence(QuicStream::kDefaultUrgency);
1218 }
1219
1220 if (session->use_http2_priority_write_scheduler()) {
1221 return spdy::SpdyStreamPrecedence(0, spdy::kHttp2DefaultStreamWeight,
1222 false);
1223 }
1224
1225 return spdy::SpdyStreamPrecedence(QuicStream::kDefaultPriority);
1226}
1227
QUICHE teama6ef0a62019-03-07 20:34:33 -05001228} // namespace quic