blob: 8ef4717dc7a267f5e074e117219caf0b92eb8484 [file] [log] [blame]
QUICHE teama6ef0a62019-03-07 20:34:33 -05001// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/third_party/quiche/src/quic/core/quic_stream.h"
6
vasilvv872e7a32019-03-12 16:42:44 -07007#include <string>
8
QUICHE teama6ef0a62019-03-07 20:34:33 -05009#include "net/third_party/quiche/src/quic/core/quic_flow_controller.h"
10#include "net/third_party/quiche/src/quic/core/quic_session.h"
11#include "net/third_party/quiche/src/quic/core/quic_utils.h"
12#include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
13#include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
14#include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
15#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
16#include "net/third_party/quiche/src/quic/platform/api/quic_str_cat.h"
QUICHE teama6ef0a62019-03-07 20:34:33 -050017
18using spdy::SpdyPriority;
19
20namespace quic {
21
22#define ENDPOINT \
23 (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ")
24
25namespace {
26
27size_t GetInitialStreamFlowControlWindowToSend(QuicSession* session) {
28 return session->config()->GetInitialStreamFlowControlWindowToSend();
29}
30
31size_t GetReceivedFlowControlWindow(QuicSession* session) {
32 if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) {
33 return session->config()->ReceivedInitialStreamFlowControlWindowBytes();
34 }
35
dschinazic7036122019-04-30 12:46:34 -070036 return kDefaultFlowControlSendWindow;
QUICHE teama6ef0a62019-03-07 20:34:33 -050037}
38
39} // namespace
40
41// static
42const SpdyPriority QuicStream::kDefaultPriority;
43
44PendingStream::PendingStream(QuicStreamId id, QuicSession* session)
45 : id_(id),
46 session_(session),
47 stream_bytes_read_(0),
48 fin_received_(false),
49 connection_flow_controller_(session->flow_controller()),
50 flow_controller_(session,
51 id,
52 /*is_connection_flow_controller*/ false,
53 GetReceivedFlowControlWindow(session),
54 GetInitialStreamFlowControlWindowToSend(session),
55 kStreamReceiveWindowLimit,
56 session_->flow_controller()->auto_tune_receive_window(),
57 session_->flow_controller()),
58 sequencer_(this) {}
59
60void PendingStream::OnDataAvailable() {
renjietang0c558862019-05-08 13:26:23 -070061 // It will be called when pending stream receives its first byte. But this
62 // call should simply be ignored so that data remains in sequencer.
QUICHE teama6ef0a62019-03-07 20:34:33 -050063}
64
65void PendingStream::OnFinRead() {
66 QUIC_BUG << "OnFinRead should not be called.";
67 CloseConnectionWithDetails(QUIC_INTERNAL_ERROR, "Unexpected fin read");
68}
69
70void PendingStream::AddBytesConsumed(QuicByteCount bytes) {
renjietangbb1c4892019-05-24 15:58:44 -070071 // It will be called when the metadata of the stream is consumed.
72 flow_controller_.AddBytesConsumed(bytes);
73 connection_flow_controller_->AddBytesConsumed(bytes);
QUICHE teama6ef0a62019-03-07 20:34:33 -050074}
75
76void PendingStream::Reset(QuicRstStreamErrorCode error) {
77 session_->SendRstStream(id_, error, 0);
78}
79
80void PendingStream::CloseConnectionWithDetails(QuicErrorCode error,
vasilvvc48c8712019-03-11 13:38:16 -070081 const std::string& details) {
QUICHE teama6ef0a62019-03-07 20:34:33 -050082 session_->connection()->CloseConnection(
83 error, details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
84}
85
86QuicStreamId PendingStream::id() const {
87 return id_;
88}
89
90const QuicSocketAddress& PendingStream::PeerAddressOfLatestPacket() const {
91 return session_->connection()->last_packet_source_address();
92}
93
94void PendingStream::OnStreamFrame(const QuicStreamFrame& frame) {
95 DCHECK_EQ(frame.stream_id, id_);
QUICHE teama6ef0a62019-03-07 20:34:33 -050096
97 bool is_stream_too_long =
98 (frame.offset > kMaxStreamLength) ||
99 (kMaxStreamLength - frame.offset < frame.data_length);
100 if (is_stream_too_long) {
101 // Close connection if stream becomes too long.
102 QUIC_PEER_BUG
103 << "Receive stream frame reaches max stream length. frame offset "
104 << frame.offset << " length " << frame.data_length;
105 CloseConnectionWithDetails(
106 QUIC_STREAM_LENGTH_OVERFLOW,
107 "Peer sends more data than allowed on this stream.");
108 return;
109 }
110
111 if (frame.fin) {
112 fin_received_ = true;
113 }
114
115 // This count includes duplicate data received.
116 size_t frame_payload_size = frame.data_length;
117 stream_bytes_read_ += frame_payload_size;
118
119 // Flow control is interested in tracking highest received offset.
120 // Only interested in received frames that carry data.
121 if (frame_payload_size > 0 &&
122 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
123 // As the highest received offset has changed, check to see if this is a
124 // violation of flow control.
125 if (flow_controller_.FlowControlViolation() ||
126 connection_flow_controller_->FlowControlViolation()) {
127 CloseConnectionWithDetails(
128 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
129 "Flow control violation after increasing offset");
130 return;
131 }
132 }
133
134 sequencer_.OnStreamFrame(frame);
135}
136
137void PendingStream::OnRstStreamFrame(const QuicRstStreamFrame& frame) {
138 DCHECK_EQ(frame.stream_id, id_);
139
140 if (frame.byte_offset > kMaxStreamLength) {
141 // Peer are not suppose to write bytes more than maxium allowed.
142 CloseConnectionWithDetails(QUIC_STREAM_LENGTH_OVERFLOW,
143 "Reset frame stream offset overflow.");
144 return;
145 }
146 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
147 if (flow_controller_.FlowControlViolation() ||
148 connection_flow_controller_->FlowControlViolation()) {
149 CloseConnectionWithDetails(
150 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
151 "Flow control violation after increasing offset");
152 return;
153 }
154}
155
156bool PendingStream::MaybeIncreaseHighestReceivedOffset(
157 QuicStreamOffset new_offset) {
158 uint64_t increment =
159 new_offset - flow_controller_.highest_received_byte_offset();
160 if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) {
161 return false;
162 }
163
164 // If |new_offset| increased the stream flow controller's highest received
165 // offset, increase the connection flow controller's value by the incremental
166 // difference.
167 connection_flow_controller_->UpdateHighestReceivedOffset(
168 connection_flow_controller_->highest_received_byte_offset() + increment);
169 return true;
170}
171
renjietangbb1c4892019-05-24 15:58:44 -0700172void PendingStream::MarkConsumed(size_t num_bytes) {
173 sequencer_.MarkConsumed(num_bytes);
174}
175
renjietangbaea59c2019-05-29 15:08:14 -0700176QuicStream::QuicStream(PendingStream* pending, StreamType type, bool is_static)
177 : QuicStream(pending->id_,
178 pending->session_,
179 std::move(pending->sequencer_),
renjietang35448992019-05-08 17:08:57 -0700180 is_static,
QUICHE teama6ef0a62019-03-07 20:34:33 -0500181 type,
renjietangbaea59c2019-05-29 15:08:14 -0700182 pending->stream_bytes_read_,
183 pending->fin_received_,
184 std::move(pending->flow_controller_),
185 pending->connection_flow_controller_) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500186 sequencer_.set_stream(this);
187}
188
nharperd5c4a932019-05-13 13:58:49 -0700189namespace {
190
191QuicOptional<QuicFlowController> FlowController(QuicStreamId id,
192 QuicSession* session,
193 StreamType type) {
194 if (type == CRYPTO) {
195 // The only QuicStream with a StreamType of CRYPTO is QuicCryptoStream, when
196 // it is using crypto frames instead of stream frames. The QuicCryptoStream
197 // doesn't have any flow control in that case, so we don't create a
198 // QuicFlowController for it.
199 return QuicOptional<QuicFlowController>();
200 }
201 return QuicFlowController(
202 session, id,
203 /*is_connection_flow_controller*/ false,
204 GetReceivedFlowControlWindow(session),
205 GetInitialStreamFlowControlWindowToSend(session),
206 kStreamReceiveWindowLimit,
207 session->flow_controller()->auto_tune_receive_window(),
208 session->flow_controller());
209}
210
211} // namespace
212
QUICHE teama6ef0a62019-03-07 20:34:33 -0500213QuicStream::QuicStream(QuicStreamId id,
214 QuicSession* session,
215 bool is_static,
216 StreamType type)
217 : QuicStream(id,
218 session,
219 QuicStreamSequencer(this),
220 is_static,
221 type,
222 0,
223 false,
nharperd5c4a932019-05-13 13:58:49 -0700224 FlowController(id, session, type),
QUICHE teama6ef0a62019-03-07 20:34:33 -0500225 session->flow_controller()) {}
226
227QuicStream::QuicStream(QuicStreamId id,
228 QuicSession* session,
229 QuicStreamSequencer sequencer,
230 bool is_static,
231 StreamType type,
232 uint64_t stream_bytes_read,
233 bool fin_received,
nharperd5c4a932019-05-13 13:58:49 -0700234 QuicOptional<QuicFlowController> flow_controller,
QUICHE teama6ef0a62019-03-07 20:34:33 -0500235 QuicFlowController* connection_flow_controller)
236 : sequencer_(std::move(sequencer)),
237 id_(id),
238 session_(session),
239 priority_(kDefaultPriority),
240 stream_bytes_read_(stream_bytes_read),
241 stream_error_(QUIC_STREAM_NO_ERROR),
242 connection_error_(QUIC_NO_ERROR),
243 read_side_closed_(false),
244 write_side_closed_(false),
245 fin_buffered_(false),
246 fin_sent_(false),
247 fin_outstanding_(false),
248 fin_lost_(false),
249 fin_received_(fin_received),
250 rst_sent_(false),
251 rst_received_(false),
252 perspective_(session_->perspective()),
253 flow_controller_(std::move(flow_controller)),
254 connection_flow_controller_(connection_flow_controller),
255 stream_contributes_to_connection_flow_control_(true),
256 busy_counter_(0),
257 add_random_padding_after_fin_(false),
258 send_buffer_(
259 session->connection()->helper()->GetStreamSendBufferAllocator()),
260 buffered_data_threshold_(GetQuicFlag(FLAGS_quic_buffered_data_threshold)),
261 is_static_(is_static),
262 deadline_(QuicTime::Zero()),
fkastenholz305e1732019-06-18 05:01:22 -0700263 type_(VersionHasIetfQuicFrames(
264 session->connection()->transport_version()) &&
nharperd5c4a932019-05-13 13:58:49 -0700265 type != CRYPTO
QUICHE teama6ef0a62019-03-07 20:34:33 -0500266 ? QuicUtils::GetStreamType(id_,
267 perspective_,
268 session->IsIncomingStream(id_))
269 : type) {
270 if (type_ == WRITE_UNIDIRECTIONAL) {
271 set_fin_received(true);
272 CloseReadSide();
273 } else if (type_ == READ_UNIDIRECTIONAL) {
274 set_fin_sent(true);
275 CloseWriteSide();
276 }
277 SetFromConfig();
nharperd5c4a932019-05-13 13:58:49 -0700278 if (type_ != CRYPTO) {
279 session_->RegisterStreamPriority(id, is_static_, priority_);
280 }
QUICHE teama6ef0a62019-03-07 20:34:33 -0500281}
282
283QuicStream::~QuicStream() {
284 if (session_ != nullptr && IsWaitingForAcks()) {
285 QUIC_DVLOG(1)
286 << ENDPOINT << "Stream " << id_
287 << " gets destroyed while waiting for acks. stream_bytes_outstanding = "
288 << send_buffer_.stream_bytes_outstanding()
289 << ", fin_outstanding: " << fin_outstanding_;
290 }
nharperd5c4a932019-05-13 13:58:49 -0700291 if (session_ != nullptr && type_ != CRYPTO) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500292 session_->UnregisterStreamPriority(id(), is_static_);
293 }
294}
295
296void QuicStream::SetFromConfig() {}
297
298void QuicStream::OnStreamFrame(const QuicStreamFrame& frame) {
299 DCHECK_EQ(frame.stream_id, id_);
300
301 DCHECK(!(read_side_closed_ && write_side_closed_));
302
303 if (type_ == WRITE_UNIDIRECTIONAL) {
304 CloseConnectionWithDetails(
305 QUIC_DATA_RECEIVED_ON_WRITE_UNIDIRECTIONAL_STREAM,
306 "Data received on write unidirectional stream");
307 return;
308 }
309
310 bool is_stream_too_long =
311 (frame.offset > kMaxStreamLength) ||
312 (kMaxStreamLength - frame.offset < frame.data_length);
313 if (is_stream_too_long) {
314 // Close connection if stream becomes too long.
315 QUIC_PEER_BUG << "Receive stream frame on stream " << id_
316 << " reaches max stream length. frame offset " << frame.offset
317 << " length " << frame.data_length << ". "
318 << sequencer_.DebugString();
319 CloseConnectionWithDetails(
320 QUIC_STREAM_LENGTH_OVERFLOW,
321 QuicStrCat("Peer sends more data than allowed on stream ", id_,
322 ". frame: offset = ", frame.offset, ", length = ",
323 frame.data_length, ". ", sequencer_.DebugString()));
324 return;
325 }
326 if (frame.fin) {
327 fin_received_ = true;
328 if (fin_sent_) {
329 session_->StreamDraining(id_);
330 }
331 }
332
333 if (read_side_closed_) {
334 QUIC_DLOG(INFO)
335 << ENDPOINT << "Stream " << frame.stream_id
336 << " is closed for reading. Ignoring newly received stream data.";
337 // The subclass does not want to read data: blackhole the data.
338 return;
339 }
340
341 // This count includes duplicate data received.
342 size_t frame_payload_size = frame.data_length;
343 stream_bytes_read_ += frame_payload_size;
344
345 // Flow control is interested in tracking highest received offset.
346 // Only interested in received frames that carry data.
347 if (frame_payload_size > 0 &&
348 MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) {
349 // As the highest received offset has changed, check to see if this is a
350 // violation of flow control.
nharperd5c4a932019-05-13 13:58:49 -0700351 if (flow_controller_->FlowControlViolation() ||
QUICHE teama6ef0a62019-03-07 20:34:33 -0500352 connection_flow_controller_->FlowControlViolation()) {
353 CloseConnectionWithDetails(
354 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
355 "Flow control violation after increasing offset");
356 return;
357 }
358 }
359
360 sequencer_.OnStreamFrame(frame);
361}
362
363int QuicStream::num_frames_received() const {
364 return sequencer_.num_frames_received();
365}
366
367int QuicStream::num_duplicate_frames_received() const {
368 return sequencer_.num_duplicate_frames_received();
369}
370
371void QuicStream::OnStreamReset(const QuicRstStreamFrame& frame) {
372 rst_received_ = true;
373 if (frame.byte_offset > kMaxStreamLength) {
374 // Peer are not suppose to write bytes more than maxium allowed.
375 CloseConnectionWithDetails(QUIC_STREAM_LENGTH_OVERFLOW,
376 "Reset frame stream offset overflow.");
377 return;
378 }
379 MaybeIncreaseHighestReceivedOffset(frame.byte_offset);
nharperd5c4a932019-05-13 13:58:49 -0700380 if (flow_controller_->FlowControlViolation() ||
QUICHE teama6ef0a62019-03-07 20:34:33 -0500381 connection_flow_controller_->FlowControlViolation()) {
382 CloseConnectionWithDetails(
383 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
384 "Flow control violation after increasing offset");
385 return;
386 }
387
388 stream_error_ = frame.error_code;
389 // Google QUIC closes both sides of the stream in response to a
390 // RESET_STREAM, IETF QUIC closes only the read side.
fkastenholz305e1732019-06-18 05:01:22 -0700391 if (!VersionHasIetfQuicFrames(transport_version())) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500392 CloseWriteSide();
393 }
394 CloseReadSide();
395}
396
397void QuicStream::OnConnectionClosed(QuicErrorCode error,
398 ConnectionCloseSource /*source*/) {
399 if (read_side_closed_ && write_side_closed_) {
400 return;
401 }
402 if (error != QUIC_NO_ERROR) {
403 stream_error_ = QUIC_STREAM_CONNECTION_ERROR;
404 connection_error_ = error;
405 }
406
407 CloseWriteSide();
408 CloseReadSide();
409}
410
411void QuicStream::OnFinRead() {
412 DCHECK(sequencer_.IsClosed());
413 // OnFinRead can be called due to a FIN flag in a headers block, so there may
414 // have been no OnStreamFrame call with a FIN in the frame.
415 fin_received_ = true;
416 // If fin_sent_ is true, then CloseWriteSide has already been called, and the
417 // stream will be destroyed by CloseReadSide, so don't need to call
418 // StreamDraining.
419 CloseReadSide();
420}
421
422void QuicStream::Reset(QuicRstStreamErrorCode error) {
423 stream_error_ = error;
424 // Sending a RstStream results in calling CloseStream.
425 session()->SendRstStream(id(), error, stream_bytes_written());
426 rst_sent_ = true;
427}
428
429void QuicStream::CloseConnectionWithDetails(QuicErrorCode error,
vasilvvc48c8712019-03-11 13:38:16 -0700430 const std::string& details) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500431 session()->connection()->CloseConnection(
432 error, details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
433}
434
435SpdyPriority QuicStream::priority() const {
436 return priority_;
437}
438
439void QuicStream::SetPriority(SpdyPriority priority) {
440 priority_ = priority;
441 session_->UpdateStreamPriority(id(), priority);
442}
443
444void QuicStream::WriteOrBufferData(
445 QuicStringPiece data,
446 bool fin,
447 QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) {
448 if (data.empty() && !fin) {
449 QUIC_BUG << "data.empty() && !fin";
450 return;
451 }
452
453 if (fin_buffered_) {
454 QUIC_BUG << "Fin already buffered";
455 return;
456 }
457 if (write_side_closed_) {
458 QUIC_DLOG(ERROR) << ENDPOINT
459 << "Attempt to write when the write side is closed";
460 if (type_ == READ_UNIDIRECTIONAL) {
461 CloseConnectionWithDetails(
462 QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
463 "Try to send data on read unidirectional stream");
464 }
465 return;
466 }
467
QUICHE teama6ef0a62019-03-07 20:34:33 -0500468 fin_buffered_ = fin;
469
470 bool had_buffered_data = HasBufferedData();
471 // Do not respect buffered data upper limit as WriteOrBufferData guarantees
472 // all data to be consumed.
473 if (data.length() > 0) {
474 struct iovec iov(QuicUtils::MakeIovec(data));
475 QuicStreamOffset offset = send_buffer_.stream_offset();
476 if (kMaxStreamLength - offset < data.length()) {
477 QUIC_BUG << "Write too many data via stream " << id_;
478 CloseConnectionWithDetails(
479 QUIC_STREAM_LENGTH_OVERFLOW,
480 QuicStrCat("Write too many data via stream ", id_));
481 return;
482 }
483 send_buffer_.SaveStreamData(&iov, 1, 0, data.length());
484 OnDataBuffered(offset, data.length(), ack_listener);
485 }
486 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
487 // Write data if there is no buffered data before.
488 WriteBufferedData();
489 }
490}
491
492void QuicStream::OnCanWrite() {
493 if (HasDeadlinePassed()) {
494 OnDeadlinePassed();
495 return;
496 }
497 if (HasPendingRetransmission()) {
498 WritePendingRetransmission();
499 // Exit early to allow other streams to write pending retransmissions if
500 // any.
501 return;
502 }
503
504 if (write_side_closed_) {
505 QUIC_DLOG(ERROR)
506 << ENDPOINT << "Stream " << id()
507 << " attempting to write new data when the write side is closed";
508 return;
509 }
510 if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) {
511 WriteBufferedData();
512 }
513 if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) {
514 // Notify upper layer to write new data when buffered data size is below
515 // low water mark.
516 OnCanWriteNewData();
517 }
518}
519
520void QuicStream::MaybeSendBlocked() {
nharperd5c4a932019-05-13 13:58:49 -0700521 if (flow_controller_->ShouldSendBlocked()) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500522 session_->SendBlocked(id_);
523 }
524 if (!stream_contributes_to_connection_flow_control_) {
525 return;
526 }
527 if (connection_flow_controller_->ShouldSendBlocked()) {
528 session_->SendBlocked(QuicUtils::GetInvalidStreamId(transport_version()));
529 }
530 // If the stream is blocked by connection-level flow control but not by
531 // stream-level flow control, add the stream to the write blocked list so that
532 // the stream will be given a chance to write when a connection-level
533 // WINDOW_UPDATE arrives.
534 if (connection_flow_controller_->IsBlocked() &&
nharperd5c4a932019-05-13 13:58:49 -0700535 !flow_controller_->IsBlocked()) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500536 session_->MarkConnectionLevelWriteBlocked(id());
537 }
538}
539
540QuicConsumedData QuicStream::WritevData(const struct iovec* iov,
541 int iov_count,
542 bool fin) {
543 if (write_side_closed_) {
544 QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id()
545 << "attempting to write when the write side is closed";
546 if (type_ == READ_UNIDIRECTIONAL) {
547 CloseConnectionWithDetails(
548 QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
549 "Try to send data on read unidirectional stream");
550 }
551 return QuicConsumedData(0, false);
552 }
553
554 // How much data was provided.
555 size_t write_length = 0;
556 if (iov != nullptr) {
557 for (int i = 0; i < iov_count; ++i) {
558 write_length += iov[i].iov_len;
559 }
560 }
561
562 QuicConsumedData consumed_data(0, false);
563 if (fin_buffered_) {
564 QUIC_BUG << "Fin already buffered";
565 return consumed_data;
566 }
567
568 if (kMaxStreamLength - send_buffer_.stream_offset() < write_length) {
569 QUIC_BUG << "Write too many data via stream " << id_;
570 CloseConnectionWithDetails(
571 QUIC_STREAM_LENGTH_OVERFLOW,
572 QuicStrCat("Write too many data via stream ", id_));
573 return consumed_data;
574 }
575
576 bool had_buffered_data = HasBufferedData();
577 if (CanWriteNewData()) {
578 // Save all data if buffered data size is below low water mark.
579 consumed_data.bytes_consumed = write_length;
580 if (consumed_data.bytes_consumed > 0) {
581 QuicStreamOffset offset = send_buffer_.stream_offset();
582 send_buffer_.SaveStreamData(iov, iov_count, 0, write_length);
583 OnDataBuffered(offset, write_length, nullptr);
584 }
585 }
586 consumed_data.fin_consumed =
587 consumed_data.bytes_consumed == write_length && fin;
588 fin_buffered_ = consumed_data.fin_consumed;
589
590 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
591 // Write data if there is no buffered data before.
592 WriteBufferedData();
593 }
594
595 return consumed_data;
596}
597
598QuicConsumedData QuicStream::WriteMemSlices(QuicMemSliceSpan span, bool fin) {
599 QuicConsumedData consumed_data(0, false);
600 if (span.empty() && !fin) {
601 QUIC_BUG << "span.empty() && !fin";
602 return consumed_data;
603 }
604
605 if (fin_buffered_) {
606 QUIC_BUG << "Fin already buffered";
607 return consumed_data;
608 }
609
610 if (write_side_closed_) {
611 QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id()
zhongyi4dc99c02019-05-30 14:11:04 -0700612 << " attempting to write when the write side is closed";
QUICHE teama6ef0a62019-03-07 20:34:33 -0500613 if (type_ == READ_UNIDIRECTIONAL) {
614 CloseConnectionWithDetails(
615 QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM,
616 "Try to send data on read unidirectional stream");
617 }
618 return consumed_data;
619 }
620
621 bool had_buffered_data = HasBufferedData();
622 if (CanWriteNewData() || span.empty()) {
623 consumed_data.fin_consumed = fin;
624 if (!span.empty()) {
625 // Buffer all data if buffered data size is below limit.
626 QuicStreamOffset offset = send_buffer_.stream_offset();
wub553a9662019-03-28 20:13:23 -0700627 consumed_data.bytes_consumed = send_buffer_.SaveMemSliceSpan(span);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500628 if (offset > send_buffer_.stream_offset() ||
629 kMaxStreamLength < send_buffer_.stream_offset()) {
630 QUIC_BUG << "Write too many data via stream " << id_;
631 CloseConnectionWithDetails(
632 QUIC_STREAM_LENGTH_OVERFLOW,
633 QuicStrCat("Write too many data via stream ", id_));
634 return consumed_data;
635 }
636 OnDataBuffered(offset, consumed_data.bytes_consumed, nullptr);
637 }
638 }
639 fin_buffered_ = consumed_data.fin_consumed;
640
641 if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) {
642 // Write data if there is no buffered data before.
643 WriteBufferedData();
644 }
645
646 return consumed_data;
647}
648
649bool QuicStream::HasPendingRetransmission() const {
650 return send_buffer_.HasPendingRetransmission() || fin_lost_;
651}
652
653bool QuicStream::IsStreamFrameOutstanding(QuicStreamOffset offset,
654 QuicByteCount data_length,
655 bool fin) const {
656 return send_buffer_.IsStreamDataOutstanding(offset, data_length) ||
657 (fin && fin_outstanding_);
658}
659
QUICHE teama6ef0a62019-03-07 20:34:33 -0500660void QuicStream::CloseReadSide() {
661 if (read_side_closed_) {
662 return;
663 }
664 QUIC_DVLOG(1) << ENDPOINT << "Done reading from stream " << id();
665
666 read_side_closed_ = true;
667 sequencer_.ReleaseBuffer();
668
669 if (write_side_closed_) {
670 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
671 session_->CloseStream(id());
672 }
673}
674
675void QuicStream::CloseWriteSide() {
676 if (write_side_closed_) {
677 return;
678 }
679 QUIC_DVLOG(1) << ENDPOINT << "Done writing to stream " << id();
680
681 write_side_closed_ = true;
682 if (read_side_closed_) {
683 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id();
684 session_->CloseStream(id());
685 }
686}
687
688bool QuicStream::HasBufferedData() const {
689 DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
690 return send_buffer_.stream_offset() > stream_bytes_written();
691}
692
693QuicTransportVersion QuicStream::transport_version() const {
694 return session_->connection()->transport_version();
695}
696
697HandshakeProtocol QuicStream::handshake_protocol() const {
698 return session_->connection()->version().handshake_protocol;
699}
700
701void QuicStream::StopReading() {
702 QUIC_DVLOG(1) << ENDPOINT << "Stop reading from stream " << id();
703 sequencer_.StopReading();
704}
705
706const QuicSocketAddress& QuicStream::PeerAddressOfLatestPacket() const {
707 return session_->connection()->last_packet_source_address();
708}
709
710void QuicStream::OnClose() {
711 CloseReadSide();
712 CloseWriteSide();
713
714 if (!fin_sent_ && !rst_sent_) {
715 // For flow control accounting, tell the peer how many bytes have been
716 // written on this stream before termination. Done here if needed, using a
717 // RST_STREAM frame.
718 QUIC_DLOG(INFO) << ENDPOINT << "Sending RST_STREAM in OnClose: " << id();
719 session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT,
720 stream_bytes_written());
721 session_->OnStreamDoneWaitingForAcks(id_);
722 rst_sent_ = true;
723 }
724
nharperd5c4a932019-05-13 13:58:49 -0700725 if (flow_controller_->FlowControlViolation() ||
QUICHE teama6ef0a62019-03-07 20:34:33 -0500726 connection_flow_controller_->FlowControlViolation()) {
727 return;
728 }
729 // The stream is being closed and will not process any further incoming bytes.
730 // As there may be more bytes in flight, to ensure that both endpoints have
731 // the same connection level flow control state, mark all unreceived or
732 // buffered bytes as consumed.
733 QuicByteCount bytes_to_consume =
nharperd5c4a932019-05-13 13:58:49 -0700734 flow_controller_->highest_received_byte_offset() -
735 flow_controller_->bytes_consumed();
QUICHE teama6ef0a62019-03-07 20:34:33 -0500736 AddBytesConsumed(bytes_to_consume);
737}
738
739void QuicStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
renjietang28c04b72019-07-01 15:08:09 -0700740 if (GetQuicReloadableFlag(quic_no_window_update_on_read_only_stream) &&
741 type_ == READ_UNIDIRECTIONAL) {
742 QUIC_RELOADABLE_FLAG_COUNT(quic_no_window_update_on_read_only_stream);
743 CloseConnectionWithDetails(
744 QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
745 "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.");
746 return;
747 }
748
nharperd5c4a932019-05-13 13:58:49 -0700749 if (flow_controller_->UpdateSendWindowOffset(frame.byte_offset)) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500750 // Let session unblock this stream.
751 session_->MarkConnectionLevelWriteBlocked(id_);
752 }
753}
754
755bool QuicStream::MaybeIncreaseHighestReceivedOffset(
756 QuicStreamOffset new_offset) {
757 uint64_t increment =
nharperd5c4a932019-05-13 13:58:49 -0700758 new_offset - flow_controller_->highest_received_byte_offset();
759 if (!flow_controller_->UpdateHighestReceivedOffset(new_offset)) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500760 return false;
761 }
762
763 // If |new_offset| increased the stream flow controller's highest received
764 // offset, increase the connection flow controller's value by the incremental
765 // difference.
766 if (stream_contributes_to_connection_flow_control_) {
767 connection_flow_controller_->UpdateHighestReceivedOffset(
768 connection_flow_controller_->highest_received_byte_offset() +
769 increment);
770 }
771 return true;
772}
773
774void QuicStream::AddBytesSent(QuicByteCount bytes) {
nharperd5c4a932019-05-13 13:58:49 -0700775 flow_controller_->AddBytesSent(bytes);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500776 if (stream_contributes_to_connection_flow_control_) {
777 connection_flow_controller_->AddBytesSent(bytes);
778 }
779}
780
781void QuicStream::AddBytesConsumed(QuicByteCount bytes) {
nharperd5c4a932019-05-13 13:58:49 -0700782 if (type_ == CRYPTO) {
783 // A stream with type CRYPTO has no flow control, so there's nothing this
784 // function needs to do. This function still gets called by the
785 // QuicStreamSequencers used by QuicCryptoStream.
786 return;
787 }
QUICHE teama6ef0a62019-03-07 20:34:33 -0500788 // Only adjust stream level flow controller if still reading.
789 if (!read_side_closed_) {
nharperd5c4a932019-05-13 13:58:49 -0700790 flow_controller_->AddBytesConsumed(bytes);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500791 }
792
793 if (stream_contributes_to_connection_flow_control_) {
794 connection_flow_controller_->AddBytesConsumed(bytes);
795 }
796}
797
798void QuicStream::UpdateSendWindowOffset(QuicStreamOffset new_window) {
nharperd5c4a932019-05-13 13:58:49 -0700799 if (flow_controller_->UpdateSendWindowOffset(new_window)) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500800 // Let session unblock this stream.
801 session_->MarkConnectionLevelWriteBlocked(id_);
802 }
803}
804
805void QuicStream::AddRandomPaddingAfterFin() {
806 add_random_padding_after_fin_ = true;
807}
808
809bool QuicStream::OnStreamFrameAcked(QuicStreamOffset offset,
810 QuicByteCount data_length,
811 bool fin_acked,
dschinazi17d42422019-06-18 16:35:07 -0700812 QuicTime::Delta /*ack_delay_time*/,
QUICHE teama6ef0a62019-03-07 20:34:33 -0500813 QuicByteCount* newly_acked_length) {
814 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Acking "
815 << "[" << offset << ", " << offset + data_length << "]"
816 << " fin = " << fin_acked;
817 *newly_acked_length = 0;
818 if (!send_buffer_.OnStreamDataAcked(offset, data_length,
819 newly_acked_length)) {
820 CloseConnectionWithDetails(QUIC_INTERNAL_ERROR,
821 "Trying to ack unsent data.");
822 return false;
823 }
824 if (!fin_sent_ && fin_acked) {
825 CloseConnectionWithDetails(QUIC_INTERNAL_ERROR,
826 "Trying to ack unsent fin.");
827 return false;
828 }
829 // Indicates whether ack listener's OnPacketAcked should be called.
830 const bool new_data_acked =
831 *newly_acked_length > 0 || (fin_acked && fin_outstanding_);
832 if (fin_acked) {
833 fin_outstanding_ = false;
834 fin_lost_ = false;
835 }
836 if (!IsWaitingForAcks()) {
837 session_->OnStreamDoneWaitingForAcks(id_);
838 }
839 return new_data_acked;
840}
841
842void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset,
843 QuicByteCount data_length,
844 bool fin_retransmitted) {
845 send_buffer_.OnStreamDataRetransmitted(offset, data_length);
846 if (fin_retransmitted) {
847 fin_lost_ = false;
848 }
849}
850
851void QuicStream::OnStreamFrameLost(QuicStreamOffset offset,
852 QuicByteCount data_length,
853 bool fin_lost) {
854 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Losting "
855 << "[" << offset << ", " << offset + data_length << "]"
856 << " fin = " << fin_lost;
857 if (data_length > 0) {
858 send_buffer_.OnStreamDataLost(offset, data_length);
859 }
860 if (fin_lost && fin_outstanding_) {
861 fin_lost_ = true;
862 }
863}
864
865bool QuicStream::RetransmitStreamData(QuicStreamOffset offset,
866 QuicByteCount data_length,
867 bool fin) {
868 if (HasDeadlinePassed()) {
869 OnDeadlinePassed();
870 return true;
871 }
872 QuicIntervalSet<QuicStreamOffset> retransmission(offset,
873 offset + data_length);
874 retransmission.Difference(bytes_acked());
875 bool retransmit_fin = fin && fin_outstanding_;
876 if (retransmission.Empty() && !retransmit_fin) {
877 return true;
878 }
879 QuicConsumedData consumed(0, false);
880 for (const auto& interval : retransmission) {
881 QuicStreamOffset retransmission_offset = interval.min();
882 QuicByteCount retransmission_length = interval.max() - interval.min();
883 const bool can_bundle_fin =
884 retransmit_fin && (retransmission_offset + retransmission_length ==
885 stream_bytes_written());
886 consumed = session()->WritevData(this, id_, retransmission_length,
887 retransmission_offset,
888 can_bundle_fin ? FIN : NO_FIN);
889 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
890 << " is forced to retransmit stream data ["
891 << retransmission_offset << ", "
892 << retransmission_offset + retransmission_length
893 << ") and fin: " << can_bundle_fin
894 << ", consumed: " << consumed;
895 OnStreamFrameRetransmitted(retransmission_offset, consumed.bytes_consumed,
896 consumed.fin_consumed);
897 if (can_bundle_fin) {
898 retransmit_fin = !consumed.fin_consumed;
899 }
900 if (consumed.bytes_consumed < retransmission_length ||
901 (can_bundle_fin && !consumed.fin_consumed)) {
902 // Connection is write blocked.
903 return false;
904 }
905 }
906 if (retransmit_fin) {
907 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
908 << " retransmits fin only frame.";
909 consumed = session()->WritevData(this, id_, 0, stream_bytes_written(), FIN);
910 if (!consumed.fin_consumed) {
911 return false;
912 }
913 }
914 return true;
915}
916
917bool QuicStream::IsWaitingForAcks() const {
918 return (!rst_sent_ || stream_error_ == QUIC_STREAM_NO_ERROR) &&
919 (send_buffer_.stream_bytes_outstanding() || fin_outstanding_);
920}
921
922size_t QuicStream::ReadableBytes() const {
923 return sequencer_.ReadableBytes();
924}
925
926bool QuicStream::WriteStreamData(QuicStreamOffset offset,
927 QuicByteCount data_length,
928 QuicDataWriter* writer) {
929 DCHECK_LT(0u, data_length);
930 QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset "
931 << offset << " length " << data_length;
932 return send_buffer_.WriteStreamData(offset, data_length, writer);
933}
934
935void QuicStream::WriteBufferedData() {
936 DCHECK(!write_side_closed_ && (HasBufferedData() || fin_buffered_));
937
938 if (session_->ShouldYield(id())) {
939 session_->MarkConnectionLevelWriteBlocked(id());
940 return;
941 }
942
943 // Size of buffered data.
944 size_t write_length = BufferedDataBytes();
945
946 // A FIN with zero data payload should not be flow control blocked.
947 bool fin_with_zero_data = (fin_buffered_ && write_length == 0);
948
949 bool fin = fin_buffered_;
950
951 // How much data flow control permits to be written.
nharperd5c4a932019-05-13 13:58:49 -0700952 QuicByteCount send_window = flow_controller_->SendWindowSize();
QUICHE teama6ef0a62019-03-07 20:34:33 -0500953 if (stream_contributes_to_connection_flow_control_) {
954 send_window =
955 std::min(send_window, connection_flow_controller_->SendWindowSize());
956 }
957
958 if (send_window == 0 && !fin_with_zero_data) {
959 // Quick return if nothing can be sent.
960 MaybeSendBlocked();
961 return;
962 }
963
964 if (write_length > send_window) {
965 // Don't send the FIN unless all the data will be sent.
966 fin = false;
967
968 // Writing more data would be a violation of flow control.
969 write_length = static_cast<size_t>(send_window);
970 QUIC_DVLOG(1) << "stream " << id() << " shortens write length to "
971 << write_length << " due to flow control";
972 }
973 if (session_->session_decides_what_to_write()) {
974 session_->SetTransmissionType(NOT_RETRANSMISSION);
975 }
fayang97857352019-07-01 06:15:26 -0700976
977 StreamSendingState state = fin ? FIN : NO_FIN;
978 if (fin && add_random_padding_after_fin_) {
979 state = FIN_AND_PADDING;
980 }
981 QuicConsumedData consumed_data = session_->WritevData(
982 this, id(), write_length, stream_bytes_written(), state);
QUICHE teama6ef0a62019-03-07 20:34:33 -0500983
984 OnStreamDataConsumed(consumed_data.bytes_consumed);
985
986 AddBytesSent(consumed_data.bytes_consumed);
987 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " sends "
988 << stream_bytes_written() << " bytes "
989 << " and has buffered data " << BufferedDataBytes() << " bytes."
990 << " fin is sent: " << consumed_data.fin_consumed
991 << " fin is buffered: " << fin_buffered_;
992
993 // The write may have generated a write error causing this stream to be
994 // closed. If so, simply return without marking the stream write blocked.
995 if (write_side_closed_) {
996 return;
997 }
998
999 if (consumed_data.bytes_consumed == write_length) {
1000 if (!fin_with_zero_data) {
1001 MaybeSendBlocked();
1002 }
1003 if (fin && consumed_data.fin_consumed) {
1004 fin_sent_ = true;
1005 fin_outstanding_ = true;
1006 if (fin_received_) {
1007 session_->StreamDraining(id_);
1008 }
1009 CloseWriteSide();
1010 } else if (fin && !consumed_data.fin_consumed) {
1011 session_->MarkConnectionLevelWriteBlocked(id());
1012 }
1013 } else {
1014 session_->MarkConnectionLevelWriteBlocked(id());
1015 }
1016 if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) {
1017 busy_counter_ = 0;
1018 }
zhongyi1b2f7832019-06-14 13:31:34 -07001019
1020 if (IsWaitingForAcks()) {
1021 session_->OnStreamWaitingForAcks(id_);
1022 }
QUICHE teama6ef0a62019-03-07 20:34:33 -05001023}
1024
1025uint64_t QuicStream::BufferedDataBytes() const {
1026 DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written());
1027 return send_buffer_.stream_offset() - stream_bytes_written();
1028}
1029
1030bool QuicStream::CanWriteNewData() const {
1031 return BufferedDataBytes() < buffered_data_threshold_;
1032}
1033
1034bool QuicStream::CanWriteNewDataAfterData(QuicByteCount length) const {
1035 return (BufferedDataBytes() + length) < buffered_data_threshold_;
1036}
1037
1038uint64_t QuicStream::stream_bytes_written() const {
1039 return send_buffer_.stream_bytes_written();
1040}
1041
1042const QuicIntervalSet<QuicStreamOffset>& QuicStream::bytes_acked() const {
1043 return send_buffer_.bytes_acked();
1044}
1045
1046void QuicStream::OnStreamDataConsumed(size_t bytes_consumed) {
1047 send_buffer_.OnStreamDataConsumed(bytes_consumed);
1048}
1049
1050void QuicStream::WritePendingRetransmission() {
1051 while (HasPendingRetransmission()) {
1052 QuicConsumedData consumed(0, false);
1053 if (!send_buffer_.HasPendingRetransmission()) {
1054 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1055 << " retransmits fin only frame.";
1056 consumed =
1057 session()->WritevData(this, id_, 0, stream_bytes_written(), FIN);
1058 fin_lost_ = !consumed.fin_consumed;
1059 if (fin_lost_) {
1060 // Connection is write blocked.
1061 return;
1062 }
1063 } else {
1064 StreamPendingRetransmission pending =
1065 send_buffer_.NextPendingRetransmission();
1066 // Determine whether the lost fin can be bundled with the data.
1067 const bool can_bundle_fin =
1068 fin_lost_ &&
1069 (pending.offset + pending.length == stream_bytes_written());
1070 consumed =
1071 session()->WritevData(this, id_, pending.length, pending.offset,
1072 can_bundle_fin ? FIN : NO_FIN);
1073 QUIC_DVLOG(1) << ENDPOINT << "stream " << id_
1074 << " tries to retransmit stream data [" << pending.offset
1075 << ", " << pending.offset + pending.length
1076 << ") and fin: " << can_bundle_fin
1077 << ", consumed: " << consumed;
1078 OnStreamFrameRetransmitted(pending.offset, consumed.bytes_consumed,
1079 consumed.fin_consumed);
1080 if (consumed.bytes_consumed < pending.length ||
1081 (can_bundle_fin && !consumed.fin_consumed)) {
1082 // Connection is write blocked.
1083 return;
1084 }
1085 }
1086 }
1087}
1088
1089bool QuicStream::MaybeSetTtl(QuicTime::Delta ttl) {
1090 if (is_static_) {
1091 QUIC_BUG << "Cannot set TTL of a static stream.";
1092 return false;
1093 }
1094 if (deadline_.IsInitialized()) {
1095 QUIC_DLOG(WARNING) << "Deadline has already been set.";
1096 return false;
1097 }
1098 if (!session()->session_decides_what_to_write()) {
1099 QUIC_DLOG(WARNING) << "This session does not support stream TTL yet.";
1100 return false;
1101 }
1102 QuicTime now = session()->connection()->clock()->ApproximateNow();
1103 deadline_ = now + ttl;
1104 return true;
1105}
1106
1107bool QuicStream::HasDeadlinePassed() const {
1108 if (!deadline_.IsInitialized()) {
1109 // No deadline has been set.
1110 return false;
1111 }
1112 DCHECK(session()->session_decides_what_to_write());
1113 QuicTime now = session()->connection()->clock()->ApproximateNow();
1114 if (now < deadline_) {
1115 return false;
1116 }
1117 // TTL expired.
1118 QUIC_DVLOG(1) << "stream " << id() << " deadline has passed";
1119 return true;
1120}
1121
1122void QuicStream::OnDeadlinePassed() {
1123 Reset(QUIC_STREAM_TTL_EXPIRED);
1124}
1125
1126void QuicStream::SendStopSending(uint16_t code) {
fkastenholz305e1732019-06-18 05:01:22 -07001127 if (!VersionHasIetfQuicFrames(transport_version())) {
QUICHE teama6ef0a62019-03-07 20:34:33 -05001128 // If the connection is not version 99, do nothing.
1129 // Do not QUIC_BUG or anything; the application really does not need to know
1130 // what version the connection is in.
1131 return;
1132 }
1133 session_->SendStopSending(code, id_);
1134}
1135
dschinazi17d42422019-06-18 16:35:07 -07001136void QuicStream::OnStopSending(uint16_t /*code*/) {}
QUICHE teama6ef0a62019-03-07 20:34:33 -05001137
1138} // namespace quic