blob: 09e38652a5ad563c506032756a1c4096d0ea3f4c [file] [log] [blame]
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
5#include "net/third_party/quiche/src/quic/core/quic_session.h"
6
7#include <cstdint>
8#include <utility>
9
10#include "net/third_party/quiche/src/quic/core/quic_connection.h"
11#include "net/third_party/quiche/src/quic/core/quic_flow_controller.h"
12#include "net/third_party/quiche/src/quic/core/quic_utils.h"
13#include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
14#include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
15#include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
16#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
17#include "net/third_party/quiche/src/quic/platform/api/quic_map_util.h"
18#include "net/third_party/quiche/src/quic/platform/api/quic_stack_trace.h"
19#include "net/third_party/quiche/src/quic/platform/api/quic_str_cat.h"
20#include "net/third_party/quiche/src/quic/platform/api/quic_string.h"
21
22using spdy::SpdyPriority;
23
24namespace quic {
25
26namespace {
27
28class ClosedStreamsCleanUpDelegate : public QuicAlarm::Delegate {
29 public:
30 explicit ClosedStreamsCleanUpDelegate(QuicSession* session)
31 : session_(session) {}
32 ClosedStreamsCleanUpDelegate(const ClosedStreamsCleanUpDelegate&) = delete;
33 ClosedStreamsCleanUpDelegate& operator=(const ClosedStreamsCleanUpDelegate&) =
34 delete;
35
36 void OnAlarm() override { session_->CleanUpClosedStreams(); }
37
38 private:
39 QuicSession* session_;
40};
41
42} // namespace
43
// Log-line prefix identifying which side of the connection is logging.
// Expands to a call to perspective(), so it can only be used where that
// accessor is callable.
#define ENDPOINT \
  (perspective() != Perspective::IS_SERVER ? "Client: " : "Server: ")
46
47QuicSession::QuicSession(QuicConnection* connection,
48 Visitor* owner,
49 const QuicConfig& config,
50 const ParsedQuicVersionVector& supported_versions)
51 : connection_(connection),
52 visitor_(owner),
53 write_blocked_streams_(),
54 config_(config),
55 stream_id_manager_(this,
56 kDefaultMaxStreamsPerConnection,
57 config_.GetMaxIncomingDynamicStreamsToSend()),
58 v99_streamid_manager_(this,
59 kDefaultMaxStreamsPerConnection,
60 config_.GetMaxIncomingDynamicStreamsToSend()),
61 num_dynamic_incoming_streams_(0),
62 num_draining_incoming_streams_(0),
63 num_locally_closed_incoming_streams_highest_offset_(0),
64 error_(QUIC_NO_ERROR),
65 flow_controller_(
66 this,
67 QuicUtils::GetInvalidStreamId(connection->transport_version()),
68 /*is_connection_flow_controller*/ true,
69 kMinimumFlowControlSendWindow,
70 config_.GetInitialSessionFlowControlWindowToSend(),
71 kSessionReceiveWindowLimit,
72 perspective() == Perspective::IS_SERVER,
73 nullptr),
74 currently_writing_stream_id_(0),
75 largest_static_stream_id_(0),
76 is_handshake_confirmed_(false),
77 goaway_sent_(false),
78 goaway_received_(false),
79 control_frame_manager_(this),
80 last_message_id_(0),
81 closed_streams_clean_up_alarm_(nullptr),
82 supported_versions_(supported_versions) {
83 closed_streams_clean_up_alarm_ =
84 QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
85 new ClosedStreamsCleanUpDelegate(this)));
86}
87
88void QuicSession::Initialize() {
89 connection_->set_visitor(this);
90 connection_->SetSessionNotifier(this);
91 connection_->SetDataProducer(this);
92 connection_->SetFromConfig(config_);
93
94 DCHECK_EQ(QuicUtils::GetCryptoStreamId(connection_->transport_version()),
95 GetMutableCryptoStream()->id());
96 RegisterStaticStream(
97 QuicUtils::GetCryptoStreamId(connection_->transport_version()),
98 GetMutableCryptoStream());
99}
100
101QuicSession::~QuicSession() {
102 QUIC_LOG_IF(WARNING, !zombie_streams_.empty()) << "Still have zombie streams";
103}
104
105void QuicSession::RegisterStaticStream(QuicStreamId id, QuicStream* stream) {
106 static_stream_map_[id] = stream;
107
108 QUIC_BUG_IF(id >
109 largest_static_stream_id_ +
110 QuicUtils::StreamIdDelta(connection_->transport_version()))
111 << ENDPOINT << "Static stream registered out of order: " << id
112 << " vs: " << largest_static_stream_id_;
113 largest_static_stream_id_ = std::max(id, largest_static_stream_id_);
114
115 if (connection_->transport_version() == QUIC_VERSION_99) {
116 v99_streamid_manager_.RegisterStaticStream(id);
117 }
118}
119
120void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
121 // TODO(rch) deal with the error case of stream id 0.
122 QuicStreamId stream_id = frame.stream_id;
123 if (stream_id ==
124 QuicUtils::GetInvalidStreamId(connection()->transport_version())) {
125 connection()->CloseConnection(
126 QUIC_INVALID_STREAM_ID, "Recevied data for an invalid stream",
127 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
128 return;
129 }
130
131 if (frame.fin && QuicContainsKey(static_stream_map_, stream_id)) {
132 connection()->CloseConnection(
133 QUIC_INVALID_STREAM_ID, "Attempt to close a static stream",
134 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
135 return;
136 }
137
138 StreamHandler handler = GetOrCreateStreamImpl(stream_id, frame.offset != 0);
139 if (handler.is_pending) {
140 handler.pending->OnStreamFrame(frame);
141 return;
142 }
143
144 if (!handler.stream) {
145 // The stream no longer exists, but we may still be interested in the
146 // final stream byte offset sent by the peer. A frame with a FIN can give
147 // us this offset.
148 if (frame.fin) {
149 QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
150 OnFinalByteOffsetReceived(stream_id, final_byte_offset);
151 }
152 return;
153 }
154 handler.stream->OnStreamFrame(frame);
155}
156
157void QuicSession::OnCryptoFrame(const QuicCryptoFrame& frame) {
158 GetMutableCryptoStream()->OnCryptoFrame(frame);
159}
160
161bool QuicSession::OnStopSendingFrame(const QuicStopSendingFrame& frame) {
162 // We are not version 99. In theory, if not in version 99 then the framer
163 // could not call OnStopSending... This is just a check that is good when
164 // both a new protocol and a new implementation of that protocol are both
165 // being developed.
166 DCHECK_EQ(QUIC_VERSION_99, connection_->transport_version());
167
168 QuicStreamId stream_id = frame.stream_id;
169 // If Stream ID is invalid then close the connection.
170 if (stream_id ==
171 QuicUtils::GetInvalidStreamId(connection()->transport_version())) {
172 QUIC_DVLOG(1) << ENDPOINT
173 << "Received STOP_SENDING with invalid stream_id: "
174 << stream_id << " Closing connection";
175 connection()->CloseConnection(
176 QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for an invalid stream",
177 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
178 return false;
179 }
180
181 // Ignore STOP_SENDING for static streams.
182 // TODO(fkastenholz): IETF Quic does not have static streams and does not
183 // make exceptions for them with respect to processing things like
184 // STOP_SENDING.
185 if (QuicContainsKey(static_stream_map_, stream_id)) {
186 QUIC_DVLOG(1) << ENDPOINT
187 << "Received STOP_SENDING for a static stream, id: "
188 << stream_id << " Closing connection";
189 connection()->CloseConnection(
190 QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for a static stream",
191 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
192 return false;
193 }
194
195 if (visitor_) {
196 visitor_->OnStopSendingReceived(frame);
197 }
198
199 // If stream is closed, ignore the frame
200 if (IsClosedStream(stream_id)) {
201 QUIC_DVLOG(1)
202 << ENDPOINT
203 << "Received STOP_SENDING for closed or non-existent stream, id: "
204 << stream_id << " Ignoring.";
205 return true; // Continue processing the packet.
206 }
207 // If stream is non-existent, close the connection
208 DynamicStreamMap::iterator it = dynamic_stream_map_.find(stream_id);
209 if (it == dynamic_stream_map_.end()) {
210 QUIC_DVLOG(1) << ENDPOINT
211 << "Received STOP_SENDING for non-existent stream, id: "
212 << stream_id << " Closing connection";
213 connection()->CloseConnection(
214 IETF_QUIC_PROTOCOL_VIOLATION,
215 "Received STOP_SENDING for a non-existent stream",
216 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
217 return false;
218 }
219
220 // Get the QuicStream for this stream. Ignore the STOP_SENDING
221 // if the QuicStream pointer is NULL
222 // QUESTION: IS THIS THE RIGHT THING TO DO? (that is, this would happen IFF
223 // there was an entry in the map, but the pointer is null. sounds more like a
224 // deep programming error rather than a simple protocol problem).
225 QuicStream* stream = it->second.get();
226 if (stream == nullptr) {
227 QUIC_DVLOG(1) << ENDPOINT
228 << "Received STOP_SENDING for NULL QuicStream, stream_id: "
229 << stream_id << ". Ignoring.";
230 return true;
231 }
232 stream->OnStopSending(frame.application_error_code);
233
234 stream->set_stream_error(
235 static_cast<QuicRstStreamErrorCode>(frame.application_error_code));
236 SendRstStreamInner(
237 stream->id(),
238 static_cast<quic::QuicRstStreamErrorCode>(frame.application_error_code),
239 stream->stream_bytes_written(),
240 /*close_write_side_only=*/true);
241
242 return true;
243}
244
245void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
246 QuicStreamId stream_id = frame.stream_id;
247 if (stream_id ==
248 QuicUtils::GetInvalidStreamId(connection()->transport_version())) {
249 connection()->CloseConnection(
250 QUIC_INVALID_STREAM_ID, "Recevied data for an invalid stream",
251 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
252 return;
253 }
254
255 if (QuicContainsKey(static_stream_map_, stream_id)) {
256 connection()->CloseConnection(
257 QUIC_INVALID_STREAM_ID, "Attempt to reset a static stream",
258 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
259 return;
260 }
261
262 if (visitor_) {
263 visitor_->OnRstStreamReceived(frame);
264 }
265
266 // may_buffer is true here to allow subclasses to buffer streams until the
267 // first byte of payload arrives which would allow sessions to delay
268 // creation of the stream until the type is known.
269 StreamHandler handler = GetOrCreateStreamImpl(stream_id, /*may_buffer=*/true);
270 if (handler.is_pending) {
271 handler.pending->OnRstStreamFrame(frame);
272 ClosePendingStream(stream_id);
273 return;
274 }
275 if (!handler.stream) {
276 HandleRstOnValidNonexistentStream(frame);
277 return; // Errors are handled by GetOrCreateStream.
278 }
279 handler.stream->OnStreamReset(frame);
280}
281
282void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
283 goaway_received_ = true;
284}
285
286void QuicSession::OnMessageReceived(QuicStringPiece message) {
287 QUIC_DVLOG(1) << ENDPOINT << "Received message, length: " << message.length()
288 << ", " << message;
289}
290
291void QuicSession::OnConnectionClosed(QuicErrorCode error,
292 const QuicString& error_details,
293 ConnectionCloseSource source) {
294 DCHECK(!connection_->connected());
295 if (error_ == QUIC_NO_ERROR) {
296 error_ = error;
297 }
298
299 while (!dynamic_stream_map_.empty()) {
300 DynamicStreamMap::iterator it = dynamic_stream_map_.begin();
301 QuicStreamId id = it->first;
302 it->second->OnConnectionClosed(error, source);
303 // The stream should call CloseStream as part of OnConnectionClosed.
304 if (dynamic_stream_map_.find(id) != dynamic_stream_map_.end()) {
305 QUIC_BUG << ENDPOINT << "Stream failed to close under OnConnectionClosed";
306 CloseStream(id);
307 }
308 }
309
310 // Cleanup zombie stream map on connection close.
311 while (!zombie_streams_.empty()) {
312 ZombieStreamMap::iterator it = zombie_streams_.begin();
313 closed_streams_.push_back(std::move(it->second));
314 zombie_streams_.erase(it);
315 }
316
317 closed_streams_clean_up_alarm_->Cancel();
318
319 if (visitor_) {
320 visitor_->OnConnectionClosed(connection_->connection_id(), error,
321 error_details, source);
322 }
323}
324
325void QuicSession::OnWriteBlocked() {
326 if (GetQuicReloadableFlag(
327 quic_connection_do_not_add_to_write_blocked_list_if_disconnected) &&
328 !connection_->connected()) {
329 QUIC_RELOADABLE_FLAG_COUNT_N(
330 quic_connection_do_not_add_to_write_blocked_list_if_disconnected, 1, 2);
331 return;
332 }
333 if (visitor_) {
334 visitor_->OnWriteBlocked(connection_);
335 }
336}
337
338void QuicSession::OnSuccessfulVersionNegotiation(
339 const ParsedQuicVersion& version) {
340 GetMutableCryptoStream()->OnSuccessfulVersionNegotiation(version);
341}
342
343void QuicSession::OnConnectivityProbeReceived(
344 const QuicSocketAddress& self_address,
345 const QuicSocketAddress& peer_address) {
346 if (perspective() == Perspective::IS_SERVER) {
347 // Server only sends back a connectivity probe after received a
348 // connectivity probe from a new peer address.
349 connection_->SendConnectivityProbingResponsePacket(peer_address);
350 }
351}
352
353void QuicSession::OnPathDegrading() {}
354
355bool QuicSession::AllowSelfAddressChange() const {
356 return false;
357}
358
359void QuicSession::OnForwardProgressConfirmed() {}
360
361void QuicSession::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
362 // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't
363 // assume that it still exists.
364 QuicStreamId stream_id = frame.stream_id;
365 if (stream_id ==
366 QuicUtils::GetInvalidStreamId(connection_->transport_version())) {
367 // This is a window update that applies to the connection, rather than an
368 // individual stream.
369 QUIC_DLOG(INFO) << ENDPOINT
370 << "Received connection level flow control window "
371 "update with byte offset: "
372 << frame.byte_offset;
373 flow_controller_.UpdateSendWindowOffset(frame.byte_offset);
374 return;
375 }
376 QuicStream* stream = GetOrCreateStream(stream_id);
377 if (stream != nullptr) {
378 stream->OnWindowUpdateFrame(frame);
379 }
380}
381
382void QuicSession::OnBlockedFrame(const QuicBlockedFrame& frame) {
383 // TODO(rjshade): Compare our flow control receive windows for specified
384 // streams: if we have a large window then maybe something
385 // had gone wrong with the flow control accounting.
386 QUIC_DLOG(INFO) << ENDPOINT << "Received BLOCKED frame with stream id: "
387 << frame.stream_id;
388}
389
390bool QuicSession::CheckStreamNotBusyLooping(QuicStream* stream,
391 uint64_t previous_bytes_written,
392 bool previous_fin_sent) {
393 if ( // Stream should not be closed.
394 !stream->write_side_closed() &&
395 // Not connection flow control blocked.
396 !flow_controller_.IsBlocked() &&
397 // Detect lack of forward progress.
398 previous_bytes_written == stream->stream_bytes_written() &&
399 previous_fin_sent == stream->fin_sent()) {
400 stream->set_busy_counter(stream->busy_counter() + 1);
401 QUIC_DVLOG(1) << "Suspected busy loop on stream id " << stream->id()
402 << " stream_bytes_written " << stream->stream_bytes_written()
403 << " fin " << stream->fin_sent() << " count "
404 << stream->busy_counter();
405 // Wait a few iterations before firing, the exact count is
406 // arbitrary, more than a few to cover a few test-only false
407 // positives.
408 if (stream->busy_counter() > 20) {
409 QUIC_LOG(ERROR) << "Detected busy loop on stream id " << stream->id()
410 << " stream_bytes_written "
411 << stream->stream_bytes_written() << " fin "
412 << stream->fin_sent();
413 return false;
414 }
415 } else {
416 stream->set_busy_counter(0);
417 }
418 return true;
419}
420
421bool QuicSession::CheckStreamWriteBlocked(QuicStream* stream) const {
422 if (!stream->write_side_closed() && stream->HasBufferedData() &&
423 !stream->flow_controller()->IsBlocked() &&
424 !write_blocked_streams_.IsStreamBlocked(stream->id())) {
425 QUIC_DLOG(ERROR) << "stream " << stream->id() << " has buffered "
426 << stream->BufferedDataBytes()
427 << " bytes, and is not flow control blocked, "
428 "but it is not in the write block list.";
429 return false;
430 }
431 return true;
432}
433
434void QuicSession::OnCanWrite() {
435 if (!RetransmitLostData()) {
436 // Cannot finish retransmitting lost data, connection is write blocked.
437 QUIC_DVLOG(1) << ENDPOINT
438 << "Cannot finish retransmitting lost data, connection is "
439 "write blocked.";
440 return;
441 }
442 if (session_decides_what_to_write()) {
443 SetTransmissionType(NOT_RETRANSMISSION);
444 }
445 // We limit the number of writes to the number of pending streams. If more
446 // streams become pending, WillingAndAbleToWrite will be true, which will
447 // cause the connection to request resumption before yielding to other
448 // connections.
449 // If we are connection level flow control blocked, then only allow the
450 // crypto and headers streams to try writing as all other streams will be
451 // blocked.
452 size_t num_writes = flow_controller_.IsBlocked()
453 ? write_blocked_streams_.NumBlockedSpecialStreams()
454 : write_blocked_streams_.NumBlockedStreams();
455 if (num_writes == 0 && !control_frame_manager_.WillingToWrite()) {
456 return;
457 }
458
459 QuicConnection::ScopedPacketFlusher flusher(
460 connection_, QuicConnection::SEND_ACK_IF_QUEUED);
461 if (control_frame_manager_.WillingToWrite()) {
462 control_frame_manager_.OnCanWrite();
463 }
464 for (size_t i = 0; i < num_writes; ++i) {
465 if (!(write_blocked_streams_.HasWriteBlockedSpecialStream() ||
466 write_blocked_streams_.HasWriteBlockedDataStreams())) {
467 // Writing one stream removed another!? Something's broken.
468 QUIC_BUG << "WriteBlockedStream is missing";
469 connection_->CloseConnection(QUIC_INTERNAL_ERROR,
470 "WriteBlockedStream is missing",
471 ConnectionCloseBehavior::SILENT_CLOSE);
472 return;
473 }
474 if (!connection_->CanWriteStreamData()) {
475 return;
476 }
477 currently_writing_stream_id_ = write_blocked_streams_.PopFront();
478 QuicStream* stream = GetOrCreateStream(currently_writing_stream_id_);
479 if (stream != nullptr && !stream->flow_controller()->IsBlocked()) {
480 // If the stream can't write all bytes it'll re-add itself to the blocked
481 // list.
482 uint64_t previous_bytes_written = stream->stream_bytes_written();
483 bool previous_fin_sent = stream->fin_sent();
484 QUIC_DVLOG(1) << "stream " << stream->id() << " bytes_written "
485 << previous_bytes_written << " fin " << previous_fin_sent;
486 stream->OnCanWrite();
487 DCHECK(CheckStreamWriteBlocked(stream));
488 DCHECK(CheckStreamNotBusyLooping(stream, previous_bytes_written,
489 previous_fin_sent));
490 }
491 currently_writing_stream_id_ = 0;
492 }
493}
494
495bool QuicSession::WillingAndAbleToWrite() const {
496 // Schedule a write when:
497 // 1) control frame manager has pending or new control frames, or
498 // 2) any stream has pending retransmissions, or
499 // 3) If the crypto or headers streams are blocked, or
500 // 4) connection is not flow control blocked and there are write blocked
501 // streams.
502 return control_frame_manager_.WillingToWrite() ||
503 !streams_with_pending_retransmission_.empty() ||
504 write_blocked_streams_.HasWriteBlockedSpecialStream() ||
505 (!flow_controller_.IsBlocked() &&
506 write_blocked_streams_.HasWriteBlockedDataStreams());
507}
508
509bool QuicSession::HasPendingHandshake() const {
510 return QuicContainsKey(
511 streams_with_pending_retransmission_,
512 QuicUtils::GetCryptoStreamId(connection_->transport_version())) ||
513 write_blocked_streams_.IsStreamBlocked(
514 QuicUtils::GetCryptoStreamId(connection_->transport_version()));
515}
516
517uint64_t QuicSession::GetNumOpenDynamicStreams() const {
518 return dynamic_stream_map_.size() - draining_streams_.size() +
519 locally_closed_streams_highest_offset_.size();
520}
521
522void QuicSession::ProcessUdpPacket(const QuicSocketAddress& self_address,
523 const QuicSocketAddress& peer_address,
524 const QuicReceivedPacket& packet) {
525 connection_->ProcessUdpPacket(self_address, peer_address, packet);
526}
527
528QuicConsumedData QuicSession::WritevData(QuicStream* stream,
529 QuicStreamId id,
530 size_t write_length,
531 QuicStreamOffset offset,
532 StreamSendingState state) {
533 // This check is an attempt to deal with potential memory corruption
534 // in which |id| ends up set to 1 (the crypto stream id). If this happen
535 // it might end up resulting in unencrypted stream data being sent.
536 // While this is impossible to avoid given sufficient corruption, this
537 // seems like a reasonable mitigation.
538 if (id == QuicUtils::GetCryptoStreamId(connection_->transport_version()) &&
539 stream != GetMutableCryptoStream()) {
540 QUIC_BUG << "Stream id mismatch";
541 connection_->CloseConnection(
542 QUIC_INTERNAL_ERROR,
543 "Non-crypto stream attempted to write data as crypto stream.",
544 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
545 return QuicConsumedData(0, false);
546 }
547 if (!IsEncryptionEstablished() &&
548 id != QuicUtils::GetCryptoStreamId(connection_->transport_version())) {
549 // Do not let streams write without encryption. The calling stream will end
550 // up write blocked until OnCanWrite is next called.
551 return QuicConsumedData(0, false);
552 }
553
554 QuicConsumedData data =
555 connection_->SendStreamData(id, write_length, offset, state);
556 if (offset >= stream->stream_bytes_written()) {
557 // This is new stream data.
558 write_blocked_streams_.UpdateBytesForStream(id, data.bytes_consumed);
559 }
560 return data;
561}
562
563bool QuicSession::WriteControlFrame(const QuicFrame& frame) {
564 return connection_->SendControlFrame(frame);
565}
566
567void QuicSession::SendRstStream(QuicStreamId id,
568 QuicRstStreamErrorCode error,
569 QuicStreamOffset bytes_written) {
570 SendRstStreamInner(id, error, bytes_written, /*close_write_side_only=*/false);
571}
572
573void QuicSession::SendRstStreamInner(QuicStreamId id,
574 QuicRstStreamErrorCode error,
575 QuicStreamOffset bytes_written,
576 bool close_write_side_only) {
577 if (connection()->connected()) {
578 // Only send if still connected.
579 if (close_write_side_only) {
580 DCHECK_EQ(QUIC_VERSION_99, connection_->transport_version());
581 // Send a RST_STREAM frame.
582 control_frame_manager_.WriteOrBufferRstStream(id, error, bytes_written);
583 } else {
584 // Send a RST_STREAM frame plus, if version 99, an IETF
585 // QUIC STOP_SENDING frame. Both sre sent to emulate
586 // the two-way close that Google QUIC's RST_STREAM does.
587 if (connection_->transport_version() == QUIC_VERSION_99) {
588 QuicConnection::ScopedPacketFlusher flusher(
589 connection(), QuicConnection::SEND_ACK_IF_QUEUED);
590 control_frame_manager_.WriteOrBufferRstStream(id, error, bytes_written);
591 control_frame_manager_.WriteOrBufferStopSending(error, id);
592 } else {
593 control_frame_manager_.WriteOrBufferRstStream(id, error, bytes_written);
594 }
595 }
596 connection_->OnStreamReset(id, error);
597 }
598 if (error != QUIC_STREAM_NO_ERROR && QuicContainsKey(zombie_streams_, id)) {
599 OnStreamDoneWaitingForAcks(id);
600 return;
601 }
602
603 if (!close_write_side_only) {
604 CloseStreamInner(id, true);
605 return;
606 }
607 DCHECK_EQ(QUIC_VERSION_99, connection_->transport_version());
608
609 DynamicStreamMap::iterator it = dynamic_stream_map_.find(id);
610 if (it != dynamic_stream_map_.end()) {
611 QuicStream* stream = it->second.get();
612 if (stream) {
613 stream->set_rst_sent(true);
614 stream->CloseWriteSide();
615 }
616 }
617}
618
619void QuicSession::SendGoAway(QuicErrorCode error_code,
620 const QuicString& reason) {
621 // GOAWAY frame is not supported in v99.
622 DCHECK_NE(QUIC_VERSION_99, connection_->transport_version());
623 if (goaway_sent_) {
624 return;
625 }
626 goaway_sent_ = true;
627 control_frame_manager_.WriteOrBufferGoAway(
628 error_code, stream_id_manager_.largest_peer_created_stream_id(), reason);
629}
630
631void QuicSession::SendBlocked(QuicStreamId id) {
632 control_frame_manager_.WriteOrBufferBlocked(id);
633}
634
635void QuicSession::SendWindowUpdate(QuicStreamId id,
636 QuicStreamOffset byte_offset) {
637 control_frame_manager_.WriteOrBufferWindowUpdate(id, byte_offset);
638}
639
640void QuicSession::SendMaxStreamId(QuicStreamId max_allowed_incoming_id) {
641 control_frame_manager_.WriteOrBufferMaxStreamId(max_allowed_incoming_id);
642}
643
644void QuicSession::SendStreamIdBlocked(QuicStreamId max_allowed_outgoing_id) {
645 control_frame_manager_.WriteOrBufferStreamIdBlocked(max_allowed_outgoing_id);
646}
647
648void QuicSession::CloseStream(QuicStreamId stream_id) {
649 CloseStreamInner(stream_id, false);
650}
651
652void QuicSession::InsertLocallyClosedStreamsHighestOffset(
653 const QuicStreamId id,
654 QuicStreamOffset offset) {
655 locally_closed_streams_highest_offset_[id] = offset;
656 if (IsIncomingStream(id)) {
657 ++num_locally_closed_incoming_streams_highest_offset_;
658 }
659}
660
661void QuicSession::CloseStreamInner(QuicStreamId stream_id, bool locally_reset) {
662 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
663
664 DynamicStreamMap::iterator it = dynamic_stream_map_.find(stream_id);
665 if (it == dynamic_stream_map_.end()) {
666 // When CloseStreamInner has been called recursively (via
667 // QuicStream::OnClose), the stream will already have been deleted
668 // from stream_map_, so return immediately.
669 QUIC_DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
670 return;
671 }
672 QuicStream* stream = it->second.get();
673
674 // Tell the stream that a RST has been sent.
675 if (locally_reset) {
676 stream->set_rst_sent(true);
677 }
678
679 if (stream->IsWaitingForAcks()) {
680 zombie_streams_[stream->id()] = std::move(it->second);
681 } else {
682 closed_streams_.push_back(std::move(it->second));
683 // Do not retransmit data of a closed stream.
684 streams_with_pending_retransmission_.erase(stream_id);
685 if (!closed_streams_clean_up_alarm_->IsSet()) {
686 closed_streams_clean_up_alarm_->Set(
687 connection_->clock()->ApproximateNow());
688 }
689 }
690
691 // If we haven't received a FIN or RST for this stream, we need to keep track
692 // of the how many bytes the stream's flow controller believes it has
693 // received, for accurate connection level flow control accounting.
694 const bool had_fin_or_rst = stream->HasFinalReceivedByteOffset();
695 if (!had_fin_or_rst) {
696 InsertLocallyClosedStreamsHighestOffset(
697 stream_id, stream->flow_controller()->highest_received_byte_offset());
698 }
699 dynamic_stream_map_.erase(it);
700 if (IsIncomingStream(stream_id)) {
701 --num_dynamic_incoming_streams_;
702 }
703
704 const bool stream_was_draining =
705 draining_streams_.find(stream_id) != draining_streams_.end();
706 if (stream_was_draining) {
707 if (IsIncomingStream(stream_id)) {
708 --num_draining_incoming_streams_;
709 }
710 draining_streams_.erase(stream_id);
711 } else if (connection_->transport_version() == QUIC_VERSION_99) {
712 // Stream was not draining, but we did have a fin or rst, so we can now
713 // free the stream ID if version 99.
714 if (had_fin_or_rst) {
715 v99_streamid_manager_.OnStreamClosed(stream_id);
716 }
717 }
718
719 stream->OnClose();
720
721 if (!stream_was_draining && !IsIncomingStream(stream_id) && had_fin_or_rst &&
722 connection_->transport_version() != QUIC_VERSION_99) {
723 // Streams that first became draining already called OnCanCreate...
724 // This covers the case where the stream went directly to being closed.
725 OnCanCreateNewOutgoingStream();
726 }
727}
728
729void QuicSession::ClosePendingStream(QuicStreamId stream_id) {
730 QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
731
732 if (pending_stream_map_.find(stream_id) == pending_stream_map_.end()) {
733 QUIC_BUG << ENDPOINT << "Stream is already closed: " << stream_id;
734 return;
735 }
736
737 SendRstStream(stream_id, QUIC_RST_ACKNOWLEDGEMENT, 0);
738
739 // The pending stream may have been deleted and removed during SendRstStream.
740 // Remove the stream from pending stream map iff it is still in the map.
741 if (pending_stream_map_.find(stream_id) != pending_stream_map_.end()) {
742 pending_stream_map_.erase(stream_id);
743 }
744
745 --num_dynamic_incoming_streams_;
746
747 if (connection_->transport_version() == QUIC_VERSION_99) {
748 v99_streamid_manager_.OnStreamClosed(stream_id);
749 }
750
751 OnCanCreateNewOutgoingStream();
752}
753
754void QuicSession::OnFinalByteOffsetReceived(
755 QuicStreamId stream_id,
756 QuicStreamOffset final_byte_offset) {
757 auto it = locally_closed_streams_highest_offset_.find(stream_id);
758 if (it == locally_closed_streams_highest_offset_.end()) {
759 return;
760 }
761
762 QUIC_DVLOG(1) << ENDPOINT << "Received final byte offset "
763 << final_byte_offset << " for stream " << stream_id;
764 QuicByteCount offset_diff = final_byte_offset - it->second;
765 if (flow_controller_.UpdateHighestReceivedOffset(
766 flow_controller_.highest_received_byte_offset() + offset_diff)) {
767 // If the final offset violates flow control, close the connection now.
768 if (flow_controller_.FlowControlViolation()) {
769 connection_->CloseConnection(
770 QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
771 "Connection level flow control violation",
772 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
773 return;
774 }
775 }
776
777 flow_controller_.AddBytesConsumed(offset_diff);
778 locally_closed_streams_highest_offset_.erase(it);
779 if (IsIncomingStream(stream_id)) {
780 --num_locally_closed_incoming_streams_highest_offset_;
781 if (connection_->transport_version() == QUIC_VERSION_99) {
782 v99_streamid_manager_.OnStreamClosed(stream_id);
783 }
784 } else if (connection_->transport_version() != QUIC_VERSION_99) {
785 OnCanCreateNewOutgoingStream();
786 }
787}
788
789bool QuicSession::IsEncryptionEstablished() const {
790 // Once the handshake is confirmed, it never becomes un-confirmed.
791 if (is_handshake_confirmed_) {
792 return true;
793 }
794 return GetCryptoStream()->encryption_established();
795}
796
797bool QuicSession::IsCryptoHandshakeConfirmed() const {
798 return GetCryptoStream()->handshake_confirmed();
799}
800
801void QuicSession::OnConfigNegotiated() {
802 connection_->SetFromConfig(config_);
803
804 uint32_t max_streams = 0;
805 if (config_.HasReceivedMaxIncomingDynamicStreams()) {
806 max_streams = config_.ReceivedMaxIncomingDynamicStreams();
807 }
808 QUIC_DVLOG(1) << "Setting max_open_outgoing_streams_ to " << max_streams;
809 if (connection_->transport_version() == QUIC_VERSION_99) {
810 v99_streamid_manager_.SetMaxOpenOutgoingStreams(max_streams);
811 } else {
812 stream_id_manager_.set_max_open_outgoing_streams(max_streams);
813 }
814 if (perspective() == Perspective::IS_SERVER) {
815 if (config_.HasReceivedConnectionOptions()) {
816 // The following variations change the initial receive flow control
817 // window sizes.
818 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW6)) {
819 AdjustInitialFlowControlWindows(64 * 1024);
820 }
821 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW7)) {
822 AdjustInitialFlowControlWindows(128 * 1024);
823 }
824 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW8)) {
825 AdjustInitialFlowControlWindows(256 * 1024);
826 }
827 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW9)) {
828 AdjustInitialFlowControlWindows(512 * 1024);
829 }
830 if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFWA)) {
831 AdjustInitialFlowControlWindows(1024 * 1024);
832 }
833 }
834
835 config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
836 }
837
838 // A small number of additional incoming streams beyond the limit should be
839 // allowed. This helps avoid early connection termination when FIN/RSTs for
840 // old streams are lost or arrive out of order.
841 // Use a minimum number of additional streams, or a percentage increase,
842 // whichever is larger.
843 uint32_t max_incoming_streams_to_send =
844 config_.GetMaxIncomingDynamicStreamsToSend();
845 if (connection_->transport_version() == QUIC_VERSION_99) {
846 v99_streamid_manager_.SetMaxOpenIncomingStreams(
847 max_incoming_streams_to_send);
848 } else {
849 uint32_t max_incoming_streams =
850 std::max(max_incoming_streams_to_send + kMaxStreamsMinimumIncrement,
851 static_cast<uint32_t>(max_incoming_streams_to_send *
852 kMaxStreamsMultiplier));
853 stream_id_manager_.set_max_open_incoming_streams(max_incoming_streams);
854 }
855
856 if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) {
857 // Streams which were created before the SHLO was received (0-RTT
858 // requests) are now informed of the peer's initial flow control window.
859 OnNewStreamFlowControlWindow(
860 config_.ReceivedInitialStreamFlowControlWindowBytes());
861 }
862 if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) {
863 OnNewSessionFlowControlWindow(
864 config_.ReceivedInitialSessionFlowControlWindowBytes());
865 }
866}
867
868void QuicSession::AdjustInitialFlowControlWindows(size_t stream_window) {
869 const float session_window_multiplier =
870 config_.GetInitialStreamFlowControlWindowToSend()
871 ? static_cast<float>(
872 config_.GetInitialSessionFlowControlWindowToSend()) /
873 config_.GetInitialStreamFlowControlWindowToSend()
874 : 1.5;
875
876 QUIC_DVLOG(1) << ENDPOINT << "Set stream receive window to " << stream_window;
877 config_.SetInitialStreamFlowControlWindowToSend(stream_window);
878
879 size_t session_window = session_window_multiplier * stream_window;
880 QUIC_DVLOG(1) << ENDPOINT << "Set session receive window to "
881 << session_window;
882 config_.SetInitialSessionFlowControlWindowToSend(session_window);
883 flow_controller_.UpdateReceiveWindowSize(session_window);
884 // Inform all existing streams about the new window.
885 for (auto const& kv : static_stream_map_) {
886 kv.second->flow_controller()->UpdateReceiveWindowSize(stream_window);
887 }
888 for (auto const& kv : dynamic_stream_map_) {
889 kv.second->flow_controller()->UpdateReceiveWindowSize(stream_window);
890 }
891}
892
893void QuicSession::HandleFrameOnNonexistentOutgoingStream(
894 QuicStreamId stream_id) {
895 DCHECK(!IsClosedStream(stream_id));
896 // Received a frame for a locally-created stream that is not currently
897 // active. This is an error.
898 connection()->CloseConnection(
899 QUIC_INVALID_STREAM_ID, "Data for nonexistent stream",
900 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
901}
902
903void QuicSession::HandleRstOnValidNonexistentStream(
904 const QuicRstStreamFrame& frame) {
905 // If the stream is neither originally in active streams nor created in
906 // GetOrCreateDynamicStream(), it could be a closed stream in which case its
907 // final received byte offset need to be updated.
908 if (IsClosedStream(frame.stream_id)) {
909 // The RST frame contains the final byte offset for the stream: we can now
910 // update the connection level flow controller if needed.
911 OnFinalByteOffsetReceived(frame.stream_id, frame.byte_offset);
912 }
913}
914
915void QuicSession::OnNewStreamFlowControlWindow(QuicStreamOffset new_window) {
916 if (new_window < kMinimumFlowControlSendWindow) {
917 QUIC_LOG_FIRST_N(ERROR, 1)
918 << "Peer sent us an invalid stream flow control send window: "
919 << new_window << ", below default: " << kMinimumFlowControlSendWindow;
920 if (connection_->connected()) {
921 connection_->CloseConnection(
922 QUIC_FLOW_CONTROL_INVALID_WINDOW, "New stream window too low",
923 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
924 }
925 return;
926 }
927
928 // Inform all existing streams about the new window.
929 for (auto const& kv : static_stream_map_) {
930 kv.second->UpdateSendWindowOffset(new_window);
931 }
932 for (auto const& kv : dynamic_stream_map_) {
933 kv.second->UpdateSendWindowOffset(new_window);
934 }
935}
936
937void QuicSession::OnNewSessionFlowControlWindow(QuicStreamOffset new_window) {
938 if (new_window < kMinimumFlowControlSendWindow) {
939 QUIC_LOG_FIRST_N(ERROR, 1)
940 << "Peer sent us an invalid session flow control send window: "
941 << new_window << ", below default: " << kMinimumFlowControlSendWindow;
942 if (connection_->connected()) {
943 connection_->CloseConnection(
944 QUIC_FLOW_CONTROL_INVALID_WINDOW, "New connection window too low",
945 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
946 }
947 return;
948 }
949
950 flow_controller_.UpdateSendWindowOffset(new_window);
951}
952
953void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
954 switch (event) {
955 // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
956 // to QuicSession since it is the glue.
957 case ENCRYPTION_FIRST_ESTABLISHED:
958 // Given any streams blocked by encryption a chance to write.
959 OnCanWrite();
960 break;
961
962 case ENCRYPTION_REESTABLISHED:
963 // Retransmit originally packets that were sent, since they can't be
964 // decrypted by the peer.
965 connection_->RetransmitUnackedPackets(ALL_INITIAL_RETRANSMISSION);
966 // Given any streams blocked by encryption a chance to write.
967 OnCanWrite();
968 break;
969
970 case HANDSHAKE_CONFIRMED:
971 QUIC_BUG_IF(!config_.negotiated())
972 << ENDPOINT << "Handshake confirmed without parameter negotiation.";
973 // Discard originally encrypted packets, since they can't be decrypted by
974 // the peer.
975 NeuterUnencryptedData();
976 is_handshake_confirmed_ = true;
977 break;
978
979 default:
980 QUIC_LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
981 }
982}
983
984void QuicSession::OnCryptoHandshakeMessageSent(
985 const CryptoHandshakeMessage& /*message*/) {}
986
987void QuicSession::OnCryptoHandshakeMessageReceived(
988 const CryptoHandshakeMessage& /*message*/) {}
989
990void QuicSession::RegisterStreamPriority(QuicStreamId id,
991 bool is_static,
992 SpdyPriority priority) {
993 write_blocked_streams()->RegisterStream(id, is_static, priority);
994}
995
996void QuicSession::UnregisterStreamPriority(QuicStreamId id, bool is_static) {
997 write_blocked_streams()->UnregisterStream(id, is_static);
998}
999
1000void QuicSession::UpdateStreamPriority(QuicStreamId id,
1001 SpdyPriority new_priority) {
1002 write_blocked_streams()->UpdateStreamPriority(id, new_priority);
1003}
1004
1005QuicConfig* QuicSession::config() {
1006 return &config_;
1007}
1008
1009void QuicSession::ActivateStream(std::unique_ptr<QuicStream> stream) {
1010 QuicStreamId stream_id = stream->id();
1011 QUIC_DVLOG(1) << ENDPOINT << "num_streams: " << dynamic_stream_map_.size()
1012 << ". activating " << stream_id;
1013 DCHECK(!QuicContainsKey(dynamic_stream_map_, stream_id));
1014 DCHECK(!QuicContainsKey(static_stream_map_, stream_id));
1015 dynamic_stream_map_[stream_id] = std::move(stream);
1016 if (IsIncomingStream(stream_id)) {
1017 ++num_dynamic_incoming_streams_;
1018 }
1019}
1020
1021QuicStreamId QuicSession::GetNextOutgoingBidirectionalStreamId() {
1022 if (connection_->transport_version() == QUIC_VERSION_99) {
1023 return v99_streamid_manager_.GetNextOutgoingBidirectionalStreamId();
1024 }
1025 return stream_id_manager_.GetNextOutgoingStreamId();
1026}
1027
1028QuicStreamId QuicSession::GetNextOutgoingUnidirectionalStreamId() {
1029 if (connection_->transport_version() == QUIC_VERSION_99) {
1030 return v99_streamid_manager_.GetNextOutgoingUnidirectionalStreamId();
1031 }
1032 return stream_id_manager_.GetNextOutgoingStreamId();
1033}
1034
1035bool QuicSession::CanOpenNextOutgoingBidirectionalStream() {
1036 if (connection_->transport_version() == QUIC_VERSION_99) {
1037 return v99_streamid_manager_.CanOpenNextOutgoingBidirectionalStream();
1038 }
1039 return stream_id_manager_.CanOpenNextOutgoingStream(
1040 GetNumOpenOutgoingStreams());
1041}
1042
1043bool QuicSession::CanOpenNextOutgoingUnidirectionalStream() {
1044 if (connection_->transport_version() == QUIC_VERSION_99) {
1045 return v99_streamid_manager_.CanOpenNextOutgoingUnidirectionalStream();
1046 }
1047 return stream_id_manager_.CanOpenNextOutgoingStream(
1048 GetNumOpenOutgoingStreams());
1049}
1050
1051QuicStream* QuicSession::GetOrCreateStream(const QuicStreamId stream_id) {
1052 StreamHandler handler =
1053 GetOrCreateStreamImpl(stream_id, /*may_buffer=*/false);
1054 DCHECK(!handler.is_pending);
1055 return handler.stream;
1056}
1057
1058QuicSession::StreamHandler QuicSession::GetOrCreateStreamImpl(
1059 QuicStreamId stream_id,
1060 bool may_buffer) {
1061 StaticStreamMap::iterator it = static_stream_map_.find(stream_id);
1062 if (it != static_stream_map_.end()) {
1063 return StreamHandler(it->second);
1064 }
1065 return GetOrCreateDynamicStreamImpl(stream_id, may_buffer);
1066}
1067
1068void QuicSession::StreamDraining(QuicStreamId stream_id) {
1069 DCHECK(QuicContainsKey(dynamic_stream_map_, stream_id));
1070 if (!QuicContainsKey(draining_streams_, stream_id)) {
1071 draining_streams_.insert(stream_id);
1072 if (IsIncomingStream(stream_id)) {
1073 ++num_draining_incoming_streams_;
1074 }
1075 if (connection_->transport_version() == QUIC_VERSION_99) {
1076 v99_streamid_manager_.OnStreamClosed(stream_id);
1077 }
1078 }
1079 if (!IsIncomingStream(stream_id)) {
1080 // Inform application that a stream is available.
1081 OnCanCreateNewOutgoingStream();
1082 }
1083}
1084
1085bool QuicSession::MaybeIncreaseLargestPeerStreamId(
1086 const QuicStreamId stream_id) {
1087 if (connection_->transport_version() == QUIC_VERSION_99) {
1088 return v99_streamid_manager_.MaybeIncreaseLargestPeerStreamId(stream_id);
1089 }
1090 return stream_id_manager_.MaybeIncreaseLargestPeerStreamId(stream_id);
1091}
1092
1093bool QuicSession::ShouldYield(QuicStreamId stream_id) {
1094 if (stream_id == currently_writing_stream_id_) {
1095 return false;
1096 }
1097 return write_blocked_streams()->ShouldYield(stream_id);
1098}
1099
1100QuicStream* QuicSession::GetOrCreateDynamicStream(
1101 const QuicStreamId stream_id) {
1102 StreamHandler handler =
1103 GetOrCreateDynamicStreamImpl(stream_id, /*may_buffer=*/false);
1104 DCHECK(!handler.is_pending);
1105 return handler.stream;
1106}
1107
1108QuicSession::StreamHandler QuicSession::GetOrCreateDynamicStreamImpl(
1109 QuicStreamId stream_id,
1110 bool may_buffer) {
1111 DCHECK(!QuicContainsKey(static_stream_map_, stream_id))
1112 << "Attempt to call GetOrCreateDynamicStream for a static stream";
1113
1114 DynamicStreamMap::iterator it = dynamic_stream_map_.find(stream_id);
1115 if (it != dynamic_stream_map_.end()) {
1116 return StreamHandler(it->second.get());
1117 }
1118
1119 if (IsClosedStream(stream_id)) {
1120 return StreamHandler();
1121 }
1122
1123 if (!IsIncomingStream(stream_id)) {
1124 HandleFrameOnNonexistentOutgoingStream(stream_id);
1125 return StreamHandler();
1126 }
1127
1128 auto pending_it = pending_stream_map_.find(stream_id);
1129 if (pending_it != pending_stream_map_.end()) {
1130 DCHECK_EQ(QUIC_VERSION_99, connection_->transport_version());
1131 if (may_buffer) {
1132 return StreamHandler(pending_it->second.get());
1133 }
1134 // The stream limit accounting has already been taken care of
1135 // when the PendingStream was created, so there is no need to
1136 // do so here. Now we can create the actual stream from the
1137 // PendingStream.
1138 StreamHandler handler(CreateIncomingStream(std::move(*pending_it->second)));
1139 pending_stream_map_.erase(pending_it);
1140 return handler;
1141 }
1142
1143 // TODO(fkastenholz): If we are creating a new stream and we have
1144 // sent a goaway, we should ignore the stream creation. Need to
1145 // add code to A) test if goaway was sent ("if (goaway_sent_)") and
1146 // B) reject stream creation ("return nullptr")
1147
1148 if (!MaybeIncreaseLargestPeerStreamId(stream_id)) {
1149 return StreamHandler();
1150 }
1151
1152 if (connection_->transport_version() != QUIC_VERSION_99) {
1153 // TODO(fayang): Let LegacyQuicStreamIdManager count open streams and make
1154 // CanOpenIncomingStream interface cosistent with that of v99.
1155 if (!stream_id_manager_.CanOpenIncomingStream(
1156 GetNumOpenIncomingStreams())) {
1157 // Refuse to open the stream.
1158 SendRstStream(stream_id, QUIC_REFUSED_STREAM, 0);
1159 return StreamHandler();
1160 }
1161 }
1162
1163 if (connection_->transport_version() == QUIC_VERSION_99 && may_buffer &&
1164 ShouldBufferIncomingStream(stream_id)) {
1165 ++num_dynamic_incoming_streams_;
1166 // Since STREAM frames may arrive out of order, delay creating the
1167 // stream object until the first byte arrives. Buffer the frames and
1168 // handle flow control accounting in the PendingStream.
1169 auto pending = QuicMakeUnique<PendingStream>(stream_id, this);
1170 StreamHandler handler(pending.get());
1171 pending_stream_map_[stream_id] = std::move(pending);
1172 return handler;
1173 }
1174
1175 return StreamHandler(CreateIncomingStream(stream_id));
1176}
1177
1178void QuicSession::set_largest_peer_created_stream_id(
1179 QuicStreamId largest_peer_created_stream_id) {
1180 if (connection_->transport_version() == QUIC_VERSION_99) {
1181 v99_streamid_manager_.SetLargestPeerCreatedStreamId(
1182 largest_peer_created_stream_id);
1183 return;
1184 }
1185 stream_id_manager_.set_largest_peer_created_stream_id(
1186 largest_peer_created_stream_id);
1187}
1188
1189bool QuicSession::IsClosedStream(QuicStreamId id) {
1190 DCHECK_NE(QuicUtils::GetInvalidStreamId(connection_->transport_version()),
1191 id);
1192 if (IsOpenStream(id)) {
1193 // Stream is active
1194 return false;
1195 }
1196
1197 if (connection_->transport_version() == QUIC_VERSION_99) {
1198 return !v99_streamid_manager_.IsAvailableStream(id);
1199 }
1200
1201 return !stream_id_manager_.IsAvailableStream(id);
1202}
1203
1204bool QuicSession::IsOpenStream(QuicStreamId id) {
1205 DCHECK_NE(QuicUtils::GetInvalidStreamId(connection_->transport_version()),
1206 id);
1207 if (QuicContainsKey(static_stream_map_, id) ||
1208 QuicContainsKey(dynamic_stream_map_, id) ||
1209 QuicContainsKey(pending_stream_map_, id)) {
1210 // Stream is active
1211 return true;
1212 }
1213 return false;
1214}
1215
1216size_t QuicSession::GetNumOpenIncomingStreams() const {
1217 return num_dynamic_incoming_streams_ - num_draining_incoming_streams_ +
1218 num_locally_closed_incoming_streams_highest_offset_;
1219}
1220
1221size_t QuicSession::GetNumOpenOutgoingStreams() const {
1222 DCHECK_GE(GetNumDynamicOutgoingStreams() +
1223 GetNumLocallyClosedOutgoingStreamsHighestOffset(),
1224 GetNumDrainingOutgoingStreams());
1225 return GetNumDynamicOutgoingStreams() +
1226 GetNumLocallyClosedOutgoingStreamsHighestOffset() -
1227 GetNumDrainingOutgoingStreams();
1228}
1229
1230size_t QuicSession::GetNumActiveStreams() const {
1231 return dynamic_stream_map_.size() - draining_streams_.size();
1232}
1233
1234size_t QuicSession::GetNumDrainingStreams() const {
1235 return draining_streams_.size();
1236}
1237
1238void QuicSession::MarkConnectionLevelWriteBlocked(QuicStreamId id) {
1239 if (GetOrCreateStream(id) == nullptr) {
1240 QUIC_BUG << "Marking unknown stream " << id << " blocked.";
1241 QUIC_LOG_FIRST_N(ERROR, 2) << QuicStackTrace();
1242 }
1243
1244 write_blocked_streams_.AddStream(id);
1245}
1246
1247bool QuicSession::HasDataToWrite() const {
1248 return write_blocked_streams_.HasWriteBlockedSpecialStream() ||
1249 write_blocked_streams_.HasWriteBlockedDataStreams() ||
1250 connection_->HasQueuedData() ||
1251 !streams_with_pending_retransmission_.empty() ||
1252 control_frame_manager_.WillingToWrite();
1253}
1254
1255void QuicSession::OnAckNeedsRetransmittableFrame() {
1256 flow_controller_.SendWindowUpdate();
1257}
1258
1259void QuicSession::SendPing() {
1260 control_frame_manager_.WritePing();
1261}
1262
1263size_t QuicSession::GetNumDynamicOutgoingStreams() const {
1264 DCHECK_GE(dynamic_stream_map_.size() + pending_stream_map_.size(),
1265 num_dynamic_incoming_streams_);
1266 return dynamic_stream_map_.size() + pending_stream_map_.size() -
1267 num_dynamic_incoming_streams_;
1268}
1269
1270size_t QuicSession::GetNumDrainingOutgoingStreams() const {
1271 DCHECK_GE(draining_streams_.size(), num_draining_incoming_streams_);
1272 return draining_streams_.size() - num_draining_incoming_streams_;
1273}
1274
1275size_t QuicSession::GetNumLocallyClosedOutgoingStreamsHighestOffset() const {
1276 DCHECK_GE(locally_closed_streams_highest_offset_.size(),
1277 num_locally_closed_incoming_streams_highest_offset_);
1278 return locally_closed_streams_highest_offset_.size() -
1279 num_locally_closed_incoming_streams_highest_offset_;
1280}
1281
1282bool QuicSession::IsConnectionFlowControlBlocked() const {
1283 return flow_controller_.IsBlocked();
1284}
1285
1286bool QuicSession::IsStreamFlowControlBlocked() {
1287 for (auto const& kv : static_stream_map_) {
1288 if (kv.second->flow_controller()->IsBlocked()) {
1289 return true;
1290 }
1291 }
1292 for (auto const& kv : dynamic_stream_map_) {
1293 if (kv.second->flow_controller()->IsBlocked()) {
1294 return true;
1295 }
1296 }
1297 return false;
1298}
1299
1300size_t QuicSession::MaxAvailableBidirectionalStreams() const {
1301 if (connection()->transport_version() == QUIC_VERSION_99) {
1302 return v99_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
1303 }
1304 return stream_id_manager_.MaxAvailableStreams();
1305}
1306
1307size_t QuicSession::MaxAvailableUnidirectionalStreams() const {
1308 if (connection()->transport_version() == QUIC_VERSION_99) {
1309 return v99_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
1310 }
1311 return stream_id_manager_.MaxAvailableStreams();
1312}
1313
1314bool QuicSession::IsIncomingStream(QuicStreamId id) const {
1315 if (connection()->transport_version() == QUIC_VERSION_99) {
1316 return v99_streamid_manager_.IsIncomingStream(id);
1317 }
1318 return stream_id_manager_.IsIncomingStream(id);
1319}
1320
1321void QuicSession::OnStreamDoneWaitingForAcks(QuicStreamId id) {
1322 auto it = zombie_streams_.find(id);
1323 if (it == zombie_streams_.end()) {
1324 return;
1325 }
1326
1327 closed_streams_.push_back(std::move(it->second));
1328 if (!closed_streams_clean_up_alarm_->IsSet()) {
1329 closed_streams_clean_up_alarm_->Set(connection_->clock()->ApproximateNow());
1330 }
1331 zombie_streams_.erase(it);
1332 // Do not retransmit data of a closed stream.
1333 streams_with_pending_retransmission_.erase(id);
1334}
1335
1336QuicStream* QuicSession::GetStream(QuicStreamId id) const {
1337 if (id <= largest_static_stream_id_) {
1338 auto static_stream = static_stream_map_.find(id);
1339 if (static_stream != static_stream_map_.end()) {
1340 return static_stream->second;
1341 }
1342 }
1343
1344 auto active_stream = dynamic_stream_map_.find(id);
1345 if (active_stream != dynamic_stream_map_.end()) {
1346 return active_stream->second.get();
1347 }
1348 auto zombie_stream = zombie_streams_.find(id);
1349 if (zombie_stream != zombie_streams_.end()) {
1350 return zombie_stream->second.get();
1351 }
1352 return nullptr;
1353}
1354
1355bool QuicSession::OnFrameAcked(const QuicFrame& frame,
1356 QuicTime::Delta ack_delay_time) {
1357 if (frame.type == MESSAGE_FRAME) {
1358 OnMessageAcked(frame.message_frame->message_id);
1359 return true;
1360 }
1361 if (frame.type == CRYPTO_FRAME) {
1362 return GetMutableCryptoStream()->OnCryptoFrameAcked(*frame.crypto_frame,
1363 ack_delay_time);
1364 }
1365 if (frame.type != STREAM_FRAME) {
1366 return control_frame_manager_.OnControlFrameAcked(frame);
1367 }
1368 bool new_stream_data_acked = false;
1369 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
1370 // Stream can already be reset when sent frame gets acked.
1371 if (stream != nullptr) {
1372 QuicByteCount newly_acked_length = 0;
1373 new_stream_data_acked = stream->OnStreamFrameAcked(
1374 frame.stream_frame.offset, frame.stream_frame.data_length,
1375 frame.stream_frame.fin, ack_delay_time, &newly_acked_length);
1376 if (!stream->HasPendingRetransmission()) {
1377 streams_with_pending_retransmission_.erase(stream->id());
1378 }
1379 }
1380 return new_stream_data_acked;
1381}
1382
1383void QuicSession::OnStreamFrameRetransmitted(const QuicStreamFrame& frame) {
1384 QuicStream* stream = GetStream(frame.stream_id);
1385 if (stream == nullptr) {
1386 QUIC_BUG << "Stream: " << frame.stream_id << " is closed when " << frame
1387 << " is retransmitted.";
1388 connection()->CloseConnection(
1389 QUIC_INTERNAL_ERROR, "Attempt to retransmit frame of a closed stream",
1390 ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
1391 return;
1392 }
1393 stream->OnStreamFrameRetransmitted(frame.offset, frame.data_length,
1394 frame.fin);
1395}
1396
1397void QuicSession::OnFrameLost(const QuicFrame& frame) {
1398 if (frame.type == MESSAGE_FRAME) {
1399 OnMessageLost(frame.message_frame->message_id);
1400 return;
1401 }
1402 if (frame.type == CRYPTO_FRAME) {
1403 GetMutableCryptoStream()->OnCryptoFrameLost(frame.crypto_frame);
1404 return;
1405 }
1406 if (frame.type != STREAM_FRAME) {
1407 control_frame_manager_.OnControlFrameLost(frame);
1408 return;
1409 }
1410 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
1411 if (stream == nullptr) {
1412 return;
1413 }
1414 stream->OnStreamFrameLost(frame.stream_frame.offset,
1415 frame.stream_frame.data_length,
1416 frame.stream_frame.fin);
1417 if (stream->HasPendingRetransmission() &&
1418 !QuicContainsKey(streams_with_pending_retransmission_,
1419 frame.stream_frame.stream_id)) {
1420 streams_with_pending_retransmission_.insert(
1421 std::make_pair(frame.stream_frame.stream_id, true));
1422 }
1423}
1424
1425void QuicSession::RetransmitFrames(const QuicFrames& frames,
1426 TransmissionType type) {
1427 QuicConnection::ScopedPacketFlusher retransmission_flusher(
1428 connection_, QuicConnection::NO_ACK);
1429 SetTransmissionType(type);
1430 for (const QuicFrame& frame : frames) {
1431 if (frame.type == MESSAGE_FRAME) {
1432 // Do not retransmit MESSAGE frames.
1433 continue;
1434 }
1435 if (frame.type == CRYPTO_FRAME) {
1436 GetMutableCryptoStream()->RetransmitData(frame.crypto_frame);
1437 continue;
1438 }
1439 if (frame.type != STREAM_FRAME) {
1440 if (!control_frame_manager_.RetransmitControlFrame(frame)) {
1441 break;
1442 }
1443 continue;
1444 }
1445 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
1446 if (stream != nullptr &&
1447 !stream->RetransmitStreamData(frame.stream_frame.offset,
1448 frame.stream_frame.data_length,
1449 frame.stream_frame.fin)) {
1450 break;
1451 }
1452 }
1453}
1454
1455bool QuicSession::IsFrameOutstanding(const QuicFrame& frame) const {
1456 if (frame.type == MESSAGE_FRAME) {
1457 return false;
1458 }
1459 if (frame.type == CRYPTO_FRAME) {
1460 return GetCryptoStream()->IsFrameOutstanding(
1461 frame.crypto_frame->level, frame.crypto_frame->offset,
1462 frame.crypto_frame->data_length);
1463 }
1464 if (frame.type != STREAM_FRAME) {
1465 return control_frame_manager_.IsControlFrameOutstanding(frame);
1466 }
1467 QuicStream* stream = GetStream(frame.stream_frame.stream_id);
1468 return stream != nullptr &&
1469 stream->IsStreamFrameOutstanding(frame.stream_frame.offset,
1470 frame.stream_frame.data_length,
1471 frame.stream_frame.fin);
1472}
1473
1474bool QuicSession::HasUnackedCryptoData() const {
1475 const QuicCryptoStream* crypto_stream = GetCryptoStream();
1476 if (crypto_stream->IsWaitingForAcks()) {
1477 return true;
1478 }
1479 if (GetQuicReloadableFlag(quic_fix_has_pending_crypto_data) &&
1480 crypto_stream->HasBufferedData()) {
1481 QUIC_RELOADABLE_FLAG_COUNT(quic_fix_has_pending_crypto_data);
1482 return true;
1483 }
1484 return false;
1485}
1486
1487WriteStreamDataResult QuicSession::WriteStreamData(QuicStreamId id,
1488 QuicStreamOffset offset,
1489 QuicByteCount data_length,
1490 QuicDataWriter* writer) {
1491 QuicStream* stream = GetStream(id);
1492 if (stream == nullptr) {
1493 // This causes the connection to be closed because of failed to serialize
1494 // packet.
1495 QUIC_BUG << "Stream " << id << " does not exist when trying to write data.";
1496 return STREAM_MISSING;
1497 }
1498 if (stream->WriteStreamData(offset, data_length, writer)) {
1499 return WRITE_SUCCESS;
1500 }
1501 return WRITE_FAILED;
1502}
1503
1504bool QuicSession::WriteCryptoData(EncryptionLevel level,
1505 QuicStreamOffset offset,
1506 QuicByteCount data_length,
1507 QuicDataWriter* writer) {
1508 return GetMutableCryptoStream()->WriteCryptoFrame(level, offset, data_length,
1509 writer);
1510}
1511
1512QuicUint128 QuicSession::GetStatelessResetToken() const {
1513 return QuicUtils::GenerateStatelessResetToken(connection_->connection_id());
1514}
1515
1516bool QuicSession::RetransmitLostData() {
1517 QuicConnection::ScopedPacketFlusher retransmission_flusher(
1518 connection_, QuicConnection::SEND_ACK_IF_QUEUED);
1519 // Retransmit crypto data first.
1520 bool uses_crypto_frames = connection_->transport_version() >= QUIC_VERSION_47;
1521 QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
1522 if (uses_crypto_frames && crypto_stream->HasPendingCryptoRetransmission()) {
1523 SetTransmissionType(HANDSHAKE_RETRANSMISSION);
1524 crypto_stream->WritePendingCryptoRetransmission();
1525 }
1526 // Retransmit crypto data in stream 1 frames (version < 47).
1527 if (!uses_crypto_frames &&
1528 QuicContainsKey(
1529 streams_with_pending_retransmission_,
1530 QuicUtils::GetCryptoStreamId(connection_->transport_version()))) {
1531 SetTransmissionType(HANDSHAKE_RETRANSMISSION);
1532 // Retransmit crypto data first.
1533 QuicStream* crypto_stream = GetStream(
1534 QuicUtils::GetCryptoStreamId(connection_->transport_version()));
1535 crypto_stream->OnCanWrite();
1536 DCHECK(CheckStreamWriteBlocked(crypto_stream));
1537 if (crypto_stream->HasPendingRetransmission()) {
1538 // Connection is write blocked.
1539 return false;
1540 } else {
1541 streams_with_pending_retransmission_.erase(
1542 QuicUtils::GetCryptoStreamId(connection_->transport_version()));
1543 }
1544 }
1545 if (control_frame_manager_.HasPendingRetransmission()) {
1546 SetTransmissionType(LOSS_RETRANSMISSION);
1547 control_frame_manager_.OnCanWrite();
1548 if (control_frame_manager_.HasPendingRetransmission()) {
1549 return false;
1550 }
1551 }
1552 while (!streams_with_pending_retransmission_.empty()) {
1553 if (!connection_->CanWriteStreamData()) {
1554 break;
1555 }
1556 // Retransmit lost data on headers and data streams.
1557 const QuicStreamId id = streams_with_pending_retransmission_.begin()->first;
1558 QuicStream* stream = GetStream(id);
1559 if (stream != nullptr) {
1560 SetTransmissionType(LOSS_RETRANSMISSION);
1561 stream->OnCanWrite();
1562 DCHECK(CheckStreamWriteBlocked(stream));
1563 if (stream->HasPendingRetransmission()) {
1564 // Connection is write blocked.
1565 break;
1566 } else if (!streams_with_pending_retransmission_.empty() &&
1567 streams_with_pending_retransmission_.begin()->first == id) {
1568 // Retransmit lost data may cause connection close. If this stream
1569 // has not yet sent fin, a RST_STREAM will be sent and it will be
1570 // removed from streams_with_pending_retransmission_.
1571 streams_with_pending_retransmission_.pop_front();
1572 }
1573 } else {
1574 QUIC_BUG << "Try to retransmit data of a closed stream";
1575 streams_with_pending_retransmission_.pop_front();
1576 }
1577 }
1578
1579 return streams_with_pending_retransmission_.empty();
1580}
1581
1582void QuicSession::NeuterUnencryptedData() {
1583 if (connection_->session_decides_what_to_write()) {
1584 QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
1585 crypto_stream->NeuterUnencryptedStreamData();
1586 if (!crypto_stream->HasPendingRetransmission()) {
1587 streams_with_pending_retransmission_.erase(
1588 QuicUtils::GetCryptoStreamId(connection_->transport_version()));
1589 }
1590 }
1591 connection_->NeuterUnencryptedPackets();
1592}
1593
1594void QuicSession::SetTransmissionType(TransmissionType type) {
1595 connection_->SetTransmissionType(type);
1596}
1597
1598MessageResult QuicSession::SendMessage(QuicMemSliceSpan message) {
1599 if (!IsEncryptionEstablished()) {
1600 return {MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED, 0};
1601 }
1602 MessageStatus result =
1603 connection_->SendMessage(last_message_id_ + 1, message);
1604 if (result == MESSAGE_STATUS_SUCCESS) {
1605 return {result, ++last_message_id_};
1606 }
1607 return {result, 0};
1608}
1609
1610void QuicSession::OnMessageAcked(QuicMessageId message_id) {
1611 QUIC_DVLOG(1) << ENDPOINT << "message " << message_id << " gets acked.";
1612}
1613
1614void QuicSession::OnMessageLost(QuicMessageId message_id) {
1615 QUIC_DVLOG(1) << ENDPOINT << "message " << message_id
1616 << " is considered lost";
1617}
1618
1619void QuicSession::CleanUpClosedStreams() {
1620 closed_streams_.clear();
1621}
1622
1623bool QuicSession::session_decides_what_to_write() const {
1624 return connection_->session_decides_what_to_write();
1625}
1626
1627QuicPacketLength QuicSession::GetLargestMessagePayload() const {
1628 return connection_->GetLargestMessagePayload();
1629}
1630
1631void QuicSession::SendStopSending(uint16_t code, QuicStreamId stream_id) {
1632 control_frame_manager_.WriteOrBufferStopSending(code, stream_id);
1633}
1634
1635void QuicSession::OnCanCreateNewOutgoingStream() {}
1636
1637QuicStreamId QuicSession::next_outgoing_bidirectional_stream_id() const {
1638 if (connection_->transport_version() == QUIC_VERSION_99) {
1639 return v99_streamid_manager_.next_outgoing_bidirectional_stream_id();
1640 }
1641 return stream_id_manager_.next_outgoing_stream_id();
1642}
1643
1644QuicStreamId QuicSession::next_outgoing_unidirectional_stream_id() const {
1645 if (connection_->transport_version() == QUIC_VERSION_99) {
1646 return v99_streamid_manager_.next_outgoing_unidirectional_stream_id();
1647 }
1648 return stream_id_manager_.next_outgoing_stream_id();
1649}
1650
1651bool QuicSession::OnMaxStreamIdFrame(const QuicMaxStreamIdFrame& frame) {
1652 return v99_streamid_manager_.OnMaxStreamIdFrame(frame);
1653}
1654
1655bool QuicSession::OnStreamIdBlockedFrame(
1656 const QuicStreamIdBlockedFrame& frame) {
1657 return v99_streamid_manager_.OnStreamIdBlockedFrame(frame);
1658}
1659
1660size_t QuicSession::max_open_incoming_bidirectional_streams() const {
1661 if (connection_->transport_version() == QUIC_VERSION_99) {
1662 return v99_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
1663 }
1664 return stream_id_manager_.max_open_incoming_streams();
1665}
1666
1667size_t QuicSession::max_open_incoming_unidirectional_streams() const {
1668 if (connection_->transport_version() == QUIC_VERSION_99) {
1669 return v99_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
1670 }
1671 return stream_id_manager_.max_open_incoming_streams();
1672}
1673
1674#undef ENDPOINT // undef for jumbo builds
1675} // namespace quic