QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 1 | // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style license that can be |
| 3 | // found in the LICENSE file. |
| 4 | |
| 5 | #include "net/third_party/quiche/src/quic/core/quic_stream.h" |
| 6 | |
vasilvv | 872e7a3 | 2019-03-12 16:42:44 -0700 | [diff] [blame] | 7 | #include <string> |
| 8 | |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 9 | #include "net/third_party/quiche/src/quic/core/quic_flow_controller.h" |
| 10 | #include "net/third_party/quiche/src/quic/core/quic_session.h" |
| 11 | #include "net/third_party/quiche/src/quic/core/quic_utils.h" |
| 12 | #include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h" |
| 13 | #include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h" |
| 14 | #include "net/third_party/quiche/src/quic/platform/api/quic_flags.h" |
| 15 | #include "net/third_party/quiche/src/quic/platform/api/quic_logging.h" |
| 16 | #include "net/third_party/quiche/src/quic/platform/api/quic_str_cat.h" |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 17 | |
| 18 | using spdy::SpdyPriority; |
| 19 | |
| 20 | namespace quic { |
| 21 | |
// Log-line prefix naming which side of the connection this stream is on.
// Expands to an expression that reads |perspective_|, so it can only be used
// where such a member is in scope.
#define ENDPOINT \
  (perspective_ == Perspective::IS_SERVER ? "Server: " : "Client: ")
| 24 | |
| 25 | namespace { |
| 26 | |
| 27 | size_t GetInitialStreamFlowControlWindowToSend(QuicSession* session) { |
| 28 | return session->config()->GetInitialStreamFlowControlWindowToSend(); |
| 29 | } |
| 30 | |
| 31 | size_t GetReceivedFlowControlWindow(QuicSession* session) { |
| 32 | if (session->config()->HasReceivedInitialStreamFlowControlWindowBytes()) { |
| 33 | return session->config()->ReceivedInitialStreamFlowControlWindowBytes(); |
| 34 | } |
| 35 | |
dschinazi | c703612 | 2019-04-30 12:46:34 -0700 | [diff] [blame] | 36 | return kDefaultFlowControlSendWindow; |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 37 | } |
| 38 | |
| 39 | } // namespace |
| 40 | |
| 41 | // static |
| 42 | const SpdyPriority QuicStream::kDefaultPriority; |
| 43 | |
| 44 | PendingStream::PendingStream(QuicStreamId id, QuicSession* session) |
| 45 | : id_(id), |
| 46 | session_(session), |
| 47 | stream_bytes_read_(0), |
| 48 | fin_received_(false), |
| 49 | connection_flow_controller_(session->flow_controller()), |
| 50 | flow_controller_(session, |
| 51 | id, |
| 52 | /*is_connection_flow_controller*/ false, |
| 53 | GetReceivedFlowControlWindow(session), |
| 54 | GetInitialStreamFlowControlWindowToSend(session), |
| 55 | kStreamReceiveWindowLimit, |
| 56 | session_->flow_controller()->auto_tune_receive_window(), |
| 57 | session_->flow_controller()), |
| 58 | sequencer_(this) {} |
| 59 | |
| 60 | void PendingStream::OnDataAvailable() { |
renjietang | 0c55886 | 2019-05-08 13:26:23 -0700 | [diff] [blame] | 61 | // It will be called when pending stream receives its first byte. But this |
| 62 | // call should simply be ignored so that data remains in sequencer. |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 63 | } |
| 64 | |
| 65 | void PendingStream::OnFinRead() { |
| 66 | QUIC_BUG << "OnFinRead should not be called."; |
| 67 | CloseConnectionWithDetails(QUIC_INTERNAL_ERROR, "Unexpected fin read"); |
| 68 | } |
| 69 | |
| 70 | void PendingStream::AddBytesConsumed(QuicByteCount bytes) { |
renjietang | bb1c489 | 2019-05-24 15:58:44 -0700 | [diff] [blame] | 71 | // It will be called when the metadata of the stream is consumed. |
| 72 | flow_controller_.AddBytesConsumed(bytes); |
| 73 | connection_flow_controller_->AddBytesConsumed(bytes); |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 74 | } |
| 75 | |
| 76 | void PendingStream::Reset(QuicRstStreamErrorCode error) { |
| 77 | session_->SendRstStream(id_, error, 0); |
| 78 | } |
| 79 | |
| 80 | void PendingStream::CloseConnectionWithDetails(QuicErrorCode error, |
vasilvv | c48c871 | 2019-03-11 13:38:16 -0700 | [diff] [blame] | 81 | const std::string& details) { |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 82 | session_->connection()->CloseConnection( |
| 83 | error, details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); |
| 84 | } |
| 85 | |
| 86 | QuicStreamId PendingStream::id() const { |
| 87 | return id_; |
| 88 | } |
| 89 | |
| 90 | const QuicSocketAddress& PendingStream::PeerAddressOfLatestPacket() const { |
| 91 | return session_->connection()->last_packet_source_address(); |
| 92 | } |
| 93 | |
| 94 | void PendingStream::OnStreamFrame(const QuicStreamFrame& frame) { |
| 95 | DCHECK_EQ(frame.stream_id, id_); |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 96 | |
| 97 | bool is_stream_too_long = |
| 98 | (frame.offset > kMaxStreamLength) || |
| 99 | (kMaxStreamLength - frame.offset < frame.data_length); |
| 100 | if (is_stream_too_long) { |
| 101 | // Close connection if stream becomes too long. |
| 102 | QUIC_PEER_BUG |
| 103 | << "Receive stream frame reaches max stream length. frame offset " |
| 104 | << frame.offset << " length " << frame.data_length; |
| 105 | CloseConnectionWithDetails( |
| 106 | QUIC_STREAM_LENGTH_OVERFLOW, |
| 107 | "Peer sends more data than allowed on this stream."); |
| 108 | return; |
| 109 | } |
| 110 | |
| 111 | if (frame.fin) { |
| 112 | fin_received_ = true; |
| 113 | } |
| 114 | |
| 115 | // This count includes duplicate data received. |
| 116 | size_t frame_payload_size = frame.data_length; |
| 117 | stream_bytes_read_ += frame_payload_size; |
| 118 | |
| 119 | // Flow control is interested in tracking highest received offset. |
| 120 | // Only interested in received frames that carry data. |
| 121 | if (frame_payload_size > 0 && |
| 122 | MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) { |
| 123 | // As the highest received offset has changed, check to see if this is a |
| 124 | // violation of flow control. |
| 125 | if (flow_controller_.FlowControlViolation() || |
| 126 | connection_flow_controller_->FlowControlViolation()) { |
| 127 | CloseConnectionWithDetails( |
| 128 | QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, |
| 129 | "Flow control violation after increasing offset"); |
| 130 | return; |
| 131 | } |
| 132 | } |
| 133 | |
| 134 | sequencer_.OnStreamFrame(frame); |
| 135 | } |
| 136 | |
| 137 | void PendingStream::OnRstStreamFrame(const QuicRstStreamFrame& frame) { |
| 138 | DCHECK_EQ(frame.stream_id, id_); |
| 139 | |
| 140 | if (frame.byte_offset > kMaxStreamLength) { |
| 141 | // Peer are not suppose to write bytes more than maxium allowed. |
| 142 | CloseConnectionWithDetails(QUIC_STREAM_LENGTH_OVERFLOW, |
| 143 | "Reset frame stream offset overflow."); |
| 144 | return; |
| 145 | } |
| 146 | MaybeIncreaseHighestReceivedOffset(frame.byte_offset); |
| 147 | if (flow_controller_.FlowControlViolation() || |
| 148 | connection_flow_controller_->FlowControlViolation()) { |
| 149 | CloseConnectionWithDetails( |
| 150 | QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, |
| 151 | "Flow control violation after increasing offset"); |
| 152 | return; |
| 153 | } |
| 154 | } |
| 155 | |
| 156 | bool PendingStream::MaybeIncreaseHighestReceivedOffset( |
| 157 | QuicStreamOffset new_offset) { |
| 158 | uint64_t increment = |
| 159 | new_offset - flow_controller_.highest_received_byte_offset(); |
| 160 | if (!flow_controller_.UpdateHighestReceivedOffset(new_offset)) { |
| 161 | return false; |
| 162 | } |
| 163 | |
| 164 | // If |new_offset| increased the stream flow controller's highest received |
| 165 | // offset, increase the connection flow controller's value by the incremental |
| 166 | // difference. |
| 167 | connection_flow_controller_->UpdateHighestReceivedOffset( |
| 168 | connection_flow_controller_->highest_received_byte_offset() + increment); |
| 169 | return true; |
| 170 | } |
| 171 | |
renjietang | bb1c489 | 2019-05-24 15:58:44 -0700 | [diff] [blame] | 172 | void PendingStream::MarkConsumed(size_t num_bytes) { |
| 173 | sequencer_.MarkConsumed(num_bytes); |
| 174 | } |
| 175 | |
renjietang | baea59c | 2019-05-29 15:08:14 -0700 | [diff] [blame] | 176 | QuicStream::QuicStream(PendingStream* pending, StreamType type, bool is_static) |
| 177 | : QuicStream(pending->id_, |
| 178 | pending->session_, |
| 179 | std::move(pending->sequencer_), |
renjietang | 3544899 | 2019-05-08 17:08:57 -0700 | [diff] [blame] | 180 | is_static, |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 181 | type, |
renjietang | baea59c | 2019-05-29 15:08:14 -0700 | [diff] [blame] | 182 | pending->stream_bytes_read_, |
| 183 | pending->fin_received_, |
| 184 | std::move(pending->flow_controller_), |
| 185 | pending->connection_flow_controller_) { |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 186 | sequencer_.set_stream(this); |
| 187 | } |
| 188 | |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 189 | namespace { |
| 190 | |
| 191 | QuicOptional<QuicFlowController> FlowController(QuicStreamId id, |
| 192 | QuicSession* session, |
| 193 | StreamType type) { |
| 194 | if (type == CRYPTO) { |
| 195 | // The only QuicStream with a StreamType of CRYPTO is QuicCryptoStream, when |
| 196 | // it is using crypto frames instead of stream frames. The QuicCryptoStream |
| 197 | // doesn't have any flow control in that case, so we don't create a |
| 198 | // QuicFlowController for it. |
| 199 | return QuicOptional<QuicFlowController>(); |
| 200 | } |
| 201 | return QuicFlowController( |
| 202 | session, id, |
| 203 | /*is_connection_flow_controller*/ false, |
| 204 | GetReceivedFlowControlWindow(session), |
| 205 | GetInitialStreamFlowControlWindowToSend(session), |
| 206 | kStreamReceiveWindowLimit, |
| 207 | session->flow_controller()->auto_tune_receive_window(), |
| 208 | session->flow_controller()); |
| 209 | } |
| 210 | |
| 211 | } // namespace |
| 212 | |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 213 | QuicStream::QuicStream(QuicStreamId id, |
| 214 | QuicSession* session, |
| 215 | bool is_static, |
| 216 | StreamType type) |
| 217 | : QuicStream(id, |
| 218 | session, |
| 219 | QuicStreamSequencer(this), |
| 220 | is_static, |
| 221 | type, |
| 222 | 0, |
| 223 | false, |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 224 | FlowController(id, session, type), |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 225 | session->flow_controller()) {} |
| 226 | |
| 227 | QuicStream::QuicStream(QuicStreamId id, |
| 228 | QuicSession* session, |
| 229 | QuicStreamSequencer sequencer, |
| 230 | bool is_static, |
| 231 | StreamType type, |
| 232 | uint64_t stream_bytes_read, |
| 233 | bool fin_received, |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 234 | QuicOptional<QuicFlowController> flow_controller, |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 235 | QuicFlowController* connection_flow_controller) |
| 236 | : sequencer_(std::move(sequencer)), |
| 237 | id_(id), |
| 238 | session_(session), |
| 239 | priority_(kDefaultPriority), |
| 240 | stream_bytes_read_(stream_bytes_read), |
| 241 | stream_error_(QUIC_STREAM_NO_ERROR), |
| 242 | connection_error_(QUIC_NO_ERROR), |
| 243 | read_side_closed_(false), |
| 244 | write_side_closed_(false), |
| 245 | fin_buffered_(false), |
| 246 | fin_sent_(false), |
| 247 | fin_outstanding_(false), |
| 248 | fin_lost_(false), |
| 249 | fin_received_(fin_received), |
| 250 | rst_sent_(false), |
| 251 | rst_received_(false), |
| 252 | perspective_(session_->perspective()), |
| 253 | flow_controller_(std::move(flow_controller)), |
| 254 | connection_flow_controller_(connection_flow_controller), |
| 255 | stream_contributes_to_connection_flow_control_(true), |
| 256 | busy_counter_(0), |
| 257 | add_random_padding_after_fin_(false), |
| 258 | send_buffer_( |
| 259 | session->connection()->helper()->GetStreamSendBufferAllocator()), |
| 260 | buffered_data_threshold_(GetQuicFlag(FLAGS_quic_buffered_data_threshold)), |
| 261 | is_static_(is_static), |
| 262 | deadline_(QuicTime::Zero()), |
fkastenholz | 305e173 | 2019-06-18 05:01:22 -0700 | [diff] [blame] | 263 | type_(VersionHasIetfQuicFrames( |
| 264 | session->connection()->transport_version()) && |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 265 | type != CRYPTO |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 266 | ? QuicUtils::GetStreamType(id_, |
| 267 | perspective_, |
| 268 | session->IsIncomingStream(id_)) |
| 269 | : type) { |
| 270 | if (type_ == WRITE_UNIDIRECTIONAL) { |
| 271 | set_fin_received(true); |
| 272 | CloseReadSide(); |
| 273 | } else if (type_ == READ_UNIDIRECTIONAL) { |
| 274 | set_fin_sent(true); |
| 275 | CloseWriteSide(); |
| 276 | } |
| 277 | SetFromConfig(); |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 278 | if (type_ != CRYPTO) { |
| 279 | session_->RegisterStreamPriority(id, is_static_, priority_); |
| 280 | } |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 281 | } |
| 282 | |
| 283 | QuicStream::~QuicStream() { |
| 284 | if (session_ != nullptr && IsWaitingForAcks()) { |
| 285 | QUIC_DVLOG(1) |
| 286 | << ENDPOINT << "Stream " << id_ |
| 287 | << " gets destroyed while waiting for acks. stream_bytes_outstanding = " |
| 288 | << send_buffer_.stream_bytes_outstanding() |
| 289 | << ", fin_outstanding: " << fin_outstanding_; |
| 290 | } |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 291 | if (session_ != nullptr && type_ != CRYPTO) { |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 292 | session_->UnregisterStreamPriority(id(), is_static_); |
| 293 | } |
| 294 | } |
| 295 | |
| 296 | void QuicStream::SetFromConfig() {} |
| 297 | |
| 298 | void QuicStream::OnStreamFrame(const QuicStreamFrame& frame) { |
| 299 | DCHECK_EQ(frame.stream_id, id_); |
| 300 | |
| 301 | DCHECK(!(read_side_closed_ && write_side_closed_)); |
| 302 | |
| 303 | if (type_ == WRITE_UNIDIRECTIONAL) { |
| 304 | CloseConnectionWithDetails( |
| 305 | QUIC_DATA_RECEIVED_ON_WRITE_UNIDIRECTIONAL_STREAM, |
| 306 | "Data received on write unidirectional stream"); |
| 307 | return; |
| 308 | } |
| 309 | |
| 310 | bool is_stream_too_long = |
| 311 | (frame.offset > kMaxStreamLength) || |
| 312 | (kMaxStreamLength - frame.offset < frame.data_length); |
| 313 | if (is_stream_too_long) { |
| 314 | // Close connection if stream becomes too long. |
| 315 | QUIC_PEER_BUG << "Receive stream frame on stream " << id_ |
| 316 | << " reaches max stream length. frame offset " << frame.offset |
| 317 | << " length " << frame.data_length << ". " |
| 318 | << sequencer_.DebugString(); |
| 319 | CloseConnectionWithDetails( |
| 320 | QUIC_STREAM_LENGTH_OVERFLOW, |
| 321 | QuicStrCat("Peer sends more data than allowed on stream ", id_, |
| 322 | ". frame: offset = ", frame.offset, ", length = ", |
| 323 | frame.data_length, ". ", sequencer_.DebugString())); |
| 324 | return; |
| 325 | } |
| 326 | if (frame.fin) { |
| 327 | fin_received_ = true; |
| 328 | if (fin_sent_) { |
| 329 | session_->StreamDraining(id_); |
| 330 | } |
| 331 | } |
| 332 | |
| 333 | if (read_side_closed_) { |
| 334 | QUIC_DLOG(INFO) |
| 335 | << ENDPOINT << "Stream " << frame.stream_id |
| 336 | << " is closed for reading. Ignoring newly received stream data."; |
| 337 | // The subclass does not want to read data: blackhole the data. |
| 338 | return; |
| 339 | } |
| 340 | |
| 341 | // This count includes duplicate data received. |
| 342 | size_t frame_payload_size = frame.data_length; |
| 343 | stream_bytes_read_ += frame_payload_size; |
| 344 | |
| 345 | // Flow control is interested in tracking highest received offset. |
| 346 | // Only interested in received frames that carry data. |
| 347 | if (frame_payload_size > 0 && |
| 348 | MaybeIncreaseHighestReceivedOffset(frame.offset + frame_payload_size)) { |
| 349 | // As the highest received offset has changed, check to see if this is a |
| 350 | // violation of flow control. |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 351 | if (flow_controller_->FlowControlViolation() || |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 352 | connection_flow_controller_->FlowControlViolation()) { |
| 353 | CloseConnectionWithDetails( |
| 354 | QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, |
| 355 | "Flow control violation after increasing offset"); |
| 356 | return; |
| 357 | } |
| 358 | } |
| 359 | |
| 360 | sequencer_.OnStreamFrame(frame); |
| 361 | } |
| 362 | |
| 363 | int QuicStream::num_frames_received() const { |
| 364 | return sequencer_.num_frames_received(); |
| 365 | } |
| 366 | |
| 367 | int QuicStream::num_duplicate_frames_received() const { |
| 368 | return sequencer_.num_duplicate_frames_received(); |
| 369 | } |
| 370 | |
| 371 | void QuicStream::OnStreamReset(const QuicRstStreamFrame& frame) { |
| 372 | rst_received_ = true; |
| 373 | if (frame.byte_offset > kMaxStreamLength) { |
| 374 | // Peer are not suppose to write bytes more than maxium allowed. |
| 375 | CloseConnectionWithDetails(QUIC_STREAM_LENGTH_OVERFLOW, |
| 376 | "Reset frame stream offset overflow."); |
| 377 | return; |
| 378 | } |
| 379 | MaybeIncreaseHighestReceivedOffset(frame.byte_offset); |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 380 | if (flow_controller_->FlowControlViolation() || |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 381 | connection_flow_controller_->FlowControlViolation()) { |
| 382 | CloseConnectionWithDetails( |
| 383 | QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA, |
| 384 | "Flow control violation after increasing offset"); |
| 385 | return; |
| 386 | } |
| 387 | |
| 388 | stream_error_ = frame.error_code; |
| 389 | // Google QUIC closes both sides of the stream in response to a |
| 390 | // RESET_STREAM, IETF QUIC closes only the read side. |
fkastenholz | 305e173 | 2019-06-18 05:01:22 -0700 | [diff] [blame] | 391 | if (!VersionHasIetfQuicFrames(transport_version())) { |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 392 | CloseWriteSide(); |
| 393 | } |
| 394 | CloseReadSide(); |
| 395 | } |
| 396 | |
| 397 | void QuicStream::OnConnectionClosed(QuicErrorCode error, |
| 398 | ConnectionCloseSource /*source*/) { |
| 399 | if (read_side_closed_ && write_side_closed_) { |
| 400 | return; |
| 401 | } |
| 402 | if (error != QUIC_NO_ERROR) { |
| 403 | stream_error_ = QUIC_STREAM_CONNECTION_ERROR; |
| 404 | connection_error_ = error; |
| 405 | } |
| 406 | |
| 407 | CloseWriteSide(); |
| 408 | CloseReadSide(); |
| 409 | } |
| 410 | |
| 411 | void QuicStream::OnFinRead() { |
| 412 | DCHECK(sequencer_.IsClosed()); |
| 413 | // OnFinRead can be called due to a FIN flag in a headers block, so there may |
| 414 | // have been no OnStreamFrame call with a FIN in the frame. |
| 415 | fin_received_ = true; |
| 416 | // If fin_sent_ is true, then CloseWriteSide has already been called, and the |
| 417 | // stream will be destroyed by CloseReadSide, so don't need to call |
| 418 | // StreamDraining. |
| 419 | CloseReadSide(); |
| 420 | } |
| 421 | |
| 422 | void QuicStream::Reset(QuicRstStreamErrorCode error) { |
| 423 | stream_error_ = error; |
| 424 | // Sending a RstStream results in calling CloseStream. |
| 425 | session()->SendRstStream(id(), error, stream_bytes_written()); |
| 426 | rst_sent_ = true; |
| 427 | } |
| 428 | |
| 429 | void QuicStream::CloseConnectionWithDetails(QuicErrorCode error, |
vasilvv | c48c871 | 2019-03-11 13:38:16 -0700 | [diff] [blame] | 430 | const std::string& details) { |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 431 | session()->connection()->CloseConnection( |
| 432 | error, details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET); |
| 433 | } |
| 434 | |
| 435 | SpdyPriority QuicStream::priority() const { |
| 436 | return priority_; |
| 437 | } |
| 438 | |
| 439 | void QuicStream::SetPriority(SpdyPriority priority) { |
| 440 | priority_ = priority; |
| 441 | session_->UpdateStreamPriority(id(), priority); |
| 442 | } |
| 443 | |
| 444 | void QuicStream::WriteOrBufferData( |
| 445 | QuicStringPiece data, |
| 446 | bool fin, |
| 447 | QuicReferenceCountedPointer<QuicAckListenerInterface> ack_listener) { |
| 448 | if (data.empty() && !fin) { |
| 449 | QUIC_BUG << "data.empty() && !fin"; |
| 450 | return; |
| 451 | } |
| 452 | |
| 453 | if (fin_buffered_) { |
| 454 | QUIC_BUG << "Fin already buffered"; |
| 455 | return; |
| 456 | } |
| 457 | if (write_side_closed_) { |
| 458 | QUIC_DLOG(ERROR) << ENDPOINT |
| 459 | << "Attempt to write when the write side is closed"; |
| 460 | if (type_ == READ_UNIDIRECTIONAL) { |
| 461 | CloseConnectionWithDetails( |
| 462 | QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM, |
| 463 | "Try to send data on read unidirectional stream"); |
| 464 | } |
| 465 | return; |
| 466 | } |
| 467 | |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 468 | fin_buffered_ = fin; |
| 469 | |
| 470 | bool had_buffered_data = HasBufferedData(); |
| 471 | // Do not respect buffered data upper limit as WriteOrBufferData guarantees |
| 472 | // all data to be consumed. |
| 473 | if (data.length() > 0) { |
| 474 | struct iovec iov(QuicUtils::MakeIovec(data)); |
| 475 | QuicStreamOffset offset = send_buffer_.stream_offset(); |
| 476 | if (kMaxStreamLength - offset < data.length()) { |
| 477 | QUIC_BUG << "Write too many data via stream " << id_; |
| 478 | CloseConnectionWithDetails( |
| 479 | QUIC_STREAM_LENGTH_OVERFLOW, |
| 480 | QuicStrCat("Write too many data via stream ", id_)); |
| 481 | return; |
| 482 | } |
| 483 | send_buffer_.SaveStreamData(&iov, 1, 0, data.length()); |
| 484 | OnDataBuffered(offset, data.length(), ack_listener); |
| 485 | } |
| 486 | if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) { |
| 487 | // Write data if there is no buffered data before. |
| 488 | WriteBufferedData(); |
| 489 | } |
| 490 | } |
| 491 | |
| 492 | void QuicStream::OnCanWrite() { |
| 493 | if (HasDeadlinePassed()) { |
| 494 | OnDeadlinePassed(); |
| 495 | return; |
| 496 | } |
| 497 | if (HasPendingRetransmission()) { |
| 498 | WritePendingRetransmission(); |
| 499 | // Exit early to allow other streams to write pending retransmissions if |
| 500 | // any. |
| 501 | return; |
| 502 | } |
| 503 | |
| 504 | if (write_side_closed_) { |
| 505 | QUIC_DLOG(ERROR) |
| 506 | << ENDPOINT << "Stream " << id() |
| 507 | << " attempting to write new data when the write side is closed"; |
| 508 | return; |
| 509 | } |
| 510 | if (HasBufferedData() || (fin_buffered_ && !fin_sent_)) { |
| 511 | WriteBufferedData(); |
| 512 | } |
| 513 | if (!fin_buffered_ && !fin_sent_ && CanWriteNewData()) { |
| 514 | // Notify upper layer to write new data when buffered data size is below |
| 515 | // low water mark. |
| 516 | OnCanWriteNewData(); |
| 517 | } |
| 518 | } |
| 519 | |
| 520 | void QuicStream::MaybeSendBlocked() { |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 521 | if (flow_controller_->ShouldSendBlocked()) { |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 522 | session_->SendBlocked(id_); |
| 523 | } |
| 524 | if (!stream_contributes_to_connection_flow_control_) { |
| 525 | return; |
| 526 | } |
| 527 | if (connection_flow_controller_->ShouldSendBlocked()) { |
| 528 | session_->SendBlocked(QuicUtils::GetInvalidStreamId(transport_version())); |
| 529 | } |
| 530 | // If the stream is blocked by connection-level flow control but not by |
| 531 | // stream-level flow control, add the stream to the write blocked list so that |
| 532 | // the stream will be given a chance to write when a connection-level |
| 533 | // WINDOW_UPDATE arrives. |
| 534 | if (connection_flow_controller_->IsBlocked() && |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 535 | !flow_controller_->IsBlocked()) { |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 536 | session_->MarkConnectionLevelWriteBlocked(id()); |
| 537 | } |
| 538 | } |
| 539 | |
| 540 | QuicConsumedData QuicStream::WritevData(const struct iovec* iov, |
| 541 | int iov_count, |
| 542 | bool fin) { |
| 543 | if (write_side_closed_) { |
| 544 | QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id() |
| 545 | << "attempting to write when the write side is closed"; |
| 546 | if (type_ == READ_UNIDIRECTIONAL) { |
| 547 | CloseConnectionWithDetails( |
| 548 | QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM, |
| 549 | "Try to send data on read unidirectional stream"); |
| 550 | } |
| 551 | return QuicConsumedData(0, false); |
| 552 | } |
| 553 | |
| 554 | // How much data was provided. |
| 555 | size_t write_length = 0; |
| 556 | if (iov != nullptr) { |
| 557 | for (int i = 0; i < iov_count; ++i) { |
| 558 | write_length += iov[i].iov_len; |
| 559 | } |
| 560 | } |
| 561 | |
| 562 | QuicConsumedData consumed_data(0, false); |
| 563 | if (fin_buffered_) { |
| 564 | QUIC_BUG << "Fin already buffered"; |
| 565 | return consumed_data; |
| 566 | } |
| 567 | |
| 568 | if (kMaxStreamLength - send_buffer_.stream_offset() < write_length) { |
| 569 | QUIC_BUG << "Write too many data via stream " << id_; |
| 570 | CloseConnectionWithDetails( |
| 571 | QUIC_STREAM_LENGTH_OVERFLOW, |
| 572 | QuicStrCat("Write too many data via stream ", id_)); |
| 573 | return consumed_data; |
| 574 | } |
| 575 | |
| 576 | bool had_buffered_data = HasBufferedData(); |
| 577 | if (CanWriteNewData()) { |
| 578 | // Save all data if buffered data size is below low water mark. |
| 579 | consumed_data.bytes_consumed = write_length; |
| 580 | if (consumed_data.bytes_consumed > 0) { |
| 581 | QuicStreamOffset offset = send_buffer_.stream_offset(); |
| 582 | send_buffer_.SaveStreamData(iov, iov_count, 0, write_length); |
| 583 | OnDataBuffered(offset, write_length, nullptr); |
| 584 | } |
| 585 | } |
| 586 | consumed_data.fin_consumed = |
| 587 | consumed_data.bytes_consumed == write_length && fin; |
| 588 | fin_buffered_ = consumed_data.fin_consumed; |
| 589 | |
| 590 | if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) { |
| 591 | // Write data if there is no buffered data before. |
| 592 | WriteBufferedData(); |
| 593 | } |
| 594 | |
| 595 | return consumed_data; |
| 596 | } |
| 597 | |
| 598 | QuicConsumedData QuicStream::WriteMemSlices(QuicMemSliceSpan span, bool fin) { |
| 599 | QuicConsumedData consumed_data(0, false); |
| 600 | if (span.empty() && !fin) { |
| 601 | QUIC_BUG << "span.empty() && !fin"; |
| 602 | return consumed_data; |
| 603 | } |
| 604 | |
| 605 | if (fin_buffered_) { |
| 606 | QUIC_BUG << "Fin already buffered"; |
| 607 | return consumed_data; |
| 608 | } |
| 609 | |
| 610 | if (write_side_closed_) { |
| 611 | QUIC_DLOG(ERROR) << ENDPOINT << "Stream " << id() |
zhongyi | 4dc99c0 | 2019-05-30 14:11:04 -0700 | [diff] [blame] | 612 | << " attempting to write when the write side is closed"; |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 613 | if (type_ == READ_UNIDIRECTIONAL) { |
| 614 | CloseConnectionWithDetails( |
| 615 | QUIC_TRY_TO_WRITE_DATA_ON_READ_UNIDIRECTIONAL_STREAM, |
| 616 | "Try to send data on read unidirectional stream"); |
| 617 | } |
| 618 | return consumed_data; |
| 619 | } |
| 620 | |
| 621 | bool had_buffered_data = HasBufferedData(); |
| 622 | if (CanWriteNewData() || span.empty()) { |
| 623 | consumed_data.fin_consumed = fin; |
| 624 | if (!span.empty()) { |
| 625 | // Buffer all data if buffered data size is below limit. |
| 626 | QuicStreamOffset offset = send_buffer_.stream_offset(); |
wub | 553a966 | 2019-03-28 20:13:23 -0700 | [diff] [blame] | 627 | consumed_data.bytes_consumed = send_buffer_.SaveMemSliceSpan(span); |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 628 | if (offset > send_buffer_.stream_offset() || |
| 629 | kMaxStreamLength < send_buffer_.stream_offset()) { |
| 630 | QUIC_BUG << "Write too many data via stream " << id_; |
| 631 | CloseConnectionWithDetails( |
| 632 | QUIC_STREAM_LENGTH_OVERFLOW, |
| 633 | QuicStrCat("Write too many data via stream ", id_)); |
| 634 | return consumed_data; |
| 635 | } |
| 636 | OnDataBuffered(offset, consumed_data.bytes_consumed, nullptr); |
| 637 | } |
| 638 | } |
| 639 | fin_buffered_ = consumed_data.fin_consumed; |
| 640 | |
| 641 | if (!had_buffered_data && (HasBufferedData() || fin_buffered_)) { |
| 642 | // Write data if there is no buffered data before. |
| 643 | WriteBufferedData(); |
| 644 | } |
| 645 | |
| 646 | return consumed_data; |
| 647 | } |
| 648 | |
| 649 | bool QuicStream::HasPendingRetransmission() const { |
| 650 | return send_buffer_.HasPendingRetransmission() || fin_lost_; |
| 651 | } |
| 652 | |
| 653 | bool QuicStream::IsStreamFrameOutstanding(QuicStreamOffset offset, |
| 654 | QuicByteCount data_length, |
| 655 | bool fin) const { |
| 656 | return send_buffer_.IsStreamDataOutstanding(offset, data_length) || |
| 657 | (fin && fin_outstanding_); |
| 658 | } |
| 659 | |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 660 | void QuicStream::CloseReadSide() { |
| 661 | if (read_side_closed_) { |
| 662 | return; |
| 663 | } |
| 664 | QUIC_DVLOG(1) << ENDPOINT << "Done reading from stream " << id(); |
| 665 | |
| 666 | read_side_closed_ = true; |
| 667 | sequencer_.ReleaseBuffer(); |
| 668 | |
| 669 | if (write_side_closed_) { |
| 670 | QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id(); |
| 671 | session_->CloseStream(id()); |
| 672 | } |
| 673 | } |
| 674 | |
| 675 | void QuicStream::CloseWriteSide() { |
| 676 | if (write_side_closed_) { |
| 677 | return; |
| 678 | } |
| 679 | QUIC_DVLOG(1) << ENDPOINT << "Done writing to stream " << id(); |
| 680 | |
| 681 | write_side_closed_ = true; |
| 682 | if (read_side_closed_) { |
| 683 | QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << id(); |
| 684 | session_->CloseStream(id()); |
| 685 | } |
| 686 | } |
| 687 | |
| 688 | bool QuicStream::HasBufferedData() const { |
| 689 | DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written()); |
| 690 | return send_buffer_.stream_offset() > stream_bytes_written(); |
| 691 | } |
| 692 | |
| 693 | QuicTransportVersion QuicStream::transport_version() const { |
| 694 | return session_->connection()->transport_version(); |
| 695 | } |
| 696 | |
| 697 | HandshakeProtocol QuicStream::handshake_protocol() const { |
| 698 | return session_->connection()->version().handshake_protocol; |
| 699 | } |
| 700 | |
| 701 | void QuicStream::StopReading() { |
| 702 | QUIC_DVLOG(1) << ENDPOINT << "Stop reading from stream " << id(); |
| 703 | sequencer_.StopReading(); |
| 704 | } |
| 705 | |
| 706 | const QuicSocketAddress& QuicStream::PeerAddressOfLatestPacket() const { |
| 707 | return session_->connection()->last_packet_source_address(); |
| 708 | } |
| 709 | |
| 710 | void QuicStream::OnClose() { |
| 711 | CloseReadSide(); |
| 712 | CloseWriteSide(); |
| 713 | |
| 714 | if (!fin_sent_ && !rst_sent_) { |
| 715 | // For flow control accounting, tell the peer how many bytes have been |
| 716 | // written on this stream before termination. Done here if needed, using a |
| 717 | // RST_STREAM frame. |
| 718 | QUIC_DLOG(INFO) << ENDPOINT << "Sending RST_STREAM in OnClose: " << id(); |
| 719 | session_->SendRstStream(id(), QUIC_RST_ACKNOWLEDGEMENT, |
| 720 | stream_bytes_written()); |
| 721 | session_->OnStreamDoneWaitingForAcks(id_); |
| 722 | rst_sent_ = true; |
| 723 | } |
| 724 | |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 725 | if (flow_controller_->FlowControlViolation() || |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 726 | connection_flow_controller_->FlowControlViolation()) { |
| 727 | return; |
| 728 | } |
| 729 | // The stream is being closed and will not process any further incoming bytes. |
| 730 | // As there may be more bytes in flight, to ensure that both endpoints have |
| 731 | // the same connection level flow control state, mark all unreceived or |
| 732 | // buffered bytes as consumed. |
| 733 | QuicByteCount bytes_to_consume = |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 734 | flow_controller_->highest_received_byte_offset() - |
| 735 | flow_controller_->bytes_consumed(); |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 736 | AddBytesConsumed(bytes_to_consume); |
| 737 | } |
| 738 | |
| 739 | void QuicStream::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) { |
renjietang | 28c04b7 | 2019-07-01 15:08:09 -0700 | [diff] [blame^] | 740 | if (GetQuicReloadableFlag(quic_no_window_update_on_read_only_stream) && |
| 741 | type_ == READ_UNIDIRECTIONAL) { |
| 742 | QUIC_RELOADABLE_FLAG_COUNT(quic_no_window_update_on_read_only_stream); |
| 743 | CloseConnectionWithDetails( |
| 744 | QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM, |
| 745 | "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream."); |
| 746 | return; |
| 747 | } |
| 748 | |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 749 | if (flow_controller_->UpdateSendWindowOffset(frame.byte_offset)) { |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 750 | // Let session unblock this stream. |
| 751 | session_->MarkConnectionLevelWriteBlocked(id_); |
| 752 | } |
| 753 | } |
| 754 | |
| 755 | bool QuicStream::MaybeIncreaseHighestReceivedOffset( |
| 756 | QuicStreamOffset new_offset) { |
| 757 | uint64_t increment = |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 758 | new_offset - flow_controller_->highest_received_byte_offset(); |
| 759 | if (!flow_controller_->UpdateHighestReceivedOffset(new_offset)) { |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 760 | return false; |
| 761 | } |
| 762 | |
| 763 | // If |new_offset| increased the stream flow controller's highest received |
| 764 | // offset, increase the connection flow controller's value by the incremental |
| 765 | // difference. |
| 766 | if (stream_contributes_to_connection_flow_control_) { |
| 767 | connection_flow_controller_->UpdateHighestReceivedOffset( |
| 768 | connection_flow_controller_->highest_received_byte_offset() + |
| 769 | increment); |
| 770 | } |
| 771 | return true; |
| 772 | } |
| 773 | |
| 774 | void QuicStream::AddBytesSent(QuicByteCount bytes) { |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 775 | flow_controller_->AddBytesSent(bytes); |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 776 | if (stream_contributes_to_connection_flow_control_) { |
| 777 | connection_flow_controller_->AddBytesSent(bytes); |
| 778 | } |
| 779 | } |
| 780 | |
| 781 | void QuicStream::AddBytesConsumed(QuicByteCount bytes) { |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 782 | if (type_ == CRYPTO) { |
| 783 | // A stream with type CRYPTO has no flow control, so there's nothing this |
| 784 | // function needs to do. This function still gets called by the |
| 785 | // QuicStreamSequencers used by QuicCryptoStream. |
| 786 | return; |
| 787 | } |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 788 | // Only adjust stream level flow controller if still reading. |
| 789 | if (!read_side_closed_) { |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 790 | flow_controller_->AddBytesConsumed(bytes); |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 791 | } |
| 792 | |
| 793 | if (stream_contributes_to_connection_flow_control_) { |
| 794 | connection_flow_controller_->AddBytesConsumed(bytes); |
| 795 | } |
| 796 | } |
| 797 | |
| 798 | void QuicStream::UpdateSendWindowOffset(QuicStreamOffset new_window) { |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 799 | if (flow_controller_->UpdateSendWindowOffset(new_window)) { |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 800 | // Let session unblock this stream. |
| 801 | session_->MarkConnectionLevelWriteBlocked(id_); |
| 802 | } |
| 803 | } |
| 804 | |
| 805 | void QuicStream::AddRandomPaddingAfterFin() { |
| 806 | add_random_padding_after_fin_ = true; |
| 807 | } |
| 808 | |
| 809 | bool QuicStream::OnStreamFrameAcked(QuicStreamOffset offset, |
| 810 | QuicByteCount data_length, |
| 811 | bool fin_acked, |
dschinazi | 17d4242 | 2019-06-18 16:35:07 -0700 | [diff] [blame] | 812 | QuicTime::Delta /*ack_delay_time*/, |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 813 | QuicByteCount* newly_acked_length) { |
| 814 | QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Acking " |
| 815 | << "[" << offset << ", " << offset + data_length << "]" |
| 816 | << " fin = " << fin_acked; |
| 817 | *newly_acked_length = 0; |
| 818 | if (!send_buffer_.OnStreamDataAcked(offset, data_length, |
| 819 | newly_acked_length)) { |
| 820 | CloseConnectionWithDetails(QUIC_INTERNAL_ERROR, |
| 821 | "Trying to ack unsent data."); |
| 822 | return false; |
| 823 | } |
| 824 | if (!fin_sent_ && fin_acked) { |
| 825 | CloseConnectionWithDetails(QUIC_INTERNAL_ERROR, |
| 826 | "Trying to ack unsent fin."); |
| 827 | return false; |
| 828 | } |
| 829 | // Indicates whether ack listener's OnPacketAcked should be called. |
| 830 | const bool new_data_acked = |
| 831 | *newly_acked_length > 0 || (fin_acked && fin_outstanding_); |
| 832 | if (fin_acked) { |
| 833 | fin_outstanding_ = false; |
| 834 | fin_lost_ = false; |
| 835 | } |
| 836 | if (!IsWaitingForAcks()) { |
| 837 | session_->OnStreamDoneWaitingForAcks(id_); |
| 838 | } |
| 839 | return new_data_acked; |
| 840 | } |
| 841 | |
| 842 | void QuicStream::OnStreamFrameRetransmitted(QuicStreamOffset offset, |
| 843 | QuicByteCount data_length, |
| 844 | bool fin_retransmitted) { |
| 845 | send_buffer_.OnStreamDataRetransmitted(offset, data_length); |
| 846 | if (fin_retransmitted) { |
| 847 | fin_lost_ = false; |
| 848 | } |
| 849 | } |
| 850 | |
| 851 | void QuicStream::OnStreamFrameLost(QuicStreamOffset offset, |
| 852 | QuicByteCount data_length, |
| 853 | bool fin_lost) { |
| 854 | QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " Losting " |
| 855 | << "[" << offset << ", " << offset + data_length << "]" |
| 856 | << " fin = " << fin_lost; |
| 857 | if (data_length > 0) { |
| 858 | send_buffer_.OnStreamDataLost(offset, data_length); |
| 859 | } |
| 860 | if (fin_lost && fin_outstanding_) { |
| 861 | fin_lost_ = true; |
| 862 | } |
| 863 | } |
| 864 | |
| 865 | bool QuicStream::RetransmitStreamData(QuicStreamOffset offset, |
| 866 | QuicByteCount data_length, |
| 867 | bool fin) { |
| 868 | if (HasDeadlinePassed()) { |
| 869 | OnDeadlinePassed(); |
| 870 | return true; |
| 871 | } |
| 872 | QuicIntervalSet<QuicStreamOffset> retransmission(offset, |
| 873 | offset + data_length); |
| 874 | retransmission.Difference(bytes_acked()); |
| 875 | bool retransmit_fin = fin && fin_outstanding_; |
| 876 | if (retransmission.Empty() && !retransmit_fin) { |
| 877 | return true; |
| 878 | } |
| 879 | QuicConsumedData consumed(0, false); |
| 880 | for (const auto& interval : retransmission) { |
| 881 | QuicStreamOffset retransmission_offset = interval.min(); |
| 882 | QuicByteCount retransmission_length = interval.max() - interval.min(); |
| 883 | const bool can_bundle_fin = |
| 884 | retransmit_fin && (retransmission_offset + retransmission_length == |
| 885 | stream_bytes_written()); |
| 886 | consumed = session()->WritevData(this, id_, retransmission_length, |
| 887 | retransmission_offset, |
| 888 | can_bundle_fin ? FIN : NO_FIN); |
| 889 | QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ |
| 890 | << " is forced to retransmit stream data [" |
| 891 | << retransmission_offset << ", " |
| 892 | << retransmission_offset + retransmission_length |
| 893 | << ") and fin: " << can_bundle_fin |
| 894 | << ", consumed: " << consumed; |
| 895 | OnStreamFrameRetransmitted(retransmission_offset, consumed.bytes_consumed, |
| 896 | consumed.fin_consumed); |
| 897 | if (can_bundle_fin) { |
| 898 | retransmit_fin = !consumed.fin_consumed; |
| 899 | } |
| 900 | if (consumed.bytes_consumed < retransmission_length || |
| 901 | (can_bundle_fin && !consumed.fin_consumed)) { |
| 902 | // Connection is write blocked. |
| 903 | return false; |
| 904 | } |
| 905 | } |
| 906 | if (retransmit_fin) { |
| 907 | QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ |
| 908 | << " retransmits fin only frame."; |
| 909 | consumed = session()->WritevData(this, id_, 0, stream_bytes_written(), FIN); |
| 910 | if (!consumed.fin_consumed) { |
| 911 | return false; |
| 912 | } |
| 913 | } |
| 914 | return true; |
| 915 | } |
| 916 | |
| 917 | bool QuicStream::IsWaitingForAcks() const { |
| 918 | return (!rst_sent_ || stream_error_ == QUIC_STREAM_NO_ERROR) && |
| 919 | (send_buffer_.stream_bytes_outstanding() || fin_outstanding_); |
| 920 | } |
| 921 | |
| 922 | size_t QuicStream::ReadableBytes() const { |
| 923 | return sequencer_.ReadableBytes(); |
| 924 | } |
| 925 | |
| 926 | bool QuicStream::WriteStreamData(QuicStreamOffset offset, |
| 927 | QuicByteCount data_length, |
| 928 | QuicDataWriter* writer) { |
| 929 | DCHECK_LT(0u, data_length); |
| 930 | QUIC_DVLOG(2) << ENDPOINT << "Write stream " << id_ << " data from offset " |
| 931 | << offset << " length " << data_length; |
| 932 | return send_buffer_.WriteStreamData(offset, data_length, writer); |
| 933 | } |
| 934 | |
| 935 | void QuicStream::WriteBufferedData() { |
| 936 | DCHECK(!write_side_closed_ && (HasBufferedData() || fin_buffered_)); |
| 937 | |
| 938 | if (session_->ShouldYield(id())) { |
| 939 | session_->MarkConnectionLevelWriteBlocked(id()); |
| 940 | return; |
| 941 | } |
| 942 | |
| 943 | // Size of buffered data. |
| 944 | size_t write_length = BufferedDataBytes(); |
| 945 | |
| 946 | // A FIN with zero data payload should not be flow control blocked. |
| 947 | bool fin_with_zero_data = (fin_buffered_ && write_length == 0); |
| 948 | |
| 949 | bool fin = fin_buffered_; |
| 950 | |
| 951 | // How much data flow control permits to be written. |
nharper | d5c4a93 | 2019-05-13 13:58:49 -0700 | [diff] [blame] | 952 | QuicByteCount send_window = flow_controller_->SendWindowSize(); |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 953 | if (stream_contributes_to_connection_flow_control_) { |
| 954 | send_window = |
| 955 | std::min(send_window, connection_flow_controller_->SendWindowSize()); |
| 956 | } |
| 957 | |
| 958 | if (send_window == 0 && !fin_with_zero_data) { |
| 959 | // Quick return if nothing can be sent. |
| 960 | MaybeSendBlocked(); |
| 961 | return; |
| 962 | } |
| 963 | |
| 964 | if (write_length > send_window) { |
| 965 | // Don't send the FIN unless all the data will be sent. |
| 966 | fin = false; |
| 967 | |
| 968 | // Writing more data would be a violation of flow control. |
| 969 | write_length = static_cast<size_t>(send_window); |
| 970 | QUIC_DVLOG(1) << "stream " << id() << " shortens write length to " |
| 971 | << write_length << " due to flow control"; |
| 972 | } |
| 973 | if (session_->session_decides_what_to_write()) { |
| 974 | session_->SetTransmissionType(NOT_RETRANSMISSION); |
| 975 | } |
fayang | 9785735 | 2019-07-01 06:15:26 -0700 | [diff] [blame] | 976 | |
| 977 | StreamSendingState state = fin ? FIN : NO_FIN; |
| 978 | if (fin && add_random_padding_after_fin_) { |
| 979 | state = FIN_AND_PADDING; |
| 980 | } |
| 981 | QuicConsumedData consumed_data = session_->WritevData( |
| 982 | this, id(), write_length, stream_bytes_written(), state); |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 983 | |
| 984 | OnStreamDataConsumed(consumed_data.bytes_consumed); |
| 985 | |
| 986 | AddBytesSent(consumed_data.bytes_consumed); |
| 987 | QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ << " sends " |
| 988 | << stream_bytes_written() << " bytes " |
| 989 | << " and has buffered data " << BufferedDataBytes() << " bytes." |
| 990 | << " fin is sent: " << consumed_data.fin_consumed |
| 991 | << " fin is buffered: " << fin_buffered_; |
| 992 | |
| 993 | // The write may have generated a write error causing this stream to be |
| 994 | // closed. If so, simply return without marking the stream write blocked. |
| 995 | if (write_side_closed_) { |
| 996 | return; |
| 997 | } |
| 998 | |
| 999 | if (consumed_data.bytes_consumed == write_length) { |
| 1000 | if (!fin_with_zero_data) { |
| 1001 | MaybeSendBlocked(); |
| 1002 | } |
| 1003 | if (fin && consumed_data.fin_consumed) { |
| 1004 | fin_sent_ = true; |
| 1005 | fin_outstanding_ = true; |
| 1006 | if (fin_received_) { |
| 1007 | session_->StreamDraining(id_); |
| 1008 | } |
| 1009 | CloseWriteSide(); |
| 1010 | } else if (fin && !consumed_data.fin_consumed) { |
| 1011 | session_->MarkConnectionLevelWriteBlocked(id()); |
| 1012 | } |
| 1013 | } else { |
| 1014 | session_->MarkConnectionLevelWriteBlocked(id()); |
| 1015 | } |
| 1016 | if (consumed_data.bytes_consumed > 0 || consumed_data.fin_consumed) { |
| 1017 | busy_counter_ = 0; |
| 1018 | } |
zhongyi | 1b2f783 | 2019-06-14 13:31:34 -0700 | [diff] [blame] | 1019 | |
| 1020 | if (IsWaitingForAcks()) { |
| 1021 | session_->OnStreamWaitingForAcks(id_); |
| 1022 | } |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 1023 | } |
| 1024 | |
| 1025 | uint64_t QuicStream::BufferedDataBytes() const { |
| 1026 | DCHECK_GE(send_buffer_.stream_offset(), stream_bytes_written()); |
| 1027 | return send_buffer_.stream_offset() - stream_bytes_written(); |
| 1028 | } |
| 1029 | |
| 1030 | bool QuicStream::CanWriteNewData() const { |
| 1031 | return BufferedDataBytes() < buffered_data_threshold_; |
| 1032 | } |
| 1033 | |
| 1034 | bool QuicStream::CanWriteNewDataAfterData(QuicByteCount length) const { |
| 1035 | return (BufferedDataBytes() + length) < buffered_data_threshold_; |
| 1036 | } |
| 1037 | |
| 1038 | uint64_t QuicStream::stream_bytes_written() const { |
| 1039 | return send_buffer_.stream_bytes_written(); |
| 1040 | } |
| 1041 | |
| 1042 | const QuicIntervalSet<QuicStreamOffset>& QuicStream::bytes_acked() const { |
| 1043 | return send_buffer_.bytes_acked(); |
| 1044 | } |
| 1045 | |
| 1046 | void QuicStream::OnStreamDataConsumed(size_t bytes_consumed) { |
| 1047 | send_buffer_.OnStreamDataConsumed(bytes_consumed); |
| 1048 | } |
| 1049 | |
| 1050 | void QuicStream::WritePendingRetransmission() { |
| 1051 | while (HasPendingRetransmission()) { |
| 1052 | QuicConsumedData consumed(0, false); |
| 1053 | if (!send_buffer_.HasPendingRetransmission()) { |
| 1054 | QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ |
| 1055 | << " retransmits fin only frame."; |
| 1056 | consumed = |
| 1057 | session()->WritevData(this, id_, 0, stream_bytes_written(), FIN); |
| 1058 | fin_lost_ = !consumed.fin_consumed; |
| 1059 | if (fin_lost_) { |
| 1060 | // Connection is write blocked. |
| 1061 | return; |
| 1062 | } |
| 1063 | } else { |
| 1064 | StreamPendingRetransmission pending = |
| 1065 | send_buffer_.NextPendingRetransmission(); |
| 1066 | // Determine whether the lost fin can be bundled with the data. |
| 1067 | const bool can_bundle_fin = |
| 1068 | fin_lost_ && |
| 1069 | (pending.offset + pending.length == stream_bytes_written()); |
| 1070 | consumed = |
| 1071 | session()->WritevData(this, id_, pending.length, pending.offset, |
| 1072 | can_bundle_fin ? FIN : NO_FIN); |
| 1073 | QUIC_DVLOG(1) << ENDPOINT << "stream " << id_ |
| 1074 | << " tries to retransmit stream data [" << pending.offset |
| 1075 | << ", " << pending.offset + pending.length |
| 1076 | << ") and fin: " << can_bundle_fin |
| 1077 | << ", consumed: " << consumed; |
| 1078 | OnStreamFrameRetransmitted(pending.offset, consumed.bytes_consumed, |
| 1079 | consumed.fin_consumed); |
| 1080 | if (consumed.bytes_consumed < pending.length || |
| 1081 | (can_bundle_fin && !consumed.fin_consumed)) { |
| 1082 | // Connection is write blocked. |
| 1083 | return; |
| 1084 | } |
| 1085 | } |
| 1086 | } |
| 1087 | } |
| 1088 | |
| 1089 | bool QuicStream::MaybeSetTtl(QuicTime::Delta ttl) { |
| 1090 | if (is_static_) { |
| 1091 | QUIC_BUG << "Cannot set TTL of a static stream."; |
| 1092 | return false; |
| 1093 | } |
| 1094 | if (deadline_.IsInitialized()) { |
| 1095 | QUIC_DLOG(WARNING) << "Deadline has already been set."; |
| 1096 | return false; |
| 1097 | } |
| 1098 | if (!session()->session_decides_what_to_write()) { |
| 1099 | QUIC_DLOG(WARNING) << "This session does not support stream TTL yet."; |
| 1100 | return false; |
| 1101 | } |
| 1102 | QuicTime now = session()->connection()->clock()->ApproximateNow(); |
| 1103 | deadline_ = now + ttl; |
| 1104 | return true; |
| 1105 | } |
| 1106 | |
| 1107 | bool QuicStream::HasDeadlinePassed() const { |
| 1108 | if (!deadline_.IsInitialized()) { |
| 1109 | // No deadline has been set. |
| 1110 | return false; |
| 1111 | } |
| 1112 | DCHECK(session()->session_decides_what_to_write()); |
| 1113 | QuicTime now = session()->connection()->clock()->ApproximateNow(); |
| 1114 | if (now < deadline_) { |
| 1115 | return false; |
| 1116 | } |
| 1117 | // TTL expired. |
| 1118 | QUIC_DVLOG(1) << "stream " << id() << " deadline has passed"; |
| 1119 | return true; |
| 1120 | } |
| 1121 | |
| 1122 | void QuicStream::OnDeadlinePassed() { |
| 1123 | Reset(QUIC_STREAM_TTL_EXPIRED); |
| 1124 | } |
| 1125 | |
| 1126 | void QuicStream::SendStopSending(uint16_t code) { |
fkastenholz | 305e173 | 2019-06-18 05:01:22 -0700 | [diff] [blame] | 1127 | if (!VersionHasIetfQuicFrames(transport_version())) { |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 1128 | // If the connection is not version 99, do nothing. |
| 1129 | // Do not QUIC_BUG or anything; the application really does not need to know |
| 1130 | // what version the connection is in. |
| 1131 | return; |
| 1132 | } |
| 1133 | session_->SendStopSending(code, id_); |
| 1134 | } |
| 1135 | |
dschinazi | 17d4242 | 2019-06-18 16:35:07 -0700 | [diff] [blame] | 1136 | void QuicStream::OnStopSending(uint16_t /*code*/) {} |
QUICHE team | a6ef0a6 | 2019-03-07 20:34:33 -0500 | [diff] [blame] | 1137 | |
| 1138 | } // namespace quic |