blob: 01d6feb3138840d03e98ca0d5c03864189c5c3e3 [file] [log] [blame]
QUICHE teama6ef0a62019-03-07 20:34:33 -05001// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/third_party/quiche/src/quic/core/quic_session.h"
6
7#include <cstdint>
vasilvv872e7a32019-03-12 16:42:44 -07008#include <string>
QUICHE teama6ef0a62019-03-07 20:34:33 -05009#include <utility>
10
11#include "net/third_party/quiche/src/quic/core/quic_connection.h"
12#include "net/third_party/quiche/src/quic/core/quic_flow_controller.h"
13#include "net/third_party/quiche/src/quic/core/quic_utils.h"
14#include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
15#include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
16#include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
17#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
18#include "net/third_party/quiche/src/quic/platform/api/quic_map_util.h"
19#include "net/third_party/quiche/src/quic/platform/api/quic_stack_trace.h"
20#include "net/third_party/quiche/src/quic/platform/api/quic_str_cat.h"
QUICHE teama6ef0a62019-03-07 20:34:33 -050021
22using spdy::SpdyPriority;
23
24namespace quic {
25
26namespace {
27
28class ClosedStreamsCleanUpDelegate : public QuicAlarm::Delegate {
29 public:
30 explicit ClosedStreamsCleanUpDelegate(QuicSession* session)
31 : session_(session) {}
32 ClosedStreamsCleanUpDelegate(const ClosedStreamsCleanUpDelegate&) = delete;
33 ClosedStreamsCleanUpDelegate& operator=(const ClosedStreamsCleanUpDelegate&) =
34 delete;
35
36 void OnAlarm() override { session_->CleanUpClosedStreams(); }
37
38 private:
39 QuicSession* session_;
40};
41
42} // namespace
43
44#define ENDPOINT \
45 (perspective() == Perspective::IS_SERVER ? "Server: " : "Client: ")
46
47QuicSession::QuicSession(QuicConnection* connection,
48 Visitor* owner,
49 const QuicConfig& config,
50 const ParsedQuicVersionVector& supported_versions)
51 : connection_(connection),
52 visitor_(owner),
53 write_blocked_streams_(),
54 config_(config),
55 stream_id_manager_(this,
56 kDefaultMaxStreamsPerConnection,
57 config_.GetMaxIncomingDynamicStreamsToSend()),
58 v99_streamid_manager_(this,
59 kDefaultMaxStreamsPerConnection,
60 config_.GetMaxIncomingDynamicStreamsToSend()),
61 num_dynamic_incoming_streams_(0),
62 num_draining_incoming_streams_(0),
63 num_locally_closed_incoming_streams_highest_offset_(0),
64 error_(QUIC_NO_ERROR),
65 flow_controller_(
66 this,
67 QuicUtils::GetInvalidStreamId(connection->transport_version()),
68 /*is_connection_flow_controller*/ true,
69 kMinimumFlowControlSendWindow,
70 config_.GetInitialSessionFlowControlWindowToSend(),
71 kSessionReceiveWindowLimit,
72 perspective() == Perspective::IS_SERVER,
73 nullptr),
74 currently_writing_stream_id_(0),
75 largest_static_stream_id_(0),
76 is_handshake_confirmed_(false),
77 goaway_sent_(false),
78 goaway_received_(false),
79 control_frame_manager_(this),
80 last_message_id_(0),
81 closed_streams_clean_up_alarm_(nullptr),
82 supported_versions_(supported_versions) {
83 closed_streams_clean_up_alarm_ =
84 QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
85 new ClosedStreamsCleanUpDelegate(this)));
86}
87
88void QuicSession::Initialize() {
89 connection_->set_visitor(this);
90 connection_->SetSessionNotifier(this);
91 connection_->SetDataProducer(this);
92 connection_->SetFromConfig(config_);
93
94 DCHECK_EQ(QuicUtils::GetCryptoStreamId(connection_->transport_version()),
95 GetMutableCryptoStream()->id());
96 RegisterStaticStream(
97 QuicUtils::GetCryptoStreamId(connection_->transport_version()),
98 GetMutableCryptoStream());
99}
100
101QuicSession::~QuicSession() {
102 QUIC_LOG_IF(WARNING, !zombie_streams_.empty()) << "Still have zombie streams";
103}
104
105void QuicSession::RegisterStaticStream(QuicStreamId id, QuicStream* stream) {
106 static_stream_map_[id] = stream;
107
108 QUIC_BUG_IF(id >
109 largest_static_stream_id_ +
110 QuicUtils::StreamIdDelta(connection_->transport_version()))
111 << ENDPOINT << "Static stream registered out of order: " << id
112 << " vs: " << largest_static_stream_id_;
113 largest_static_stream_id_ = std::max(id, largest_static_stream_id_);
114
115 if (connection_->transport_version() == QUIC_VERSION_99) {
116 v99_streamid_manager_.RegisterStaticStream(id);
117 }
118}
119
120void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
121 // TODO(rch) deal with the error case of stream id 0.
122 QuicStreamId stream_id = frame.stream_id;
123 if (stream_id ==
124 QuicUtils::GetInvalidStreamId(connection()->transport_version())) {
125 connection()->CloseConnection(
126 QUIC_INVALID_STREAM_ID, "Recevied data for an invalid stream",
127 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
128 return;
129 }
130
131 if (frame.fin && QuicContainsKey(static_stream_map_, stream_id)) {
132 connection()->CloseConnection(
133 QUIC_INVALID_STREAM_ID, "Attempt to close a static stream",
134 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
135 return;
136 }
137
138 StreamHandler handler = GetOrCreateStreamImpl(stream_id, frame.offset != 0);
139 if (handler.is_pending) {
140 handler.pending->OnStreamFrame(frame);
141 return;
142 }
143
144 if (!handler.stream) {
145 // The stream no longer exists, but we may still be interested in the
146 // final stream byte offset sent by the peer. A frame with a FIN can give
147 // us this offset.
148 if (frame.fin) {
149 QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
150 OnFinalByteOffsetReceived(stream_id, final_byte_offset);
151 }
152 return;
153 }
154 handler.stream->OnStreamFrame(frame);
155}
156
157void QuicSession::OnCryptoFrame(const QuicCryptoFrame& frame) {
158 GetMutableCryptoStream()->OnCryptoFrame(frame);
159}
160
161bool QuicSession::OnStopSendingFrame(const QuicStopSendingFrame& frame) {
162 // We are not version 99. In theory, if not in version 99 then the framer
163 // could not call OnStopSending... This is just a check that is good when
164 // both a new protocol and a new implementation of that protocol are both
165 // being developed.
166 DCHECK_EQ(QUIC_VERSION_99, connection_->transport_version());
167
168 QuicStreamId stream_id = frame.stream_id;
169 // If Stream ID is invalid then close the connection.
170 if (stream_id ==
171 QuicUtils::GetInvalidStreamId(connection()->transport_version())) {
172 QUIC_DVLOG(1) << ENDPOINT
173 << "Received STOP_SENDING with invalid stream_id: "
174 << stream_id << " Closing connection";
175 connection()->CloseConnection(
176 QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for an invalid stream",
177 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
178 return false;
179 }
180
181 // Ignore STOP_SENDING for static streams.
182 // TODO(fkastenholz): IETF Quic does not have static streams and does not
183 // make exceptions for them with respect to processing things like
184 // STOP_SENDING.
185 if (QuicContainsKey(static_stream_map_, stream_id)) {
186 QUIC_DVLOG(1) << ENDPOINT
187 << "Received STOP_SENDING for a static stream, id: "
188 << stream_id << " Closing connection";
189 connection()->CloseConnection(
190 QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for a static stream",
191 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
192 return false;
193 }
194
195 if (visitor_) {
196 visitor_->OnStopSendingReceived(frame);
197 }
198
199 // If stream is closed, ignore the frame
200 if (IsClosedStream(stream_id)) {
201 QUIC_DVLOG(1)
202 << ENDPOINT
203 << "Received STOP_SENDING for closed or non-existent stream, id: "
204 << stream_id << " Ignoring.";
205 return true; // Continue processing the packet.
206 }
207 // If stream is non-existent, close the connection
208 DynamicStreamMap::iterator it = dynamic_stream_map_.find(stream_id);
209 if (it == dynamic_stream_map_.end()) {
210 QUIC_DVLOG(1) << ENDPOINT
211 << "Received STOP_SENDING for non-existent stream, id: "
212 << stream_id << " Closing connection";
213 connection()->CloseConnection(
214 IETF_QUIC_PROTOCOL_VIOLATION,
215 "Received STOP_SENDING for a non-existent stream",
216 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
217 return false;
218 }
219
220 // Get the QuicStream for this stream. Ignore the STOP_SENDING
221 // if the QuicStream pointer is NULL
222 // QUESTION: IS THIS THE RIGHT THING TO DO? (that is, this would happen IFF
223 // there was an entry in the map, but the pointer is null. sounds more like a
224 // deep programming error rather than a simple protocol problem).
225 QuicStream* stream = it->second.get();
226 if (stream == nullptr) {
227 QUIC_DVLOG(1) << ENDPOINT
228 << "Received STOP_SENDING for NULL QuicStream, stream_id: "
229 << stream_id << ". Ignoring.";
230 return true;
231 }
232 stream->OnStopSending(frame.application_error_code);
233
234 stream->set_stream_error(
235 static_cast<QuicRstStreamErrorCode>(frame.application_error_code));
236 SendRstStreamInner(
237 stream->id(),
238 static_cast<quic::QuicRstStreamErrorCode>(frame.application_error_code),
239 stream->stream_bytes_written(),
240 /*close_write_side_only=*/true);
241
242 return true;
243}
244
245void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
246 QuicStreamId stream_id = frame.stream_id;
247 if (stream_id ==
248 QuicUtils::GetInvalidStreamId(connection()->transport_version())) {
249 connection()->CloseConnection(
250 QUIC_INVALID_STREAM_ID, "Recevied data for an invalid stream",
251 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
252 return;
253 }
254
255 if (QuicContainsKey(static_stream_map_, stream_id)) {
256 connection()->CloseConnection(
257 QUIC_INVALID_STREAM_ID, "Attempt to reset a static stream",
258 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
259 return;
260 }
261
262 if (visitor_) {
263 visitor_->OnRstStreamReceived(frame);
264 }
265
266 // may_buffer is true here to allow subclasses to buffer streams until the
267 // first byte of payload arrives which would allow sessions to delay
268 // creation of the stream until the type is known.
269 StreamHandler handler = GetOrCreateStreamImpl(stream_id, /*may_buffer=*/true);
270 if (handler.is_pending) {
271 handler.pending->OnRstStreamFrame(frame);
272 ClosePendingStream(stream_id);
273 return;
274 }
275 if (!handler.stream) {
276 HandleRstOnValidNonexistentStream(frame);
277 return; // Errors are handled by GetOrCreateStream.
278 }
279 handler.stream->OnStreamReset(frame);
280}
281
282void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
283 goaway_received_ = true;
284}
285
286void QuicSession::OnMessageReceived(QuicStringPiece message) {
287 QUIC_DVLOG(1) << ENDPOINT << "Received message, length: " << message.length()
288 << ", " << message;
289}
290
291void QuicSession::OnConnectionClosed(QuicErrorCode error,
vasilvvc48c8712019-03-11 13:38:16 -0700292 const std::string& error_details,
QUICHE teama6ef0a62019-03-07 20:34:33 -0500293 ConnectionCloseSource source) {
294 DCHECK(!connection_->connected());
295 if (error_ == QUIC_NO_ERROR) {
296 error_ = error;
297 }
298
299 while (!dynamic_stream_map_.empty()) {
300 DynamicStreamMap::iterator it = dynamic_stream_map_.begin();
301 QuicStreamId id = it->first;
302 it->second->OnConnectionClosed(error, source);
303 // The stream should call CloseStream as part of OnConnectionClosed.
304 if (dynamic_stream_map_.find(id) != dynamic_stream_map_.end()) {
305 QUIC_BUG << ENDPOINT << "Stream failed to close under OnConnectionClosed";
306 CloseStream(id);
307 }
308 }
309
310 // Cleanup zombie stream map on connection close.
311 while (!zombie_streams_.empty()) {
312 ZombieStreamMap::iterator it = zombie_streams_.begin();
313 closed_streams_.push_back(std::move(it->second));
314 zombie_streams_.erase(it);
315 }
316
317 closed_streams_clean_up_alarm_->Cancel();
318
319 if (visitor_) {
320 visitor_->OnConnectionClosed(connection_->connection_id(), error,
321 error_details, source);
322 }
323}
324
325void QuicSession::OnWriteBlocked() {
QUICHE teamaa1d6a82019-03-13 09:14:13 -0700326 if (!connection_->connected()) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500327 return;
328 }
329 if (visitor_) {
330 visitor_->OnWriteBlocked(connection_);
331 }
332}
333
334void QuicSession::OnSuccessfulVersionNegotiation(
335 const ParsedQuicVersion& version) {
336 GetMutableCryptoStream()->OnSuccessfulVersionNegotiation(version);
337}
338
339void QuicSession::OnConnectivityProbeReceived(
340 const QuicSocketAddress& self_address,
341 const QuicSocketAddress& peer_address) {
342 if (perspective() == Perspective::IS_SERVER) {
343 // Server only sends back a connectivity probe after received a
344 // connectivity probe from a new peer address.
345 connection_->SendConnectivityProbingResponsePacket(peer_address);
346 }
347}
348
349void QuicSession::OnPathDegrading() {}
350
351bool QuicSession::AllowSelfAddressChange() const {
352 return false;
353}
354
355void QuicSession::OnForwardProgressConfirmed() {}
356
357void QuicSession::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
358 // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't
359 // assume that it still exists.
360 QuicStreamId stream_id = frame.stream_id;
361 if (stream_id ==
362 QuicUtils::GetInvalidStreamId(connection_->transport_version())) {
363 // This is a window update that applies to the connection, rather than an
364 // individual stream.
365 QUIC_DLOG(INFO) << ENDPOINT
366 << "Received connection level flow control window "
367 "update with byte offset: "
368 << frame.byte_offset;
369 flow_controller_.UpdateSendWindowOffset(frame.byte_offset);
370 return;
371 }
372 QuicStream* stream = GetOrCreateStream(stream_id);
373 if (stream != nullptr) {
374 stream->OnWindowUpdateFrame(frame);
375 }
376}
377
378void QuicSession::OnBlockedFrame(const QuicBlockedFrame& frame) {
379 // TODO(rjshade): Compare our flow control receive windows for specified
380 // streams: if we have a large window then maybe something
381 // had gone wrong with the flow control accounting.
382 QUIC_DLOG(INFO) << ENDPOINT << "Received BLOCKED frame with stream id: "
383 << frame.stream_id;
384}
385
386bool QuicSession::CheckStreamNotBusyLooping(QuicStream* stream,
387 uint64_t previous_bytes_written,
388 bool previous_fin_sent) {
389 if ( // Stream should not be closed.
390 !stream->write_side_closed() &&
391 // Not connection flow control blocked.
392 !flow_controller_.IsBlocked() &&
393 // Detect lack of forward progress.
394 previous_bytes_written == stream->stream_bytes_written() &&
395 previous_fin_sent == stream->fin_sent()) {
396 stream->set_busy_counter(stream->busy_counter() + 1);
397 QUIC_DVLOG(1) << "Suspected busy loop on stream id " << stream->id()
398 << " stream_bytes_written " << stream->stream_bytes_written()
399 << " fin " << stream->fin_sent() << " count "
400 << stream->busy_counter();
401 // Wait a few iterations before firing, the exact count is
402 // arbitrary, more than a few to cover a few test-only false
403 // positives.
404 if (stream->busy_counter() > 20) {
405 QUIC_LOG(ERROR) << "Detected busy loop on stream id " << stream->id()
406 << " stream_bytes_written "
407 << stream->stream_bytes_written() << " fin "
408 << stream->fin_sent();
409 return false;
410 }
411 } else {
412 stream->set_busy_counter(0);
413 }
414 return true;
415}
416
417bool QuicSession::CheckStreamWriteBlocked(QuicStream* stream) const {
418 if (!stream->write_side_closed() && stream->HasBufferedData() &&
419 !stream->flow_controller()->IsBlocked() &&
420 !write_blocked_streams_.IsStreamBlocked(stream->id())) {
421 QUIC_DLOG(ERROR) << "stream " << stream->id() << " has buffered "
422 << stream->BufferedDataBytes()
423 << " bytes, and is not flow control blocked, "
424 "but it is not in the write block list.";
425 return false;
426 }
427 return true;
428}
429
430void QuicSession::OnCanWrite() {
431 if (!RetransmitLostData()) {
432 // Cannot finish retransmitting lost data, connection is write blocked.
433 QUIC_DVLOG(1) << ENDPOINT
434 << "Cannot finish retransmitting lost data, connection is "
435 "write blocked.";
436 return;
437 }
438 if (session_decides_what_to_write()) {
439 SetTransmissionType(NOT_RETRANSMISSION);
440 }
441 // We limit the number of writes to the number of pending streams. If more
442 // streams become pending, WillingAndAbleToWrite will be true, which will
443 // cause the connection to request resumption before yielding to other
444 // connections.
445 // If we are connection level flow control blocked, then only allow the
446 // crypto and headers streams to try writing as all other streams will be
447 // blocked.
448 size_t num_writes = flow_controller_.IsBlocked()
449 ? write_blocked_streams_.NumBlockedSpecialStreams()
450 : write_blocked_streams_.NumBlockedStreams();
451 if (num_writes == 0 && !control_frame_manager_.WillingToWrite()) {
452 return;
453 }
454
455 QuicConnection::ScopedPacketFlusher flusher(
456 connection_, QuicConnection::SEND_ACK_IF_QUEUED);
457 if (control_frame_manager_.WillingToWrite()) {
458 control_frame_manager_.OnCanWrite();
459 }
460 for (size_t i = 0; i < num_writes; ++i) {
461 if (!(write_blocked_streams_.HasWriteBlockedSpecialStream() ||
462 write_blocked_streams_.HasWriteBlockedDataStreams())) {
463 // Writing one stream removed another!? Something's broken.
464 QUIC_BUG << "WriteBlockedStream is missing";
465 connection_->CloseConnection(QUIC_INTERNAL_ERROR,
466 "WriteBlockedStream is missing",
467 ConnectionCloseBehavior::SILENT_CLOSE);
468 return;
469 }
470 if (!connection_->CanWriteStreamData()) {
471 return;
472 }
473 currently_writing_stream_id_ = write_blocked_streams_.PopFront();
474 QuicStream* stream = GetOrCreateStream(currently_writing_stream_id_);
475 if (stream != nullptr && !stream->flow_controller()->IsBlocked()) {
476 // If the stream can't write all bytes it'll re-add itself to the blocked
477 // list.
478 uint64_t previous_bytes_written = stream->stream_bytes_written();
479 bool previous_fin_sent = stream->fin_sent();
480 QUIC_DVLOG(1) << "stream " << stream->id() << " bytes_written "
481 << previous_bytes_written << " fin " << previous_fin_sent;
482 stream->OnCanWrite();
483 DCHECK(CheckStreamWriteBlocked(stream));
484 DCHECK(CheckStreamNotBusyLooping(stream, previous_bytes_written,
485 previous_fin_sent));
486 }
487 currently_writing_stream_id_ = 0;
488 }
489}
490
491bool QuicSession::WillingAndAbleToWrite() const {
492 // Schedule a write when:
493 // 1) control frame manager has pending or new control frames, or
494 // 2) any stream has pending retransmissions, or
495 // 3) If the crypto or headers streams are blocked, or
496 // 4) connection is not flow control blocked and there are write blocked
497 // streams.
498 return control_frame_manager_.WillingToWrite() ||
499 !streams_with_pending_retransmission_.empty() ||
500 write_blocked_streams_.HasWriteBlockedSpecialStream() ||
501 (!flow_controller_.IsBlocked() &&
502 write_blocked_streams_.HasWriteBlockedDataStreams());
503}
504
505bool QuicSession::HasPendingHandshake() const {
506 return QuicContainsKey(
507 streams_with_pending_retransmission_,
508 QuicUtils::GetCryptoStreamId(connection_->transport_version())) ||
509 write_blocked_streams_.IsStreamBlocked(
510 QuicUtils::GetCryptoStreamId(connection_->transport_version()));
511}
512
513uint64_t QuicSession::GetNumOpenDynamicStreams() const {
514 return dynamic_stream_map_.size() - draining_streams_.size() +
515 locally_closed_streams_highest_offset_.size();
516}
517
518void QuicSession::ProcessUdpPacket(const QuicSocketAddress& self_address,
519 const QuicSocketAddress& peer_address,
520 const QuicReceivedPacket& packet) {
521 connection_->ProcessUdpPacket(self_address, peer_address, packet);
522}
523
524QuicConsumedData QuicSession::WritevData(QuicStream* stream,
525 QuicStreamId id,
526 size_t write_length,
527 QuicStreamOffset offset,
528 StreamSendingState state) {
529 // This check is an attempt to deal with potential memory corruption
530 // in which |id| ends up set to 1 (the crypto stream id). If this happen
531 // it might end up resulting in unencrypted stream data being sent.
532 // While this is impossible to avoid given sufficient corruption, this
533 // seems like a reasonable mitigation.
534 if (id == QuicUtils::GetCryptoStreamId(connection_->transport_version()) &&
535 stream != GetMutableCryptoStream()) {
536 QUIC_BUG << "Stream id mismatch";
537 connection_->CloseConnection(
538 QUIC_INTERNAL_ERROR,
539 "Non-crypto stream attempted to write data as crypto stream.",
540 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
541 return QuicConsumedData(0, false);
542 }
543 if (!IsEncryptionEstablished() &&
544 id != QuicUtils::GetCryptoStreamId(connection_->transport_version())) {
545 // Do not let streams write without encryption. The calling stream will end
546 // up write blocked until OnCanWrite is next called.
547 return QuicConsumedData(0, false);
548 }
549
550 QuicConsumedData data =
551 connection_->SendStreamData(id, write_length, offset, state);
552 if (offset >= stream->stream_bytes_written()) {
553 // This is new stream data.
554 write_blocked_streams_.UpdateBytesForStream(id, data.bytes_consumed);
555 }
556 return data;
557}
558
559bool QuicSession::WriteControlFrame(const QuicFrame& frame) {
560 return connection_->SendControlFrame(frame);
561}
562
563void QuicSession::SendRstStream(QuicStreamId id,
564 QuicRstStreamErrorCode error,
565 QuicStreamOffset bytes_written) {
566 SendRstStreamInner(id, error, bytes_written, /*close_write_side_only=*/false);
567}
568
569void QuicSession::SendRstStreamInner(QuicStreamId id,
570 QuicRstStreamErrorCode error,
571 QuicStreamOffset bytes_written,
572 bool close_write_side_only) {
573 if (connection()->connected()) {
574 // Only send if still connected.
575 if (close_write_side_only) {
576 DCHECK_EQ(QUIC_VERSION_99, connection_->transport_version());
577 // Send a RST_STREAM frame.
578 control_frame_manager_.WriteOrBufferRstStream(id, error, bytes_written);
579 } else {
580 // Send a RST_STREAM frame plus, if version 99, an IETF
581 // QUIC STOP_SENDING frame. Both sre sent to emulate
582 // the two-way close that Google QUIC's RST_STREAM does.
583 if (connection_->transport_version() == QUIC_VERSION_99) {
584 QuicConnection::ScopedPacketFlusher flusher(
585 connection(), QuicConnection::SEND_ACK_IF_QUEUED);
586 control_frame_manager_.WriteOrBufferRstStream(id, error, bytes_written);
587 control_frame_manager_.WriteOrBufferStopSending(error, id);
588 } else {
589 control_frame_manager_.WriteOrBufferRstStream(id, error, bytes_written);
590 }
591 }
592 connection_->OnStreamReset(id, error);
593 }
594 if (error != QUIC_STREAM_NO_ERROR && QuicContainsKey(zombie_streams_, id)) {
595 OnStreamDoneWaitingForAcks(id);
596 return;
597 }
598
599 if (!close_write_side_only) {
600 CloseStreamInner(id, true);
601 return;
602 }
603 DCHECK_EQ(QUIC_VERSION_99, connection_->transport_version());
604
605 DynamicStreamMap::iterator it = dynamic_stream_map_.find(id);
606 if (it != dynamic_stream_map_.end()) {
607 QuicStream* stream = it->second.get();
608 if (stream) {
609 stream->set_rst_sent(true);
610 stream->CloseWriteSide();
611 }
612 }
613}
614
615void QuicSession::SendGoAway(QuicErrorCode error_code,
vasilvvc48c8712019-03-11 13:38:16 -0700616 const std::string& reason) {
QUICHE teama6ef0a62019-03-07 20:34:33 -0500617 // GOAWAY frame is not supported in v99.
618 DCHECK_NE(QUIC_VERSION_99, connection_->transport_version());
619 if (goaway_sent_) {
620 return;
621 }
622 goaway_sent_ = true;
623 control_frame_manager_.WriteOrBufferGoAway(
624 error_code, stream_id_manager_.largest_peer_created_stream_id(), reason);
625}
626
627void QuicSession::SendBlocked(QuicStreamId id) {
628 control_frame_manager_.WriteOrBufferBlocked(id);
629}
630
631void QuicSession::SendWindowUpdate(QuicStreamId id,
632 QuicStreamOffset byte_offset) {
633 control_frame_manager_.WriteOrBufferWindowUpdate(id, byte_offset);
634}
635
636void QuicSession::SendMaxStreamId(QuicStreamId max_allowed_incoming_id) {
637 control_frame_manager_.WriteOrBufferMaxStreamId(max_allowed_incoming_id);
638}
639
640void QuicSession::SendStreamIdBlocked(QuicStreamId max_allowed_outgoing_id) {
641 control_frame_manager_.WriteOrBufferStreamIdBlocked(max_allowed_outgoing_id);
642}
643
644void QuicSession::CloseStream(QuicStreamId stream_id) {
645 CloseStreamInner(stream_id, false);
646}
647
648void QuicSession::InsertLocallyClosedStreamsHighestOffset(
649 const QuicStreamId id,
650 QuicStreamOffset offset) {
651 locally_closed_streams_highest_offset_[id] = offset;
652 if (IsIncomingStream(id)) {
653 ++num_locally_closed_incoming_streams_highest_offset_;
654 }
655}
656
657void QuicSession::CloseStreamInner(QuicStreamId stream_id, bool locally_reset) {
658 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
659
660 DynamicStreamMap::iterator it = dynamic_stream_map_.find(stream_id);
661 if (it == dynamic_stream_map_.end()) {
662 // When CloseStreamInner has been called recursively (via
663 // QuicStream::OnClose), the stream will already have been deleted
664 // from stream_map_, so return immediately.
665 QUIC_DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
666 return;
667 }
668 QuicStream* stream = it->second.get();
669
670 // Tell the stream that a RST has been sent.
671 if (locally_reset) {
672 stream->set_rst_sent(true);
673 }
674
675 if (stream->IsWaitingForAcks()) {
676 zombie_streams_[stream->id()] = std::move(it->second);
677 } else {
678 closed_streams_.push_back(std::move(it->second));
679 // Do not retransmit data of a closed stream.
680 streams_with_pending_retransmission_.erase(stream_id);
681 if (!closed_streams_clean_up_alarm_->IsSet()) {
682 closed_streams_clean_up_alarm_->Set(
683 connection_->clock()->ApproximateNow());
684 }
685 }
686
687 // If we haven't received a FIN or RST for this stream, we need to keep track
688 // of the how many bytes the stream's flow controller believes it has
689 // received, for accurate connection level flow control accounting.
690 const bool had_fin_or_rst = stream->HasFinalReceivedByteOffset();
691 if (!had_fin_or_rst) {
692 InsertLocallyClosedStreamsHighestOffset(
693 stream_id, stream->flow_controller()->highest_received_byte_offset());
694 }
695 dynamic_stream_map_.erase(it);
696 if (IsIncomingStream(stream_id)) {
697 --num_dynamic_incoming_streams_;
698 }
699
700 const bool stream_was_draining =
701 draining_streams_.find(stream_id) != draining_streams_.end();
702 if (stream_was_draining) {
703 if (IsIncomingStream(stream_id)) {
704 --num_draining_incoming_streams_;
705 }
706 draining_streams_.erase(stream_id);
707 } else if (connection_->transport_version() == QUIC_VERSION_99) {
708 // Stream was not draining, but we did have a fin or rst, so we can now
709 // free the stream ID if version 99.
710 if (had_fin_or_rst) {
711 v99_streamid_manager_.OnStreamClosed(stream_id);
712 }
713 }
714
715 stream->OnClose();
716
717 if (!stream_was_draining && !IsIncomingStream(stream_id) && had_fin_or_rst &&
718 connection_->transport_version() != QUIC_VERSION_99) {
719 // Streams that first became draining already called OnCanCreate...
720 // This covers the case where the stream went directly to being closed.
721 OnCanCreateNewOutgoingStream();
722 }
723}
724
725void QuicSession::ClosePendingStream(QuicStreamId stream_id) {
726 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
727
728 if (pending_stream_map_.find(stream_id) == pending_stream_map_.end()) {
729 QUIC_BUG << ENDPOINT << "Stream is already closed: " << stream_id;
730 return;
731 }
732
733 SendRstStream(stream_id, QUIC_RST_ACKNOWLEDGEMENT, 0);
734
735 // The pending stream may have been deleted and removed during SendRstStream.
736 // Remove the stream from pending stream map iff it is still in the map.
737 if (pending_stream_map_.find(stream_id) != pending_stream_map_.end()) {
738 pending_stream_map_.erase(stream_id);
739 }
740
741 --num_dynamic_incoming_streams_;
742
743 if (connection_->transport_version() == QUIC_VERSION_99) {
744 v99_streamid_manager_.OnStreamClosed(stream_id);
745 }
746
747 OnCanCreateNewOutgoingStream();
748}
749
750void QuicSession::OnFinalByteOffsetReceived(
751 QuicStreamId stream_id,
752 QuicStreamOffset final_byte_offset) {
753 auto it = locally_closed_streams_highest_offset_.find(stream_id);
754 if (it == locally_closed_streams_highest_offset_.end()) {
755 return;
756 }
757
758 QUIC_DVLOG(1) << ENDPOINT << "Received final byte offset "
759 << final_byte_offset << " for stream " << stream_id;
760 QuicByteCount offset_diff = final_byte_offset - it->second;
761 if (flow_controller_.UpdateHighestReceivedOffset(
762 flow_controller_.highest_received_byte_offset() + offset_diff)) {
763 // If the final offset violates flow control, close the connection now.
764 if (flow_controller_.FlowControlViolation()) {
765 connection_->CloseConnection(
766 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
767 "Connection level flow control violation",
768 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
769 return;
770 }
771 }
772
773 flow_controller_.AddBytesConsumed(offset_diff);
774 locally_closed_streams_highest_offset_.erase(it);
775 if (IsIncomingStream(stream_id)) {
776 --num_locally_closed_incoming_streams_highest_offset_;
777 if (connection_->transport_version() == QUIC_VERSION_99) {
778 v99_streamid_manager_.OnStreamClosed(stream_id);
779 }
780 } else if (connection_->transport_version() != QUIC_VERSION_99) {
781 OnCanCreateNewOutgoingStream();
782 }
783}
784
785bool QuicSession::IsEncryptionEstablished() const {
786 // Once the handshake is confirmed, it never becomes un-confirmed.
787 if (is_handshake_confirmed_) {
788 return true;
789 }
790 return GetCryptoStream()->encryption_established();
791}
792
793bool QuicSession::IsCryptoHandshakeConfirmed() const {
794 return GetCryptoStream()->handshake_confirmed();
795}
796
797void QuicSession::OnConfigNegotiated() {
798 connection_->SetFromConfig(config_);
799
800 uint32_t max_streams = 0;
801 if (config_.HasReceivedMaxIncomingDynamicStreams()) {
802 max_streams = config_.ReceivedMaxIncomingDynamicStreams();
803 }
804 QUIC_DVLOG(1) << "Setting max_open_outgoing_streams_ to " << max_streams;
805 if (connection_->transport_version() == QUIC_VERSION_99) {
806 v99_streamid_manager_.SetMaxOpenOutgoingStreams(max_streams);
807 } else {
808 stream_id_manager_.set_max_open_outgoing_streams(max_streams);
809 }
810 if (perspective() == Perspective::IS_SERVER) {
811 if (config_.HasReceivedConnectionOptions()) {
812 // The following variations change the initial receive flow control
813 // window sizes.
814 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW6)) {
815 AdjustInitialFlowControlWindows(64 * 1024);
816 }
817 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW7)) {
818 AdjustInitialFlowControlWindows(128 * 1024);
819 }
820 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW8)) {
821 AdjustInitialFlowControlWindows(256 * 1024);
822 }
823 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW9)) {
824 AdjustInitialFlowControlWindows(512 * 1024);
825 }
826 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFWA)) {
827 AdjustInitialFlowControlWindows(1024 * 1024);
828 }
829 }
830
831 config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
832 }
833
834 // A small number of additional incoming streams beyond the limit should be
835 // allowed. This helps avoid early connection termination when FIN/RSTs for
836 // old streams are lost or arrive out of order.
837 // Use a minimum number of additional streams, or a percentage increase,
838 // whichever is larger.
839 uint32_t max_incoming_streams_to_send =
840 config_.GetMaxIncomingDynamicStreamsToSend();
841 if (connection_->transport_version() == QUIC_VERSION_99) {
842 v99_streamid_manager_.SetMaxOpenIncomingStreams(
843 max_incoming_streams_to_send);
844 } else {
845 uint32_t max_incoming_streams =
846 std::max(max_incoming_streams_to_send + kMaxStreamsMinimumIncrement,
847 static_cast<uint32_t>(max_incoming_streams_to_send *
848 kMaxStreamsMultiplier));
849 stream_id_manager_.set_max_open_incoming_streams(max_incoming_streams);
850 }
851
852 if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) {
853 // Streams which were created before the SHLO was received (0-RTT
854 // requests) are now informed of the peer's initial flow control window.
855 OnNewStreamFlowControlWindow(
856 config_.ReceivedInitialStreamFlowControlWindowBytes());
857 }
858 if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) {
859 OnNewSessionFlowControlWindow(
860 config_.ReceivedInitialSessionFlowControlWindowBytes());
861 }
862}
863
864void QuicSession::AdjustInitialFlowControlWindows(size_t stream_window) {
865 const float session_window_multiplier =
866 config_.GetInitialStreamFlowControlWindowToSend()
867 ? static_cast<float>(
868 config_.GetInitialSessionFlowControlWindowToSend()) /
869 config_.GetInitialStreamFlowControlWindowToSend()
870 : 1.5;
871
872 QUIC_DVLOG(1) << ENDPOINT << "Set stream receive window to " << stream_window;
873 config_.SetInitialStreamFlowControlWindowToSend(stream_window);
874
875 size_t session_window = session_window_multiplier * stream_window;
876 QUIC_DVLOG(1) << ENDPOINT << "Set session receive window to "
877 << session_window;
878 config_.SetInitialSessionFlowControlWindowToSend(session_window);
879 flow_controller_.UpdateReceiveWindowSize(session_window);
880 // Inform all existing streams about the new window.
881 for (auto const& kv : static_stream_map_) {
882 kv.second->flow_controller()->UpdateReceiveWindowSize(stream_window);
883 }
884 for (auto const& kv : dynamic_stream_map_) {
885 kv.second->flow_controller()->UpdateReceiveWindowSize(stream_window);
886 }
887}
888
889void QuicSession::HandleFrameOnNonexistentOutgoingStream(
890 QuicStreamId stream_id) {
891 DCHECK(!IsClosedStream(stream_id));
892 // Received a frame for a locally-created stream that is not currently
893 // active. This is an error.
894 connection()->CloseConnection(
895 QUIC_INVALID_STREAM_ID, "Data for nonexistent stream",
896 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
897}
898
899void QuicSession::HandleRstOnValidNonexistentStream(
900 const QuicRstStreamFrame& frame) {
901 // If the stream is neither originally in active streams nor created in
902 // GetOrCreateDynamicStream(), it could be a closed stream in which case its
903 // final received byte offset need to be updated.
904 if (IsClosedStream(frame.stream_id)) {
905 // The RST frame contains the final byte offset for the stream: we can now
906 // update the connection level flow controller if needed.
907 OnFinalByteOffsetReceived(frame.stream_id, frame.byte_offset);
908 }
909}
910
911void QuicSession::OnNewStreamFlowControlWindow(QuicStreamOffset new_window) {
912 if (new_window < kMinimumFlowControlSendWindow) {
913 QUIC_LOG_FIRST_N(ERROR, 1)
914 << "Peer sent us an invalid stream flow control send window: "
915 << new_window << ", below default: " << kMinimumFlowControlSendWindow;
916 if (connection_->connected()) {
917 connection_->CloseConnection(
918 QUIC_FLOW_CONTROL_INVALID_WINDOW, "New stream window too low",
919 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
920 }
921 return;
922 }
923
924 // Inform all existing streams about the new window.
925 for (auto const& kv : static_stream_map_) {
926 kv.second->UpdateSendWindowOffset(new_window);
927 }
928 for (auto const& kv : dynamic_stream_map_) {
929 kv.second->UpdateSendWindowOffset(new_window);
930 }
931}
932
933void QuicSession::OnNewSessionFlowControlWindow(QuicStreamOffset new_window) {
934 if (new_window < kMinimumFlowControlSendWindow) {
935 QUIC_LOG_FIRST_N(ERROR, 1)
936 << "Peer sent us an invalid session flow control send window: "
937 << new_window << ", below default: " << kMinimumFlowControlSendWindow;
938 if (connection_->connected()) {
939 connection_->CloseConnection(
940 QUIC_FLOW_CONTROL_INVALID_WINDOW, "New connection window too low",
941 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
942 }
943 return;
944 }
945
946 flow_controller_.UpdateSendWindowOffset(new_window);
947}
948
949void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
950 switch (event) {
951 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
952 // to QuicSession since it is the glue.
953 case ENCRYPTION_FIRST_ESTABLISHED:
954 // Given any streams blocked by encryption a chance to write.
955 OnCanWrite();
956 break;
957
958 case ENCRYPTION_REESTABLISHED:
959 // Retransmit originally packets that were sent, since they can't be
960 // decrypted by the peer.
961 connection_->RetransmitUnackedPackets(ALL_INITIAL_RETRANSMISSION);
962 // Given any streams blocked by encryption a chance to write.
963 OnCanWrite();
964 break;
965
966 case HANDSHAKE_CONFIRMED:
967 QUIC_BUG_IF(!config_.negotiated())
968 << ENDPOINT << "Handshake confirmed without parameter negotiation.";
969 // Discard originally encrypted packets, since they can't be decrypted by
970 // the peer.
971 NeuterUnencryptedData();
972 is_handshake_confirmed_ = true;
973 break;
974
975 default:
976 QUIC_LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
977 }
978}
979
980void QuicSession::OnCryptoHandshakeMessageSent(
981 const CryptoHandshakeMessage& /*message*/) {}
982
983void QuicSession::OnCryptoHandshakeMessageReceived(
984 const CryptoHandshakeMessage& /*message*/) {}
985
986void QuicSession::RegisterStreamPriority(QuicStreamId id,
987 bool is_static,
988 SpdyPriority priority) {
989 write_blocked_streams()->RegisterStream(id, is_static, priority);
990}
991
992void QuicSession::UnregisterStreamPriority(QuicStreamId id, bool is_static) {
993 write_blocked_streams()->UnregisterStream(id, is_static);
994}
995
996void QuicSession::UpdateStreamPriority(QuicStreamId id,
997 SpdyPriority new_priority) {
998 write_blocked_streams()->UpdateStreamPriority(id, new_priority);
999}
1000
1001QuicConfig* QuicSession::config() {
1002 return &config_;
1003}
1004
1005void QuicSession::ActivateStream(std::unique_ptr<QuicStream> stream) {
1006 QuicStreamId stream_id = stream->id();
1007 QUIC_DVLOG(1) << ENDPOINT << "num_streams: " << dynamic_stream_map_.size()
1008 << ". activating " << stream_id;
1009 DCHECK(!QuicContainsKey(dynamic_stream_map_, stream_id));
1010 DCHECK(!QuicContainsKey(static_stream_map_, stream_id));
1011 dynamic_stream_map_[stream_id] = std::move(stream);
1012 if (IsIncomingStream(stream_id)) {
1013 ++num_dynamic_incoming_streams_;
1014 }
1015}
1016
1017QuicStreamId QuicSession::GetNextOutgoingBidirectionalStreamId() {
1018 if (connection_->transport_version() == QUIC_VERSION_99) {
1019 return v99_streamid_manager_.GetNextOutgoingBidirectionalStreamId();
1020 }
1021 return stream_id_manager_.GetNextOutgoingStreamId();
1022}
1023
1024QuicStreamId QuicSession::GetNextOutgoingUnidirectionalStreamId() {
1025 if (connection_->transport_version() == QUIC_VERSION_99) {
1026 return v99_streamid_manager_.GetNextOutgoingUnidirectionalStreamId();
1027 }
1028 return stream_id_manager_.GetNextOutgoingStreamId();
1029}
1030
1031bool QuicSession::CanOpenNextOutgoingBidirectionalStream() {
1032 if (connection_->transport_version() == QUIC_VERSION_99) {
1033 return v99_streamid_manager_.CanOpenNextOutgoingBidirectionalStream();
1034 }
1035 return stream_id_manager_.CanOpenNextOutgoingStream(
1036 GetNumOpenOutgoingStreams());
1037}
1038
1039bool QuicSession::CanOpenNextOutgoingUnidirectionalStream() {
1040 if (connection_->transport_version() == QUIC_VERSION_99) {
1041 return v99_streamid_manager_.CanOpenNextOutgoingUnidirectionalStream();
1042 }
1043 return stream_id_manager_.CanOpenNextOutgoingStream(
1044 GetNumOpenOutgoingStreams());
1045}
1046
1047QuicStream* QuicSession::GetOrCreateStream(const QuicStreamId stream_id) {
1048 StreamHandler handler =
1049 GetOrCreateStreamImpl(stream_id, /*may_buffer=*/false);
1050 DCHECK(!handler.is_pending);
1051 return handler.stream;
1052}
1053
1054QuicSession::StreamHandler QuicSession::GetOrCreateStreamImpl(
1055 QuicStreamId stream_id,
1056 bool may_buffer) {
1057 StaticStreamMap::iterator it = static_stream_map_.find(stream_id);
1058 if (it != static_stream_map_.end()) {
1059 return StreamHandler(it->second);
1060 }
1061 return GetOrCreateDynamicStreamImpl(stream_id, may_buffer);
1062}
1063
1064void QuicSession::StreamDraining(QuicStreamId stream_id) {
1065 DCHECK(QuicContainsKey(dynamic_stream_map_, stream_id));
1066 if (!QuicContainsKey(draining_streams_, stream_id)) {
1067 draining_streams_.insert(stream_id);
1068 if (IsIncomingStream(stream_id)) {
1069 ++num_draining_incoming_streams_;
1070 }
1071 if (connection_->transport_version() == QUIC_VERSION_99) {
1072 v99_streamid_manager_.OnStreamClosed(stream_id);
1073 }
1074 }
1075 if (!IsIncomingStream(stream_id)) {
1076 // Inform application that a stream is available.
1077 OnCanCreateNewOutgoingStream();
1078 }
1079}
1080
1081bool QuicSession::MaybeIncreaseLargestPeerStreamId(
1082 const QuicStreamId stream_id) {
1083 if (connection_->transport_version() == QUIC_VERSION_99) {
1084 return v99_streamid_manager_.MaybeIncreaseLargestPeerStreamId(stream_id);
1085 }
1086 return stream_id_manager_.MaybeIncreaseLargestPeerStreamId(stream_id);
1087}
1088
1089bool QuicSession::ShouldYield(QuicStreamId stream_id) {
1090 if (stream_id == currently_writing_stream_id_) {
1091 return false;
1092 }
1093 return write_blocked_streams()->ShouldYield(stream_id);
1094}
1095
1096QuicStream* QuicSession::GetOrCreateDynamicStream(
1097 const QuicStreamId stream_id) {
1098 StreamHandler handler =
1099 GetOrCreateDynamicStreamImpl(stream_id, /*may_buffer=*/false);
1100 DCHECK(!handler.is_pending);
1101 return handler.stream;
1102}
1103
1104QuicSession::StreamHandler QuicSession::GetOrCreateDynamicStreamImpl(
1105 QuicStreamId stream_id,
1106 bool may_buffer) {
1107 DCHECK(!QuicContainsKey(static_stream_map_, stream_id))
1108 << "Attempt to call GetOrCreateDynamicStream for a static stream";
1109
1110 DynamicStreamMap::iterator it = dynamic_stream_map_.find(stream_id);
1111 if (it != dynamic_stream_map_.end()) {
1112 return StreamHandler(it->second.get());
1113 }
1114
1115 if (IsClosedStream(stream_id)) {
1116 return StreamHandler();
1117 }
1118
1119 if (!IsIncomingStream(stream_id)) {
1120 HandleFrameOnNonexistentOutgoingStream(stream_id);
1121 return StreamHandler();
1122 }
1123
1124 auto pending_it = pending_stream_map_.find(stream_id);
1125 if (pending_it != pending_stream_map_.end()) {
1126 DCHECK_EQ(QUIC_VERSION_99, connection_->transport_version());
1127 if (may_buffer) {
1128 return StreamHandler(pending_it->second.get());
1129 }
1130 // The stream limit accounting has already been taken care of
1131 // when the PendingStream was created, so there is no need to
1132 // do so here. Now we can create the actual stream from the
1133 // PendingStream.
1134 StreamHandler handler(CreateIncomingStream(std::move(*pending_it->second)));
1135 pending_stream_map_.erase(pending_it);
1136 return handler;
1137 }
1138
1139 // TODO(fkastenholz): If we are creating a new stream and we have
1140 // sent a goaway, we should ignore the stream creation. Need to
1141 // add code to A) test if goaway was sent ("if (goaway_sent_)") and
1142 // B) reject stream creation ("return nullptr")
1143
1144 if (!MaybeIncreaseLargestPeerStreamId(stream_id)) {
1145 return StreamHandler();
1146 }
1147
1148 if (connection_->transport_version() != QUIC_VERSION_99) {
1149 // TODO(fayang): Let LegacyQuicStreamIdManager count open streams and make
1150 // CanOpenIncomingStream interface cosistent with that of v99.
1151 if (!stream_id_manager_.CanOpenIncomingStream(
1152 GetNumOpenIncomingStreams())) {
1153 // Refuse to open the stream.
1154 SendRstStream(stream_id, QUIC_REFUSED_STREAM, 0);
1155 return StreamHandler();
1156 }
1157 }
1158
1159 if (connection_->transport_version() == QUIC_VERSION_99 && may_buffer &&
1160 ShouldBufferIncomingStream(stream_id)) {
1161 ++num_dynamic_incoming_streams_;
1162 // Since STREAM frames may arrive out of order, delay creating the
1163 // stream object until the first byte arrives. Buffer the frames and
1164 // handle flow control accounting in the PendingStream.
1165 auto pending = QuicMakeUnique<PendingStream>(stream_id, this);
1166 StreamHandler handler(pending.get());
1167 pending_stream_map_[stream_id] = std::move(pending);
1168 return handler;
1169 }
1170
1171 return StreamHandler(CreateIncomingStream(stream_id));
1172}
1173
1174void QuicSession::set_largest_peer_created_stream_id(
1175 QuicStreamId largest_peer_created_stream_id) {
1176 if (connection_->transport_version() == QUIC_VERSION_99) {
1177 v99_streamid_manager_.SetLargestPeerCreatedStreamId(
1178 largest_peer_created_stream_id);
1179 return;
1180 }
1181 stream_id_manager_.set_largest_peer_created_stream_id(
1182 largest_peer_created_stream_id);
1183}
1184
1185bool QuicSession::IsClosedStream(QuicStreamId id) {
1186 DCHECK_NE(QuicUtils::GetInvalidStreamId(connection_->transport_version()),
1187 id);
1188 if (IsOpenStream(id)) {
1189 // Stream is active
1190 return false;
1191 }
1192
1193 if (connection_->transport_version() == QUIC_VERSION_99) {
1194 return !v99_streamid_manager_.IsAvailableStream(id);
1195 }
1196
1197 return !stream_id_manager_.IsAvailableStream(id);
1198}
1199
1200bool QuicSession::IsOpenStream(QuicStreamId id) {
1201 DCHECK_NE(QuicUtils::GetInvalidStreamId(connection_->transport_version()),
1202 id);
1203 if (QuicContainsKey(static_stream_map_, id) ||
1204 QuicContainsKey(dynamic_stream_map_, id) ||
1205 QuicContainsKey(pending_stream_map_, id)) {
1206 // Stream is active
1207 return true;
1208 }
1209 return false;
1210}
1211
1212size_t QuicSession::GetNumOpenIncomingStreams() const {
1213 return num_dynamic_incoming_streams_ - num_draining_incoming_streams_ +
1214 num_locally_closed_incoming_streams_highest_offset_;
1215}
1216
1217size_t QuicSession::GetNumOpenOutgoingStreams() const {
1218 DCHECK_GE(GetNumDynamicOutgoingStreams() +
1219 GetNumLocallyClosedOutgoingStreamsHighestOffset(),
1220 GetNumDrainingOutgoingStreams());
1221 return GetNumDynamicOutgoingStreams() +
1222 GetNumLocallyClosedOutgoingStreamsHighestOffset() -
1223 GetNumDrainingOutgoingStreams();
1224}
1225
1226size_t QuicSession::GetNumActiveStreams() const {
1227 return dynamic_stream_map_.size() - draining_streams_.size();
1228}
1229
1230size_t QuicSession::GetNumDrainingStreams() const {
1231 return draining_streams_.size();
1232}
1233
1234void QuicSession::MarkConnectionLevelWriteBlocked(QuicStreamId id) {
1235 if (GetOrCreateStream(id) == nullptr) {
1236 QUIC_BUG << "Marking unknown stream " << id << " blocked.";
1237 QUIC_LOG_FIRST_N(ERROR, 2) << QuicStackTrace();
1238 }
1239
1240 write_blocked_streams_.AddStream(id);
1241}
1242
1243bool QuicSession::HasDataToWrite() const {
1244 return write_blocked_streams_.HasWriteBlockedSpecialStream() ||
1245 write_blocked_streams_.HasWriteBlockedDataStreams() ||
1246 connection_->HasQueuedData() ||
1247 !streams_with_pending_retransmission_.empty() ||
1248 control_frame_manager_.WillingToWrite();
1249}
1250
1251void QuicSession::OnAckNeedsRetransmittableFrame() {
1252 flow_controller_.SendWindowUpdate();
1253}
1254
1255void QuicSession::SendPing() {
1256 control_frame_manager_.WritePing();
1257}
1258
1259size_t QuicSession::GetNumDynamicOutgoingStreams() const {
QUICHE team1243d142019-03-21 13:02:02 -07001260 DCHECK_GE(static_cast<size_t>(dynamic_stream_map_.size() +
1261 pending_stream_map_.size()),
QUICHE teama6ef0a62019-03-07 20:34:33 -05001262 num_dynamic_incoming_streams_);
1263 return dynamic_stream_map_.size() + pending_stream_map_.size() -
1264 num_dynamic_incoming_streams_;
1265}
1266
1267size_t QuicSession::GetNumDrainingOutgoingStreams() const {
1268 DCHECK_GE(draining_streams_.size(), num_draining_incoming_streams_);
1269 return draining_streams_.size() - num_draining_incoming_streams_;
1270}
1271
1272size_t QuicSession::GetNumLocallyClosedOutgoingStreamsHighestOffset() const {
1273 DCHECK_GE(locally_closed_streams_highest_offset_.size(),
1274 num_locally_closed_incoming_streams_highest_offset_);
1275 return locally_closed_streams_highest_offset_.size() -
1276 num_locally_closed_incoming_streams_highest_offset_;
1277}
1278
1279bool QuicSession::IsConnectionFlowControlBlocked() const {
1280 return flow_controller_.IsBlocked();
1281}
1282
1283bool QuicSession::IsStreamFlowControlBlocked() {
1284 for (auto const& kv : static_stream_map_) {
1285 if (kv.second->flow_controller()->IsBlocked()) {
1286 return true;
1287 }
1288 }
1289 for (auto const& kv : dynamic_stream_map_) {
1290 if (kv.second->flow_controller()->IsBlocked()) {
1291 return true;
1292 }
1293 }
1294 return false;
1295}
1296
1297size_t QuicSession::MaxAvailableBidirectionalStreams() const {
1298 if (connection()->transport_version() == QUIC_VERSION_99) {
1299 return v99_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
1300 }
1301 return stream_id_manager_.MaxAvailableStreams();
1302}
1303
1304size_t QuicSession::MaxAvailableUnidirectionalStreams() const {
1305 if (connection()->transport_version() == QUIC_VERSION_99) {
1306 return v99_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
1307 }
1308 return stream_id_manager_.MaxAvailableStreams();
1309}
1310
1311bool QuicSession::IsIncomingStream(QuicStreamId id) const {
1312 if (connection()->transport_version() == QUIC_VERSION_99) {
1313 return v99_streamid_manager_.IsIncomingStream(id);
1314 }
1315 return stream_id_manager_.IsIncomingStream(id);
1316}
1317
1318void QuicSession::OnStreamDoneWaitingForAcks(QuicStreamId id) {
1319 auto it = zombie_streams_.find(id);
1320 if (it == zombie_streams_.end()) {
1321 return;
1322 }
1323
1324 closed_streams_.push_back(std::move(it->second));
1325 if (!closed_streams_clean_up_alarm_->IsSet()) {
1326 closed_streams_clean_up_alarm_->Set(connection_->clock()->ApproximateNow());
1327 }
1328 zombie_streams_.erase(it);
1329 // Do not retransmit data of a closed stream.
1330 streams_with_pending_retransmission_.erase(id);
1331}
1332
1333QuicStream* QuicSession::GetStream(QuicStreamId id) const {
1334 if (id <= largest_static_stream_id_) {
1335 auto static_stream = static_stream_map_.find(id);
1336 if (static_stream != static_stream_map_.end()) {
1337 return static_stream->second;
1338 }
1339 }
1340
1341 auto active_stream = dynamic_stream_map_.find(id);
1342 if (active_stream != dynamic_stream_map_.end()) {
1343 return active_stream->second.get();
1344 }
1345 auto zombie_stream = zombie_streams_.find(id);
1346 if (zombie_stream != zombie_streams_.end()) {
1347 return zombie_stream->second.get();
1348 }
1349 return nullptr;
1350}
1351
1352bool QuicSession::OnFrameAcked(const QuicFrame& frame,
1353 QuicTime::Delta ack_delay_time) {
1354 if (frame.type == MESSAGE_FRAME) {
1355 OnMessageAcked(frame.message_frame->message_id);
1356 return true;
1357 }
1358 if (frame.type == CRYPTO_FRAME) {
1359 return GetMutableCryptoStream()->OnCryptoFrameAcked(*frame.crypto_frame,
1360 ack_delay_time);
1361 }
1362 if (frame.type != STREAM_FRAME) {
1363 return control_frame_manager_.OnControlFrameAcked(frame);
1364 }
1365 bool new_stream_data_acked = false;
1366 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
1367 // Stream can already be reset when sent frame gets acked.
1368 if (stream != nullptr) {
1369 QuicByteCount newly_acked_length = 0;
1370 new_stream_data_acked = stream->OnStreamFrameAcked(
1371 frame.stream_frame.offset, frame.stream_frame.data_length,
1372 frame.stream_frame.fin, ack_delay_time, &newly_acked_length);
1373 if (!stream->HasPendingRetransmission()) {
1374 streams_with_pending_retransmission_.erase(stream->id());
1375 }
1376 }
1377 return new_stream_data_acked;
1378}
1379
1380void QuicSession::OnStreamFrameRetransmitted(const QuicStreamFrame& frame) {
1381 QuicStream* stream = GetStream(frame.stream_id);
1382 if (stream == nullptr) {
1383 QUIC_BUG << "Stream: " << frame.stream_id << " is closed when " << frame
1384 << " is retransmitted.";
1385 connection()->CloseConnection(
1386 QUIC_INTERNAL_ERROR, "Attempt to retransmit frame of a closed stream",
1387 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1388 return;
1389 }
1390 stream->OnStreamFrameRetransmitted(frame.offset, frame.data_length,
1391 frame.fin);
1392}
1393
1394void QuicSession::OnFrameLost(const QuicFrame& frame) {
1395 if (frame.type == MESSAGE_FRAME) {
1396 OnMessageLost(frame.message_frame->message_id);
1397 return;
1398 }
1399 if (frame.type == CRYPTO_FRAME) {
1400 GetMutableCryptoStream()->OnCryptoFrameLost(frame.crypto_frame);
1401 return;
1402 }
1403 if (frame.type != STREAM_FRAME) {
1404 control_frame_manager_.OnControlFrameLost(frame);
1405 return;
1406 }
1407 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
1408 if (stream == nullptr) {
1409 return;
1410 }
1411 stream->OnStreamFrameLost(frame.stream_frame.offset,
1412 frame.stream_frame.data_length,
1413 frame.stream_frame.fin);
1414 if (stream->HasPendingRetransmission() &&
1415 !QuicContainsKey(streams_with_pending_retransmission_,
1416 frame.stream_frame.stream_id)) {
1417 streams_with_pending_retransmission_.insert(
1418 std::make_pair(frame.stream_frame.stream_id, true));
1419 }
1420}
1421
1422void QuicSession::RetransmitFrames(const QuicFrames& frames,
1423 TransmissionType type) {
1424 QuicConnection::ScopedPacketFlusher retransmission_flusher(
1425 connection_, QuicConnection::NO_ACK);
1426 SetTransmissionType(type);
1427 for (const QuicFrame& frame : frames) {
1428 if (frame.type == MESSAGE_FRAME) {
1429 // Do not retransmit MESSAGE frames.
1430 continue;
1431 }
1432 if (frame.type == CRYPTO_FRAME) {
1433 GetMutableCryptoStream()->RetransmitData(frame.crypto_frame);
1434 continue;
1435 }
1436 if (frame.type != STREAM_FRAME) {
1437 if (!control_frame_manager_.RetransmitControlFrame(frame)) {
1438 break;
1439 }
1440 continue;
1441 }
1442 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
1443 if (stream != nullptr &&
1444 !stream->RetransmitStreamData(frame.stream_frame.offset,
1445 frame.stream_frame.data_length,
1446 frame.stream_frame.fin)) {
1447 break;
1448 }
1449 }
1450}
1451
1452bool QuicSession::IsFrameOutstanding(const QuicFrame& frame) const {
1453 if (frame.type == MESSAGE_FRAME) {
1454 return false;
1455 }
1456 if (frame.type == CRYPTO_FRAME) {
1457 return GetCryptoStream()->IsFrameOutstanding(
1458 frame.crypto_frame->level, frame.crypto_frame->offset,
1459 frame.crypto_frame->data_length);
1460 }
1461 if (frame.type != STREAM_FRAME) {
1462 return control_frame_manager_.IsControlFrameOutstanding(frame);
1463 }
1464 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
1465 return stream != nullptr &&
1466 stream->IsStreamFrameOutstanding(frame.stream_frame.offset,
1467 frame.stream_frame.data_length,
1468 frame.stream_frame.fin);
1469}
1470
1471bool QuicSession::HasUnackedCryptoData() const {
1472 const QuicCryptoStream* crypto_stream = GetCryptoStream();
1473 if (crypto_stream->IsWaitingForAcks()) {
1474 return true;
1475 }
1476 if (GetQuicReloadableFlag(quic_fix_has_pending_crypto_data) &&
1477 crypto_stream->HasBufferedData()) {
1478 QUIC_RELOADABLE_FLAG_COUNT(quic_fix_has_pending_crypto_data);
1479 return true;
1480 }
1481 return false;
1482}
1483
1484WriteStreamDataResult QuicSession::WriteStreamData(QuicStreamId id,
1485 QuicStreamOffset offset,
1486 QuicByteCount data_length,
1487 QuicDataWriter* writer) {
1488 QuicStream* stream = GetStream(id);
1489 if (stream == nullptr) {
1490 // This causes the connection to be closed because of failed to serialize
1491 // packet.
ianswetteb101f82019-04-04 09:13:24 -07001492 QUIC_BUG << "Stream " << id << " does not exist when trying to write data."
1493 << " version:" << connection_->transport_version();
QUICHE teama6ef0a62019-03-07 20:34:33 -05001494 return STREAM_MISSING;
1495 }
1496 if (stream->WriteStreamData(offset, data_length, writer)) {
1497 return WRITE_SUCCESS;
1498 }
1499 return WRITE_FAILED;
1500}
1501
1502bool QuicSession::WriteCryptoData(EncryptionLevel level,
1503 QuicStreamOffset offset,
1504 QuicByteCount data_length,
1505 QuicDataWriter* writer) {
1506 return GetMutableCryptoStream()->WriteCryptoFrame(level, offset, data_length,
1507 writer);
1508}
1509
1510QuicUint128 QuicSession::GetStatelessResetToken() const {
1511 return QuicUtils::GenerateStatelessResetToken(connection_->connection_id());
1512}
1513
1514bool QuicSession::RetransmitLostData() {
1515 QuicConnection::ScopedPacketFlusher retransmission_flusher(
1516 connection_, QuicConnection::SEND_ACK_IF_QUEUED);
1517 // Retransmit crypto data first.
QUICHE teamea740082019-03-11 17:58:43 -07001518 bool uses_crypto_frames =
1519 QuicVersionUsesCryptoFrames(connection_->transport_version());
QUICHE teama6ef0a62019-03-07 20:34:33 -05001520 QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
1521 if (uses_crypto_frames && crypto_stream->HasPendingCryptoRetransmission()) {
1522 SetTransmissionType(HANDSHAKE_RETRANSMISSION);
1523 crypto_stream->WritePendingCryptoRetransmission();
1524 }
1525 // Retransmit crypto data in stream 1 frames (version < 47).
1526 if (!uses_crypto_frames &&
1527 QuicContainsKey(
1528 streams_with_pending_retransmission_,
1529 QuicUtils::GetCryptoStreamId(connection_->transport_version()))) {
1530 SetTransmissionType(HANDSHAKE_RETRANSMISSION);
1531 // Retransmit crypto data first.
1532 QuicStream* crypto_stream = GetStream(
1533 QuicUtils::GetCryptoStreamId(connection_->transport_version()));
1534 crypto_stream->OnCanWrite();
1535 DCHECK(CheckStreamWriteBlocked(crypto_stream));
1536 if (crypto_stream->HasPendingRetransmission()) {
1537 // Connection is write blocked.
1538 return false;
1539 } else {
1540 streams_with_pending_retransmission_.erase(
1541 QuicUtils::GetCryptoStreamId(connection_->transport_version()));
1542 }
1543 }
1544 if (control_frame_manager_.HasPendingRetransmission()) {
1545 SetTransmissionType(LOSS_RETRANSMISSION);
1546 control_frame_manager_.OnCanWrite();
1547 if (control_frame_manager_.HasPendingRetransmission()) {
1548 return false;
1549 }
1550 }
1551 while (!streams_with_pending_retransmission_.empty()) {
1552 if (!connection_->CanWriteStreamData()) {
1553 break;
1554 }
1555 // Retransmit lost data on headers and data streams.
1556 const QuicStreamId id = streams_with_pending_retransmission_.begin()->first;
1557 QuicStream* stream = GetStream(id);
1558 if (stream != nullptr) {
1559 SetTransmissionType(LOSS_RETRANSMISSION);
1560 stream->OnCanWrite();
1561 DCHECK(CheckStreamWriteBlocked(stream));
1562 if (stream->HasPendingRetransmission()) {
1563 // Connection is write blocked.
1564 break;
1565 } else if (!streams_with_pending_retransmission_.empty() &&
1566 streams_with_pending_retransmission_.begin()->first == id) {
1567 // Retransmit lost data may cause connection close. If this stream
1568 // has not yet sent fin, a RST_STREAM will be sent and it will be
1569 // removed from streams_with_pending_retransmission_.
1570 streams_with_pending_retransmission_.pop_front();
1571 }
1572 } else {
1573 QUIC_BUG << "Try to retransmit data of a closed stream";
1574 streams_with_pending_retransmission_.pop_front();
1575 }
1576 }
1577
1578 return streams_with_pending_retransmission_.empty();
1579}
1580
1581void QuicSession::NeuterUnencryptedData() {
1582 if (connection_->session_decides_what_to_write()) {
1583 QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
1584 crypto_stream->NeuterUnencryptedStreamData();
1585 if (!crypto_stream->HasPendingRetransmission()) {
1586 streams_with_pending_retransmission_.erase(
1587 QuicUtils::GetCryptoStreamId(connection_->transport_version()));
1588 }
1589 }
1590 connection_->NeuterUnencryptedPackets();
1591}
1592
1593void QuicSession::SetTransmissionType(TransmissionType type) {
1594 connection_->SetTransmissionType(type);
1595}
1596
1597MessageResult QuicSession::SendMessage(QuicMemSliceSpan message) {
1598 if (!IsEncryptionEstablished()) {
1599 return {MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED, 0};
1600 }
1601 MessageStatus result =
1602 connection_->SendMessage(last_message_id_ + 1, message);
1603 if (result == MESSAGE_STATUS_SUCCESS) {
1604 return {result, ++last_message_id_};
1605 }
1606 return {result, 0};
1607}
1608
1609void QuicSession::OnMessageAcked(QuicMessageId message_id) {
1610 QUIC_DVLOG(1) << ENDPOINT << "message " << message_id << " gets acked.";
1611}
1612
1613void QuicSession::OnMessageLost(QuicMessageId message_id) {
1614 QUIC_DVLOG(1) << ENDPOINT << "message " << message_id
1615 << " is considered lost";
1616}
1617
1618void QuicSession::CleanUpClosedStreams() {
1619 closed_streams_.clear();
1620}
1621
1622bool QuicSession::session_decides_what_to_write() const {
1623 return connection_->session_decides_what_to_write();
1624}
1625
ianswettb239f862019-04-05 09:15:06 -07001626QuicPacketLength QuicSession::GetCurrentLargestMessagePayload() const {
1627 return connection_->GetCurrentLargestMessagePayload();
1628}
1629
1630QuicPacketLength QuicSession::GetGuaranteedLargestMessagePayload() const {
1631 return connection_->GetGuaranteedLargestMessagePayload();
QUICHE teama6ef0a62019-03-07 20:34:33 -05001632}
1633
1634void QuicSession::SendStopSending(uint16_t code, QuicStreamId stream_id) {
1635 control_frame_manager_.WriteOrBufferStopSending(code, stream_id);
1636}
1637
1638void QuicSession::OnCanCreateNewOutgoingStream() {}
1639
1640QuicStreamId QuicSession::next_outgoing_bidirectional_stream_id() const {
1641 if (connection_->transport_version() == QUIC_VERSION_99) {
1642 return v99_streamid_manager_.next_outgoing_bidirectional_stream_id();
1643 }
1644 return stream_id_manager_.next_outgoing_stream_id();
1645}
1646
1647QuicStreamId QuicSession::next_outgoing_unidirectional_stream_id() const {
1648 if (connection_->transport_version() == QUIC_VERSION_99) {
1649 return v99_streamid_manager_.next_outgoing_unidirectional_stream_id();
1650 }
1651 return stream_id_manager_.next_outgoing_stream_id();
1652}
1653
1654bool QuicSession::OnMaxStreamIdFrame(const QuicMaxStreamIdFrame& frame) {
1655 return v99_streamid_manager_.OnMaxStreamIdFrame(frame);
1656}
1657
1658bool QuicSession::OnStreamIdBlockedFrame(
1659 const QuicStreamIdBlockedFrame& frame) {
1660 return v99_streamid_manager_.OnStreamIdBlockedFrame(frame);
1661}
1662
1663size_t QuicSession::max_open_incoming_bidirectional_streams() const {
1664 if (connection_->transport_version() == QUIC_VERSION_99) {
1665 return v99_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
1666 }
1667 return stream_id_manager_.max_open_incoming_streams();
1668}
1669
1670size_t QuicSession::max_open_incoming_unidirectional_streams() const {
1671 if (connection_->transport_version() == QUIC_VERSION_99) {
1672 return v99_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
1673 }
1674 return stream_id_manager_.max_open_incoming_streams();
1675}
1676
1677#undef ENDPOINT // undef for jumbo builds
1678} // namespace quic