blob: 32b168d46b5c29da561aebdb4aa8a602782968a5 [file] [log] [blame]
QUICHE teama6ef0a62019-03-07 20:34:33 -05001// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/third_party/quiche/src/quic/core/quic_stream.h"
6
vasilvv872e7a32019-03-12 16:42:44 -07007#include <string>
8
QUICHE teama6ef0a62019-03-07 20:34:33 -05009#include "net/third_party/quiche/src/quic/core/quic_flow_controller.h"
10#include "net/third_party/quiche/src/quic/core/quic_session.h"
11#include "net/third_party/quiche/src/quic/core/quic_utils.h"
12#include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
13#include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
14#include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
15#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
16#include "net/third_party/quiche/src/quic/platform/api/quic_str_cat.h"
QUICHE teama6ef0a62019-03-07 20:34:33 -050017
18using spdy::SpdyPriority;
19
20namespace quic {
21
// Log-line prefix identifying which side of the connection is logging.
// Expands to an expression that reads |perspective_|, so it can only be used
// where a name |perspective_| is in scope.
22#define ENDPOINT \
23 (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ")
24
25namespace {
26
27size_t GetInitialStreamFlowControlWindowToSend(QuicSession* session) {
28 return session->config()->GetInitialStreamFlowControlWindowToSend();
29}
30
31size_t GetReceivedFlowControlWindow(QuicSession* session) {
32 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
33 return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
34 }
35
dschinazic7036122019-04-30 12:46:34 -070036 return kDefaultFlowControlSendWindow;
QUICHE teama6ef0a62019-03-07 20:34:33 -050037}
38
39} // namespace
40
// Out-of-class definition of the static member QuicStream::kDefaultPriority.
// No initializer appears here; presumably the value is supplied where the
// member is declared (not visible in this file) -- TODO confirm.
41// static
42const SpdyPriority QuicStream::kDefaultPriority;
43
44PendingStream::PendingStream(QuicStreamId id, QuicSession* session)
45 : id_(id),
46 session_(session),
47 stream_bytes_read_(0),
48 fin_received_(false),
49 connection_flow_controller_(session->flow_controller()),
50 flow_controller_(session,
51 id,
52 /*is_connection_flow_controller*/ false,
53 GetReceivedFlowControlWindow(session),
54 GetInitialStreamFlowControlWindowToSend(session),
55 kStreamReceiveWindowLimit,
56 session_->flow_controller()->auto_tune_receive_window(),
57 session_->flow_controller()),
58 sequencer_(this) {}
59
60void PendingStream::OnDataAvailable() {
renjietang0c558862019-05-08 13:26:23 -070061 // It will be called when pending stream receives its first byte. But this
62 // call should simply be ignored so that data remains in sequencer.
QUICHE teama6ef0a62019-03-07 20:34:33 -050063}
64
65void PendingStream::OnFinRead() {
66 QUIC_BUG << "OnFinRead should not be called.";
67 CloseConnectionWithDetails(QUIC_INTERNAL_ERROR, "Unexpected fin read");
68}
69
70void PendingStream::AddBytesConsumed(QuicByteCount bytes) {
71 QUIC_BUG << "AddBytesConsumed should not be called.";
72 CloseConnectionWithDetails(QUIC_INTERNAL_ERROR, "Unexpected bytes consumed");
73}
74
75void PendingStream::Reset(QuicRstStreamErrorCode error) {
76 session_->SendRstStream(id_, error, 0);
77}
78
79void PendingStream::CloseConnectionWithDetails(QuicErrorCode error,
vasilvvc48c8712019-03-11 13:38:16 -070080 const std::string& details) {
QUICHE teama6ef0a62019-03-07 20:34:33 -050081 session_->connection()->CloseConnection(
82 error, details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
83}
84
85QuicStreamId PendingStream::id() const {
86 return id_;
87}
88
89const QuicSocketAddress& PendingStream::PeerAddressOfLatestPacket() const {
90 return session_->connection()->last_packet_source_address();
91}
92
93void PendingStream::OnStreamFrame(const QuicStreamFrame& frame) {
94 DCHECK_EQ(frame.stream_id, id_);
QUICHE teama6ef0a62019-03-07 20:34:33 -050095
96 bool is_stream_too_long =
97 (frame.offset > kMaxStreamLength) ||
98 (kMaxStreamLength - frame.offset < frame.data_length);
99 if (is_stream_too_long) {
100 // Close connection if stream becomes too long.
101 QUIC_PEER_BUG
102 << "Receive stream frame reaches max stream length. frame offset "
103 << frame.offset << " length " << frame.data_length;
104 CloseConnectionWithDetails(
105 QUIC_STREAM_LENGTH_OVERFLOW,
106 "Peer sends more data than allowed on this stream.");
107 return;
108 }
109
110 if (frame.fin) {
111 fin_received_ = true;
112 }
113
114 // This count includes duplicate data received.
115 size_t frame_payload_size = frame.data_length;
116 stream_bytes_read_ += frame_payload_size;
117
118 // Flow control is interested in tracking highest received offset.
119 // Only interested in received frames that carry data.
120 if (frame_payload_size > 0 &&
121 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
122 // As the highest received offset has changed, check to see if this is a
123 // violation of flow control.
124 if (flow_controller_.FlowControlViolation() ||
125 connection_flow_controller_->FlowControlViolation()) {
126 CloseConnectionWithDetails(
127 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
128 "Flow control violation after increasing offset");
129 return;
130 }
131 }
132
133 sequencer_.OnStreamFrame(frame);
134}
135
136void PendingStream::OnRstStreamFrame(const QuicRstStreamFrame& frame) {
137 DCHECK_EQ(frame.stream_id, id_);
138
139 if (frame.byte_offset > kMaxStreamLength) {
140 // Peer are not suppose to write bytes more than maxium allowed.
141 CloseConnectionWithDetails(QUIC_STREAM_LENGTH_OVERFLOW,
142 "Reset frame stream offset overflow.");
143 return;
144 }
145 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
146 if (flow_controller_.FlowControlViolation() ||
147 connection_flow_controller_->FlowControlViolation()) {
148 CloseConnectionWithDetails(
149 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
150 "Flow control violation after increasing offset");
151 return;
152 }
153}
154
155bool PendingStream::MaybeIncreaseHighestReceivedOffset(
156 QuicStreamOffset new_offset) {
157 uint64_t increment =
158 new_offset - flow_controller_.highest_received_byte_offset();
159 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
160 return false;
161 }
162
163 // If |new_offset| increased the stream flow controller's highest received
164 // offset, increase the connection flow controller's value by the incremental
165 // difference.
166 connection_flow_controller_->UpdateHighestReceivedOffset(
167 connection_flow_controller_->highest_received_byte_offset() + increment);
168 return true;
169}
170
renjietang35448992019-05-08 17:08:57 -0700171QuicStream::QuicStream(PendingStream pending, StreamType type, bool is_static)
QUICHE teama6ef0a62019-03-07 20:34:33 -0500172 : QuicStream(pending.id_,
173 pending.session_,
174 std::move(pending.sequencer_),
renjietang35448992019-05-08 17:08:57 -0700175 is_static,
QUICHE teama6ef0a62019-03-07 20:34:33 -0500176 type,
177 pending.stream_bytes_read_,
178 pending.fin_received_,
179 std::move(pending.flow_controller_),
180 pending.connection_flow_controller_) {
181 sequencer_.set_stream(this);
182}
183
nharperd5c4a932019-05-13 13:58:49 -0700184namespace {
185
186QuicOptional<QuicFlowController> FlowController(QuicStreamId id,
187 QuicSession* session,
188 StreamType type) {
189 if (type == CRYPTO) {
190 // The only QuicStream with a StreamType of CRYPTO is QuicCryptoStream, when
191 // it is using crypto frames instead of stream frames. The QuicCryptoStream
192 // doesn't have any flow control in that case, so we don't create a
193 // QuicFlowController for it.
194 return QuicOptional<QuicFlowController>();
195 }
196 return QuicFlowController(
197 session, id,
198 /*is_connection_flow_controller*/ false,
199 GetReceivedFlowControlWindow(session),
200 GetInitialStreamFlowControlWindowToSend(session),
201 kStreamReceiveWindowLimit,
202 session->flow_controller()->auto_tune_receive_window(),
203 session->flow_controller());
204}
205
206} // namespace
207
QUICHE teama6ef0a62019-03-07 20:34:33 -0500208QuicStream::QuicStream(QuicStreamId id,
209 QuicSession* session,
210 bool is_static,
211 StreamType type)
212 : QuicStream(id,
213 session,
214 QuicStreamSequencer(this),
215 is_static,
216 type,
217 0,
218 false,
nharperd5c4a932019-05-13 13:58:49 -0700219 FlowController(id, session, type),
QUICHE teama6ef0a62019-03-07 20:34:33 -0500220 session->flow_controller()) {}
221
222QuicStream::QuicStream(QuicStreamId id,
223 QuicSession* session,
224 QuicStreamSequencer sequencer,
225 bool is_static,
226 StreamType type,
227 uint64_t stream_bytes_read,
228 bool fin_received,
nharperd5c4a932019-05-13 13:58:49 -0700229 QuicOptional<QuicFlowController> flow_controller,
QUICHE teama6ef0a62019-03-07 20:34:33 -0500230 QuicFlowController* connection_flow_controller)
231 : sequencer_(std::move(sequencer)),
232 id_(id),
233 session_(session),
234 priority_(kDefaultPriority),
235 stream_bytes_read_(stream_bytes_read),
236 stream_error_(QUIC_STREAM_NO_ERROR),
237 connection_error_(QUIC_NO_ERROR),
238 read_side_closed_(false),
239 write_side_closed_(false),
240 fin_buffered_(false),
241 fin_sent_(false),
242 fin_outstanding_(false),
243 fin_lost_(false),
244 fin_received_(fin_received),
245 rst_sent_(false),
246 rst_received_(false),
247 perspective_(session_->perspective()),
248 flow_controller_(std::move(flow_controller)),
249 connection_flow_controller_(connection_flow_controller),
250 stream_contributes_to_connection_flow_control_(true),
251 busy_counter_(0),
252 add_random_padding_after_fin_(false),
253 send_buffer_(
254 session->connection()->helper()->GetStreamSendBufferAllocator()),
255 buffered_data_threshold_(GetQuicFlag(FLAGS_quic_buffered_data_threshold)),
256 is_static_(is_static),
257 deadline_(QuicTime::Zero()),
nharperd5c4a932019-05-13 13:58:49 -0700258 type_(session->connection()->transport_version() == QUIC_VERSION_99 &&
259 type != CRYPTO
QUICHE teama6ef0a62019-03-07 20:34:33 -0500260 ? QuicUtils::GetStreamType(id_,
261 perspective_,
262 session->IsIncomingStream(id_))
263 : type) {
264 if (type_ == WRITE_UNIDIRECTIONAL) {
265 set_fin_received(true);
266 CloseReadSide();
267 } else if (type_ == READ_UNIDIRECTIONAL) {
268 set_fin_sent(true);
269 CloseWriteSide();
270 }
271 SetFromConfig();
nharperd5c4a932019-05-13 13:58:49 -0700272 if (type_ != CRYPTO) {
273 session_->RegisterStreamPriority(id, is_static_, priority_);
274 }
QUICHE teama6ef0a62019-03-07 20:34:33 -0500275}
276
277QuicStream::~QuicStream() {
278 if (session_ != nullptr && IsWaitingForAcks()) {
279 QUIC_DVLOG(1)
280 << ENDPOINT << "Stream " << id_
281 << " gets destroyed while waiting for acks. stream_bytes_outstanding = "
282 << send_buffer_.stream_bytes_outstanding()
283 << ", fin_outstanding: " << fin_outstanding_;
284 }
nharperd5c4a932019-05-13 13:58:49 -0700285 if (session_ != nullptr && type_ != CRYPTO) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500286 session_->UnregisterStreamPriority(id(), is_static_);
287 }
288}
289
290void QuicStream::SetFromConfig() {}
291
292void QuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
293 DCHECK_EQ(frame.stream_id, id_);
294
295 DCHECK(!(read_side_closed_ && write_side_closed_));
296
297 if (type_ == WRITE_UNIDIRECTIONAL) {
298 CloseConnectionWithDetails(
299 QUIC_DATA_RECEIVED_ON_WRITE_UNIDIRECTIONAL_STREAM,
300 "Data received on write unidirectional stream");
301 return;
302 }
303
304 bool is_stream_too_long =
305 (frame.offset > kMaxStreamLength) ||
306 (kMaxStreamLength - frame.offset < frame.data_length);
307 if (is_stream_too_long) {
308 // Close connection if stream becomes too long.
309 QUIC_PEER_BUG << "Receive stream frame on stream " << id_
310 << " reaches max stream length. frame offset " << frame.offset
311 << " length " << frame.data_length << ". "
312 << sequencer_.DebugString();
313 CloseConnectionWithDetails(
314 QUIC_STREAM_LENGTH_OVERFLOW,
315 QuicStrCat("Peer sends more data than allowed on stream ", id_,
316 ". frame: offset = ", frame.offset, ", length = ",
317 frame.data_length, ". ", sequencer_.DebugString()));
318 return;
319 }
320 if (frame.fin) {
321 fin_received_ = true;
322 if (fin_sent_) {
323 session_->StreamDraining(id_);
324 }
325 }
326
327 if (read_side_closed_) {
328 QUIC_DLOG(INFO)
329 << ENDPOINT << "Stream " << frame.stream_id
330 << " is closed for reading. Ignoring newly received stream data.";
331 // The subclass does not want to read data: blackhole the data.
332 return;
333 }
334
335 // This count includes duplicate data received.
336 size_t frame_payload_size = frame.data_length;
337 stream_bytes_read_ += frame_payload_size;
338
339 // Flow control is interested in tracking highest received offset.
340 // Only interested in received frames that carry data.
341 if (frame_payload_size > 0 &&
342 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
343 // As the highest received offset has changed, check to see if this is a
344 // violation of flow control.
nharperd5c4a932019-05-13 13:58:49 -0700345 if (flow_controller_->FlowControlViolation() ||
QUICHE teama6ef0a62019-03-07 20:34:33 -0500346 connection_flow_controller_->FlowControlViolation()) {
347 CloseConnectionWithDetails(
348 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
349 "Flow control violation after increasing offset");
350 return;
351 }
352 }
353
354 sequencer_.OnStreamFrame(frame);
355}
356
357int QuicStream::num_frames_received() const {
358 return sequencer_.num_frames_received();
359}
360
361int QuicStream::num_duplicate_frames_received() const {
362 return sequencer_.num_duplicate_frames_received();
363}
364
365void QuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
366 rst_received_ = true;
367 if (frame.byte_offset > kMaxStreamLength) {
368 // Peer are not suppose to write bytes more than maxium allowed.
369 CloseConnectionWithDetails(QUIC_STREAM_LENGTH_OVERFLOW,
370 "Reset frame stream offset overflow.");
371 return;
372 }
373 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
nharperd5c4a932019-05-13 13:58:49 -0700374 if (flow_controller_->FlowControlViolation() ||
QUICHE teama6ef0a62019-03-07 20:34:33 -0500375 connection_flow_controller_->FlowControlViolation()) {
376 CloseConnectionWithDetails(
377 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
378 "Flow control violation after increasing offset");
379 return;
380 }
381
382 stream_error_ = frame.error_code;
383 // Google QUIC closes both sides of the stream in response to a
384 // RESET_STREAM, IETF QUIC closes only the read side.
385 if (transport_version() != QUIC_VERSION_99) {
386 CloseWriteSide();
387 }
388 CloseReadSide();
389}
390
391void QuicStream::OnConnectionClosed(QuicErrorCode error,
392 ConnectionCloseSource /*source*/) {
393 if (read_side_closed_ && write_side_closed_) {
394 return;
395 }
396 if (error != QUIC_NO_ERROR) {
397 stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
398 connection_error_ = error;
399 }
400
401 CloseWriteSide();
402 CloseReadSide();
403}
404
405void QuicStream::OnFinRead() {
406 DCHECK(sequencer_.IsClosed());
407 // OnFinRead can be called due to a FIN flag in a headers block, so there may
408 // have been no OnStreamFrame call with a FIN in the frame.
409 fin_received_ = true;
410 // If fin_sent_ is true, then CloseWriteSide has already been called, and the
411 // stream will be destroyed by CloseReadSide, so don't need to call
412 // StreamDraining.
413 CloseReadSide();
414}
415
416void QuicStream::Reset(QuicRstStreamErrorCode error) {
417 stream_error_ = error;
418 // Sending a RstStream results in calling CloseStream.
419 session()->SendRstStream(id(), error, stream_bytes_written());
420 rst_sent_ = true;
421}
422
423void QuicStream::CloseConnectionWithDetails(QuicErrorCode error,
vasilvvc48c8712019-03-11 13:38:16 -0700424 const std::string& details) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500425 session()->connection()->CloseConnection(
426 error, details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
427}
428
429SpdyPriority QuicStream::priority() const {
430 return priority_;
431}
432
433void QuicStream::SetPriority(SpdyPriority priority) {
434 priority_ = priority;
435 session_->UpdateStreamPriority(id(), priority);
436}
437
438void QuicStream::WriteOrBufferData(
439 QuicStringPiece data,
440 bool fin,
441 QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
442 if (data.empty() && !fin) {
443 QUIC_BUG << "data.empty() && !fin";
444 return;
445 }
446
447 if (fin_buffered_) {
448 QUIC_BUG << "Fin already buffered";
449 return;
450 }
451 if (write_side_closed_) {
452 QUIC_DLOG(ERROR) << ENDPOINT
453 << "Attempt to write when the write side is closed";
454 if (type_ == READ_UNIDIRECTIONAL) {
455 CloseConnectionWithDetails(
456 QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
457 "Try to send data on read unidirectional stream");
458 }
459 return;
460 }
461
QUICHE teama6ef0a62019-03-07 20:34:33 -0500462 fin_buffered_ = fin;
463
464 bool had_buffered_data = HasBufferedData();
465 // Do not respect buffered data upper limit as WriteOrBufferData guarantees
466 // all data to be consumed.
467 if (data.length() > 0) {
468 struct iovec iov(QuicUtils::MakeIovec(data));
469 QuicStreamOffset offset = send_buffer_.stream_offset();
470 if (kMaxStreamLength - offset < data.length()) {
471 QUIC_BUG << "Write too many data via stream " << id_;
472 CloseConnectionWithDetails(
473 QUIC_STREAM_LENGTH_OVERFLOW,
474 QuicStrCat("Write too many data via stream ", id_));
475 return;
476 }
477 send_buffer_.SaveStreamData(&iov, 1, 0, data.length());
478 OnDataBuffered(offset, data.length(), ack_listener);
479 }
480 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
481 // Write data if there is no buffered data before.
482 WriteBufferedData();
483 }
484}
485
486void QuicStream::OnCanWrite() {
487 if (HasDeadlinePassed()) {
488 OnDeadlinePassed();
489 return;
490 }
491 if (HasPendingRetransmission()) {
492 WritePendingRetransmission();
493 // Exit early to allow other streams to write pending retransmissions if
494 // any.
495 return;
496 }
497
498 if (write_side_closed_) {
499 QUIC_DLOG(ERROR)
500 << ENDPOINT << "Stream " << id()
501 << " attempting to write new data when the write side is closed";
502 return;
503 }
504 if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) {
505 WriteBufferedData();
506 }
507 if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) {
508 // Notify upper layer to write new data when buffered data size is below
509 // low water mark.
510 OnCanWriteNewData();
511 }
512}
513
514void QuicStream::MaybeSendBlocked() {
nharperd5c4a932019-05-13 13:58:49 -0700515 if (flow_controller_->ShouldSendBlocked()) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500516 session_->SendBlocked(id_);
517 }
518 if (!stream_contributes_to_connection_flow_control_) {
519 return;
520 }
521 if (connection_flow_controller_->ShouldSendBlocked()) {
522 session_->SendBlocked(QuicUtils::GetInvalidStreamId(transport_version()));
523 }
524 // If the stream is blocked by connection-level flow control but not by
525 // stream-level flow control, add the stream to the write blocked list so that
526 // the stream will be given a chance to write when a connection-level
527 // WINDOW_UPDATE arrives.
528 if (connection_flow_controller_->IsBlocked() &&
nharperd5c4a932019-05-13 13:58:49 -0700529 !flow_controller_->IsBlocked()) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500530 session_->MarkConnectionLevelWriteBlocked(id());
531 }
532}
533
534QuicConsumedData QuicStream::WritevData(const struct iovec* iov,
535 int iov_count,
536 bool fin) {
537 if (write_side_closed_) {
538 QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id()
539 << "attempting to write when the write side is closed";
540 if (type_ == READ_UNIDIRECTIONAL) {
541 CloseConnectionWithDetails(
542 QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
543 "Try to send data on read unidirectional stream");
544 }
545 return QuicConsumedData(0, false);
546 }
547
548 // How much data was provided.
549 size_t write_length = 0;
550 if (iov != nullptr) {
551 for (int i = 0; i < iov_count; ++i) {
552 write_length += iov[i].iov_len;
553 }
554 }
555
556 QuicConsumedData consumed_data(0, false);
557 if (fin_buffered_) {
558 QUIC_BUG << "Fin already buffered";
559 return consumed_data;
560 }
561
562 if (kMaxStreamLength - send_buffer_.stream_offset() < write_length) {
563 QUIC_BUG << "Write too many data via stream " << id_;
564 CloseConnectionWithDetails(
565 QUIC_STREAM_LENGTH_OVERFLOW,
566 QuicStrCat("Write too many data via stream ", id_));
567 return consumed_data;
568 }
569
570 bool had_buffered_data = HasBufferedData();
571 if (CanWriteNewData()) {
572 // Save all data if buffered data size is below low water mark.
573 consumed_data.bytes_consumed = write_length;
574 if (consumed_data.bytes_consumed > 0) {
575 QuicStreamOffset offset = send_buffer_.stream_offset();
576 send_buffer_.SaveStreamData(iov, iov_count, 0, write_length);
577 OnDataBuffered(offset, write_length, nullptr);
578 }
579 }
580 consumed_data.fin_consumed =
581 consumed_data.bytes_consumed == write_length && fin;
582 fin_buffered_ = consumed_data.fin_consumed;
583
584 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
585 // Write data if there is no buffered data before.
586 WriteBufferedData();
587 }
588
589 return consumed_data;
590}
591
592QuicConsumedData QuicStream::WriteMemSlices(QuicMemSliceSpan span, bool fin) {
593 QuicConsumedData consumed_data(0, false);
594 if (span.empty() && !fin) {
595 QUIC_BUG << "span.empty() && !fin";
596 return consumed_data;
597 }
598
599 if (fin_buffered_) {
600 QUIC_BUG << "Fin already buffered";
601 return consumed_data;
602 }
603
604 if (write_side_closed_) {
605 QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id()
606 << "attempting to write when the write side is closed";
607 if (type_ == READ_UNIDIRECTIONAL) {
608 CloseConnectionWithDetails(
609 QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
610 "Try to send data on read unidirectional stream");
611 }
612 return consumed_data;
613 }
614
615 bool had_buffered_data = HasBufferedData();
616 if (CanWriteNewData() || span.empty()) {
617 consumed_data.fin_consumed = fin;
618 if (!span.empty()) {
619 // Buffer all data if buffered data size is below limit.
620 QuicStreamOffset offset = send_buffer_.stream_offset();
wub553a9662019-03-28 20:13:23 -0700621 consumed_data.bytes_consumed = send_buffer_.SaveMemSliceSpan(span);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500622 if (offset > send_buffer_.stream_offset() ||
623 kMaxStreamLength < send_buffer_.stream_offset()) {
624 QUIC_BUG << "Write too many data via stream " << id_;
625 CloseConnectionWithDetails(
626 QUIC_STREAM_LENGTH_OVERFLOW,
627 QuicStrCat("Write too many data via stream ", id_));
628 return consumed_data;
629 }
630 OnDataBuffered(offset, consumed_data.bytes_consumed, nullptr);
631 }
632 }
633 fin_buffered_ = consumed_data.fin_consumed;
634
635 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
636 // Write data if there is no buffered data before.
637 WriteBufferedData();
638 }
639
640 return consumed_data;
641}
642
643bool QuicStream::HasPendingRetransmission() const {
644 return send_buffer_.HasPendingRetransmission() || fin_lost_;
645}
646
647bool QuicStream::IsStreamFrameOutstanding(QuicStreamOffset offset,
648 QuicByteCount data_length,
649 bool fin) const {
650 return send_buffer_.IsStreamDataOutstanding(offset, data_length) ||
651 (fin && fin_outstanding_);
652}
653
654QuicConsumedData QuicStream::WritevDataInner(size_t write_length,
655 QuicStreamOffset offset,
656 bool fin) {
657 StreamSendingState state = fin ? FIN : NO_FIN;
658 if (fin && add_random_padding_after_fin_) {
659 state = FIN_AND_PADDING;
660 }
661 return session()->WritevData(this, id(), write_length, offset, state);
662}
663
664void QuicStream::CloseReadSide() {
665 if (read_side_closed_) {
666 return;
667 }
668 QUIC_DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
669
670 read_side_closed_ = true;
671 sequencer_.ReleaseBuffer();
672
673 if (write_side_closed_) {
674 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
675 session_->CloseStream(id());
676 }
677}
678
679void QuicStream::CloseWriteSide() {
680 if (write_side_closed_) {
681 return;
682 }
683 QUIC_DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
684
685 write_side_closed_ = true;
686 if (read_side_closed_) {
687 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
688 session_->CloseStream(id());
689 }
690}
691
692bool QuicStream::HasBufferedData() const {
693 DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
694 return send_buffer_.stream_offset() > stream_bytes_written();
695}
696
697QuicTransportVersion QuicStream::transport_version() const {
698 return session_->connection()->transport_version();
699}
700
701HandshakeProtocol QuicStream::handshake_protocol() const {
702 return session_->connection()->version().handshake_protocol;
703}
704
705void QuicStream::StopReading() {
706 QUIC_DVLOG(1) << ENDPOINT << "Stop reading from stream " << id();
707 sequencer_.StopReading();
708}
709
710const QuicSocketAddress& QuicStream::PeerAddressOfLatestPacket() const {
711 return session_->connection()->last_packet_source_address();
712}
713
714void QuicStream::OnClose() {
715 CloseReadSide();
716 CloseWriteSide();
717
718 if (!fin_sent_ && !rst_sent_) {
719 // For flow control accounting, tell the peer how many bytes have been
720 // written on this stream before termination. Done here if needed, using a
721 // RST_STREAM frame.
722 QUIC_DLOG(INFO) << ENDPOINT << "Sending RST_STREAM in OnClose: " << id();
723 session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT,
724 stream_bytes_written());
725 session_->OnStreamDoneWaitingForAcks(id_);
726 rst_sent_ = true;
727 }
728
nharperd5c4a932019-05-13 13:58:49 -0700729 if (flow_controller_->FlowControlViolation() ||
QUICHE teama6ef0a62019-03-07 20:34:33 -0500730 connection_flow_controller_->FlowControlViolation()) {
731 return;
732 }
733 // The stream is being closed and will not process any further incoming bytes.
734 // As there may be more bytes in flight, to ensure that both endpoints have
735 // the same connection level flow control state, mark all unreceived or
736 // buffered bytes as consumed.
737 QuicByteCount bytes_to_consume =
nharperd5c4a932019-05-13 13:58:49 -0700738 flow_controller_->highest_received_byte_offset() -
739 flow_controller_->bytes_consumed();
QUICHE teama6ef0a62019-03-07 20:34:33 -0500740 AddBytesConsumed(bytes_to_consume);
741}
742
743void QuicStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
nharperd5c4a932019-05-13 13:58:49 -0700744 if (flow_controller_->UpdateSendWindowOffset(frame.byte_offset)) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500745 // Let session unblock this stream.
746 session_->MarkConnectionLevelWriteBlocked(id_);
747 }
748}
749
750bool QuicStream::MaybeIncreaseHighestReceivedOffset(
751 QuicStreamOffset new_offset) {
752 uint64_t increment =
nharperd5c4a932019-05-13 13:58:49 -0700753 new_offset - flow_controller_->highest_received_byte_offset();
754 if (!flow_controller_->UpdateHighestReceivedOffset(new_offset)) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500755 return false;
756 }
757
758 // If |new_offset| increased the stream flow controller's highest received
759 // offset, increase the connection flow controller's value by the incremental
760 // difference.
761 if (stream_contributes_to_connection_flow_control_) {
762 connection_flow_controller_->UpdateHighestReceivedOffset(
763 connection_flow_controller_->highest_received_byte_offset() +
764 increment);
765 }
766 return true;
767}
768
769void QuicStream::AddBytesSent(QuicByteCount bytes) {
nharperd5c4a932019-05-13 13:58:49 -0700770 flow_controller_->AddBytesSent(bytes);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500771 if (stream_contributes_to_connection_flow_control_) {
772 connection_flow_controller_->AddBytesSent(bytes);
773 }
774}
775
776void QuicStream::AddBytesConsumed(QuicByteCount bytes) {
nharperd5c4a932019-05-13 13:58:49 -0700777 if (type_ == CRYPTO) {
778 // A stream with type CRYPTO has no flow control, so there's nothing this
779 // function needs to do. This function still gets called by the
780 // QuicStreamSequencers used by QuicCryptoStream.
781 return;
782 }
QUICHE teama6ef0a62019-03-07 20:34:33 -0500783 // Only adjust stream level flow controller if still reading.
784 if (!read_side_closed_) {
nharperd5c4a932019-05-13 13:58:49 -0700785 flow_controller_->AddBytesConsumed(bytes);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500786 }
787
788 if (stream_contributes_to_connection_flow_control_) {
789 connection_flow_controller_->AddBytesConsumed(bytes);
790 }
791}
792
793void QuicStream::UpdateSendWindowOffset(QuicStreamOffset new_window) {
nharperd5c4a932019-05-13 13:58:49 -0700794 if (flow_controller_->UpdateSendWindowOffset(new_window)) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500795 // Let session unblock this stream.
796 session_->MarkConnectionLevelWriteBlocked(id_);
797 }
798}
799
800void QuicStream::AddRandomPaddingAfterFin() {
801 add_random_padding_after_fin_ = true;
802}
803
804bool QuicStream::OnStreamFrameAcked(QuicStreamOffset offset,
805 QuicByteCount data_length,
806 bool fin_acked,
807 QuicTime::Delta ack_delay_time,
808 QuicByteCount* newly_acked_length) {
809 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Acking "
810 << "[" << offset << ", " << offset + data_length << "]"
811 << " fin = " << fin_acked;
812 *newly_acked_length = 0;
813 if (!send_buffer_.OnStreamDataAcked(offset, data_length,
814 newly_acked_length)) {
815 CloseConnectionWithDetails(QUIC_INTERNAL_ERROR,
816 "Trying to ack unsent data.");
817 return false;
818 }
819 if (!fin_sent_ && fin_acked) {
820 CloseConnectionWithDetails(QUIC_INTERNAL_ERROR,
821 "Trying to ack unsent fin.");
822 return false;
823 }
824 // Indicates whether ack listener's OnPacketAcked should be called.
825 const bool new_data_acked =
826 *newly_acked_length > 0 || (fin_acked && fin_outstanding_);
827 if (fin_acked) {
828 fin_outstanding_ = false;
829 fin_lost_ = false;
830 }
831 if (!IsWaitingForAcks()) {
832 session_->OnStreamDoneWaitingForAcks(id_);
833 }
834 return new_data_acked;
835}
836
837void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
838 QuicByteCount data_length,
839 bool fin_retransmitted) {
840 send_buffer_.OnStreamDataRetransmitted(offset, data_length);
841 if (fin_retransmitted) {
842 fin_lost_ = false;
843 }
844}
845
846void QuicStream::OnStreamFrameLost(QuicStreamOffset offset,
847 QuicByteCount data_length,
848 bool fin_lost) {
849 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Losting "
850 << "[" << offset << ", " << offset + data_length << "]"
851 << " fin = " << fin_lost;
852 if (data_length > 0) {
853 send_buffer_.OnStreamDataLost(offset, data_length);
854 }
855 if (fin_lost && fin_outstanding_) {
856 fin_lost_ = true;
857 }
858}
859
// Retransmits the not-yet-acked parts of [offset, offset + data_length) and,
// if |fin| is set and the FIN is still outstanding, the FIN as well.
// Returns true when everything that needed retransmitting was consumed by the
// session (or nothing needed retransmitting, or the deadline has passed);
// returns false as soon as a write is not fully consumed.
860bool QuicStream::RetransmitStreamData(QuicStreamOffset offset,
861 QuicByteCount data_length,
862 bool fin) {
863 if (HasDeadlinePassed()) {
864 OnDeadlinePassed();
865 return true;
866 }
// Start from the requested range and remove everything already acked.
867 QuicIntervalSet<QuicStreamOffset> retransmission(offset,
868 offset + data_length);
869 retransmission.Difference(bytes_acked());
// The FIN only needs to go out again while it is still outstanding.
870 bool retransmit_fin = fin && fin_outstanding_;
871 if (retransmission.Empty() && !retransmit_fin) {
872 return true;
873 }
874 QuicConsumedData consumed(0, false);
875 for (const auto& interval : retransmission) {
876 QuicStreamOffset retransmission_offset = interval.min();
877 QuicByteCount retransmission_length = interval.max() - interval.min();
// The FIN can ride along only with the interval that ends exactly at the
// number of bytes written so far.
878 const bool can_bundle_fin =
879 retransmit_fin && (retransmission_offset + retransmission_length ==
880 stream_bytes_written());
881 consumed = session()->WritevData(this, id_, retransmission_length,
882 retransmission_offset,
883 can_bundle_fin ? FIN : NO_FIN);
884 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
885 << " is forced to retransmit stream data ["
886 << retransmission_offset << ", "
887 << retransmission_offset + retransmission_length
888 << ") and fin: " << can_bundle_fin
889 << ", consumed: " << consumed;
// Bookkeeping uses what was actually consumed, which may be less than the
// interval length.
890 OnStreamFrameRetransmitted(retransmission_offset, consumed.bytes_consumed,
891 consumed.fin_consumed);
// If the bundled FIN was consumed, no separate FIN-only frame is needed
// after the loop.
892 if (can_bundle_fin) {
893 retransmit_fin = !consumed.fin_consumed;
894 }
895 if (consumed.bytes_consumed < retransmission_length ||
896 (can_bundle_fin && !consumed.fin_consumed)) {
897 // Connection is write blocked.
898 return false;
899 }
900 }
// The FIN could not be bundled with any data interval: send it on its own as
// a zero-length write at the current end of written data.
901 if (retransmit_fin) {
902 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
903 << " retransmits fin only frame.";
904 consumed = session()->WritevData(this, id_, 0, stream_bytes_written(), FIN);
905 if (!consumed.fin_consumed) {
906 return false;
907 }
908 }
909 return true;
910}
911
912bool QuicStream::IsWaitingForAcks() const {
913 return (!rst_sent_ || stream_error_ == QUIC_STREAM_NO_ERROR) &&
914 (send_buffer_.stream_bytes_outstanding() || fin_outstanding_);
915}
916
917size_t QuicStream::ReadableBytes() const {
918 return sequencer_.ReadableBytes();
919}
920
921bool QuicStream::WriteStreamData(QuicStreamOffset offset,
922 QuicByteCount data_length,
923 QuicDataWriter* writer) {
924 DCHECK_LT(0u, data_length);
925 QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset "
926 << offset << " length " << data_length;
927 return send_buffer_.WriteStreamData(offset, data_length, writer);
928}
929
930void QuicStream::WriteBufferedData() {
931 DCHECK(!write_side_closed_ && (HasBufferedData() || fin_buffered_));
932
933 if (session_->ShouldYield(id())) {
934 session_->MarkConnectionLevelWriteBlocked(id());
935 return;
936 }
937
938 // Size of buffered data.
939 size_t write_length = BufferedDataBytes();
940
941 // A FIN with zero data payload should not be flow control blocked.
942 bool fin_with_zero_data = (fin_buffered_ && write_length == 0);
943
944 bool fin = fin_buffered_;
945
946 // How much data flow control permits to be written.
nharperd5c4a932019-05-13 13:58:49 -0700947 QuicByteCount send_window = flow_controller_->SendWindowSize();
QUICHE teama6ef0a62019-03-07 20:34:33 -0500948 if (stream_contributes_to_connection_flow_control_) {
949 send_window =
950 std::min(send_window, connection_flow_controller_->SendWindowSize());
951 }
952
953 if (send_window == 0 && !fin_with_zero_data) {
954 // Quick return if nothing can be sent.
955 MaybeSendBlocked();
956 return;
957 }
958
959 if (write_length > send_window) {
960 // Don't send the FIN unless all the data will be sent.
961 fin = false;
962
963 // Writing more data would be a violation of flow control.
964 write_length = static_cast<size_t>(send_window);
965 QUIC_DVLOG(1) << "stream " << id() << " shortens write length to "
966 << write_length << " due to flow control";
967 }
968 if (session_->session_decides_what_to_write()) {
969 session_->SetTransmissionType(NOT_RETRANSMISSION);
970 }
971 QuicConsumedData consumed_data =
972 WritevDataInner(write_length, stream_bytes_written(), fin);
973
974 OnStreamDataConsumed(consumed_data.bytes_consumed);
975
976 AddBytesSent(consumed_data.bytes_consumed);
977 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " sends "
978 << stream_bytes_written() << " bytes "
979 << " and has buffered data " << BufferedDataBytes() << " bytes."
980 << " fin is sent: " << consumed_data.fin_consumed
981 << " fin is buffered: " << fin_buffered_;
982
983 // The write may have generated a write error causing this stream to be
984 // closed. If so, simply return without marking the stream write blocked.
985 if (write_side_closed_) {
986 return;
987 }
988
989 if (consumed_data.bytes_consumed == write_length) {
990 if (!fin_with_zero_data) {
991 MaybeSendBlocked();
992 }
993 if (fin && consumed_data.fin_consumed) {
994 fin_sent_ = true;
995 fin_outstanding_ = true;
996 if (fin_received_) {
997 session_->StreamDraining(id_);
998 }
999 CloseWriteSide();
1000 } else if (fin && !consumed_data.fin_consumed) {
1001 session_->MarkConnectionLevelWriteBlocked(id());
1002 }
1003 } else {
1004 session_->MarkConnectionLevelWriteBlocked(id());
1005 }
1006 if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) {
1007 busy_counter_ = 0;
1008 }
1009}
1010
1011uint64_t QuicStream::BufferedDataBytes() const {
1012 DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
1013 return send_buffer_.stream_offset() - stream_bytes_written();
1014}
1015
1016bool QuicStream::CanWriteNewData() const {
1017 return BufferedDataBytes() < buffered_data_threshold_;
1018}
1019
1020bool QuicStream::CanWriteNewDataAfterData(QuicByteCount length) const {
1021 return (BufferedDataBytes() + length) < buffered_data_threshold_;
1022}
1023
1024uint64_t QuicStream::stream_bytes_written() const {
1025 return send_buffer_.stream_bytes_written();
1026}
1027
1028const QuicIntervalSet<QuicStreamOffset>& QuicStream::bytes_acked() const {
1029 return send_buffer_.bytes_acked();
1030}
1031
1032void QuicStream::OnStreamDataConsumed(size_t bytes_consumed) {
1033 send_buffer_.OnStreamDataConsumed(bytes_consumed);
1034}
1035
1036void QuicStream::WritePendingRetransmission() {
1037 while (HasPendingRetransmission()) {
1038 QuicConsumedData consumed(0, false);
1039 if (!send_buffer_.HasPendingRetransmission()) {
1040 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1041 << " retransmits fin only frame.";
1042 consumed =
1043 session()->WritevData(this, id_, 0, stream_bytes_written(), FIN);
1044 fin_lost_ = !consumed.fin_consumed;
1045 if (fin_lost_) {
1046 // Connection is write blocked.
1047 return;
1048 }
1049 } else {
1050 StreamPendingRetransmission pending =
1051 send_buffer_.NextPendingRetransmission();
1052 // Determine whether the lost fin can be bundled with the data.
1053 const bool can_bundle_fin =
1054 fin_lost_ &&
1055 (pending.offset + pending.length == stream_bytes_written());
1056 consumed =
1057 session()->WritevData(this, id_, pending.length, pending.offset,
1058 can_bundle_fin ? FIN : NO_FIN);
1059 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1060 << " tries to retransmit stream data [" << pending.offset
1061 << ", " << pending.offset + pending.length
1062 << ") and fin: " << can_bundle_fin
1063 << ", consumed: " << consumed;
1064 OnStreamFrameRetransmitted(pending.offset, consumed.bytes_consumed,
1065 consumed.fin_consumed);
1066 if (consumed.bytes_consumed < pending.length ||
1067 (can_bundle_fin && !consumed.fin_consumed)) {
1068 // Connection is write blocked.
1069 return;
1070 }
1071 }
1072 }
1073}
1074
1075bool QuicStream::MaybeSetTtl(QuicTime::Delta ttl) {
1076 if (is_static_) {
1077 QUIC_BUG << "Cannot set TTL of a static stream.";
1078 return false;
1079 }
1080 if (deadline_.IsInitialized()) {
1081 QUIC_DLOG(WARNING) << "Deadline has already been set.";
1082 return false;
1083 }
1084 if (!session()->session_decides_what_to_write()) {
1085 QUIC_DLOG(WARNING) << "This session does not support stream TTL yet.";
1086 return false;
1087 }
1088 QuicTime now = session()->connection()->clock()->ApproximateNow();
1089 deadline_ = now + ttl;
1090 return true;
1091}
1092
1093bool QuicStream::HasDeadlinePassed() const {
1094 if (!deadline_.IsInitialized()) {
1095 // No deadline has been set.
1096 return false;
1097 }
1098 DCHECK(session()->session_decides_what_to_write());
1099 QuicTime now = session()->connection()->clock()->ApproximateNow();
1100 if (now < deadline_) {
1101 return false;
1102 }
1103 // TTL expired.
1104 QUIC_DVLOG(1) << "stream " << id() << " deadline has passed";
1105 return true;
1106}
1107
1108void QuicStream::OnDeadlinePassed() {
1109 Reset(QUIC_STREAM_TTL_EXPIRED);
1110}
1111
1112void QuicStream::SendStopSending(uint16_t code) {
1113 if (transport_version() != QUIC_VERSION_99) {
1114 // If the connection is not version 99, do nothing.
1115 // Do not QUIC_BUG or anything; the application really does not need to know
1116 // what version the connection is in.
1117 return;
1118 }
1119 session_->SendStopSending(code, id_);
1120}
1121
1122void QuicStream::OnStopSending(uint16_t code) {}
1123
1124} // namespace quic