Project import generated by Copybara.
PiperOrigin-RevId: 237361882
Change-Id: I109a68f44db867b20f8c6a7732b0ce657133e52a
diff --git a/quic/core/quic_session.cc b/quic/core/quic_session.cc
new file mode 100644
index 0000000..09e3865
--- /dev/null
+++ b/quic/core/quic_session.cc
@@ -0,0 +1,1675 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/core/quic_session.h"
+
+#include <cstdint>
+#include <utility>
+
+#include "net/third_party/quiche/src/quic/core/quic_connection.h"
+#include "net/third_party/quiche/src/quic/core/quic_flow_controller.h"
+#include "net/third_party/quiche/src/quic/core/quic_utils.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_map_util.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_stack_trace.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_str_cat.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_string.h"
+
+using spdy::SpdyPriority;
+
+namespace quic {
+
+namespace {
+
+class ClosedStreamsCleanUpDelegate : public QuicAlarm::Delegate {
+ public:
+ explicit ClosedStreamsCleanUpDelegate(QuicSession* session)
+ : session_(session) {}
+ ClosedStreamsCleanUpDelegate(const ClosedStreamsCleanUpDelegate&) = delete;
+ ClosedStreamsCleanUpDelegate& operator=(const ClosedStreamsCleanUpDelegate&) =
+ delete;
+
+ void OnAlarm() override { session_->CleanUpClosedStreams(); }
+
+ private:
+ QuicSession* session_;
+};
+
+} // namespace
+
+#define ENDPOINT \
+ (perspective() == Perspective::IS_SERVER ? "Server: " : "Client: ")
+
+QuicSession::QuicSession(QuicConnection* connection,
+ Visitor* owner,
+ const QuicConfig& config,
+ const ParsedQuicVersionVector& supported_versions)
+ : connection_(connection),
+ visitor_(owner),
+ write_blocked_streams_(),
+ config_(config),
+ stream_id_manager_(this,
+ kDefaultMaxStreamsPerConnection,
+ config_.GetMaxIncomingDynamicStreamsToSend()),
+ v99_streamid_manager_(this,
+ kDefaultMaxStreamsPerConnection,
+ config_.GetMaxIncomingDynamicStreamsToSend()),
+ num_dynamic_incoming_streams_(0),
+ num_draining_incoming_streams_(0),
+ num_locally_closed_incoming_streams_highest_offset_(0),
+ error_(QUIC_NO_ERROR),
+ flow_controller_(
+ this,
+ QuicUtils::GetInvalidStreamId(connection->transport_version()),
+ /*is_connection_flow_controller*/ true,
+ kMinimumFlowControlSendWindow,
+ config_.GetInitialSessionFlowControlWindowToSend(),
+ kSessionReceiveWindowLimit,
+ perspective() == Perspective::IS_SERVER,
+ nullptr),
+ currently_writing_stream_id_(0),
+ largest_static_stream_id_(0),
+ is_handshake_confirmed_(false),
+ goaway_sent_(false),
+ goaway_received_(false),
+ control_frame_manager_(this),
+ last_message_id_(0),
+ closed_streams_clean_up_alarm_(nullptr),
+ supported_versions_(supported_versions) {
+ closed_streams_clean_up_alarm_ =
+ QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
+ new ClosedStreamsCleanUpDelegate(this)));
+}
+
+void QuicSession::Initialize() {
+ connection_->set_visitor(this);
+ connection_->SetSessionNotifier(this);
+ connection_->SetDataProducer(this);
+ connection_->SetFromConfig(config_);
+
+ DCHECK_EQ(QuicUtils::GetCryptoStreamId(connection_->transport_version()),
+ GetMutableCryptoStream()->id());
+ RegisterStaticStream(
+ QuicUtils::GetCryptoStreamId(connection_->transport_version()),
+ GetMutableCryptoStream());
+}
+
+QuicSession::~QuicSession() {
+ QUIC_LOG_IF(WARNING, !zombie_streams_.empty()) << "Still have zombie streams";
+}
+
+void QuicSession::RegisterStaticStream(QuicStreamId id, QuicStream* stream) {
+ static_stream_map_[id] = stream;
+
+ QUIC_BUG_IF(id >
+ largest_static_stream_id_ +
+ QuicUtils::StreamIdDelta(connection_->transport_version()))
+ << ENDPOINT << "Static stream registered out of order: " << id
+ << " vs: " << largest_static_stream_id_;
+ largest_static_stream_id_ = std::max(id, largest_static_stream_id_);
+
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ v99_streamid_manager_.RegisterStaticStream(id);
+ }
+}
+
+void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
+ // TODO(rch) deal with the error case of stream id 0.
+ QuicStreamId stream_id = frame.stream_id;
+ if (stream_id ==
+ QuicUtils::GetInvalidStreamId(connection()->transport_version())) {
+ connection()->CloseConnection(
+ QUIC_INVALID_STREAM_ID, "Recevied data for an invalid stream",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return;
+ }
+
+ if (frame.fin && QuicContainsKey(static_stream_map_, stream_id)) {
+ connection()->CloseConnection(
+ QUIC_INVALID_STREAM_ID, "Attempt to close a static stream",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return;
+ }
+
+ StreamHandler handler = GetOrCreateStreamImpl(stream_id, frame.offset != 0);
+ if (handler.is_pending) {
+ handler.pending->OnStreamFrame(frame);
+ return;
+ }
+
+ if (!handler.stream) {
+ // The stream no longer exists, but we may still be interested in the
+ // final stream byte offset sent by the peer. A frame with a FIN can give
+ // us this offset.
+ if (frame.fin) {
+ QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
+ OnFinalByteOffsetReceived(stream_id, final_byte_offset);
+ }
+ return;
+ }
+ handler.stream->OnStreamFrame(frame);
+}
+
+void QuicSession::OnCryptoFrame(const QuicCryptoFrame& frame) {
+ GetMutableCryptoStream()->OnCryptoFrame(frame);
+}
+
+bool QuicSession::OnStopSendingFrame(const QuicStopSendingFrame& frame) {
+ // We are not version 99. In theory, if not in version 99 then the framer
+ // could not call OnStopSending... This is just a check that is good when
+ // both a new protocol and a new implementation of that protocol are both
+ // being developed.
+ DCHECK_EQ(QUIC_VERSION_99, connection_->transport_version());
+
+ QuicStreamId stream_id = frame.stream_id;
+ // If Stream ID is invalid then close the connection.
+ if (stream_id ==
+ QuicUtils::GetInvalidStreamId(connection()->transport_version())) {
+ QUIC_DVLOG(1) << ENDPOINT
+ << "Received STOP_SENDING with invalid stream_id: "
+ << stream_id << " Closing connection";
+ connection()->CloseConnection(
+ QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for an invalid stream",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return false;
+ }
+
+ // Ignore STOP_SENDING for static streams.
+ // TODO(fkastenholz): IETF Quic does not have static streams and does not
+ // make exceptions for them with respect to processing things like
+ // STOP_SENDING.
+ if (QuicContainsKey(static_stream_map_, stream_id)) {
+ QUIC_DVLOG(1) << ENDPOINT
+ << "Received STOP_SENDING for a static stream, id: "
+ << stream_id << " Closing connection";
+ connection()->CloseConnection(
+ QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for a static stream",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return false;
+ }
+
+ if (visitor_) {
+ visitor_->OnStopSendingReceived(frame);
+ }
+
+ // If stream is closed, ignore the frame
+ if (IsClosedStream(stream_id)) {
+ QUIC_DVLOG(1)
+ << ENDPOINT
+ << "Received STOP_SENDING for closed or non-existent stream, id: "
+ << stream_id << " Ignoring.";
+ return true; // Continue processing the packet.
+ }
+ // If stream is non-existent, close the connection
+ DynamicStreamMap::iterator it = dynamic_stream_map_.find(stream_id);
+ if (it == dynamic_stream_map_.end()) {
+ QUIC_DVLOG(1) << ENDPOINT
+ << "Received STOP_SENDING for non-existent stream, id: "
+ << stream_id << " Closing connection";
+ connection()->CloseConnection(
+ IETF_QUIC_PROTOCOL_VIOLATION,
+ "Received STOP_SENDING for a non-existent stream",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return false;
+ }
+
+ // Get the QuicStream for this stream. Ignore the STOP_SENDING
+ // if the QuicStream pointer is NULL
+ // QUESTION: IS THIS THE RIGHT THING TO DO? (that is, this would happen IFF
+ // there was an entry in the map, but the pointer is null. sounds more like a
+ // deep programming error rather than a simple protocol problem).
+ QuicStream* stream = it->second.get();
+ if (stream == nullptr) {
+ QUIC_DVLOG(1) << ENDPOINT
+ << "Received STOP_SENDING for NULL QuicStream, stream_id: "
+ << stream_id << ". Ignoring.";
+ return true;
+ }
+ stream->OnStopSending(frame.application_error_code);
+
+ stream->set_stream_error(
+ static_cast<QuicRstStreamErrorCode>(frame.application_error_code));
+ SendRstStreamInner(
+ stream->id(),
+ static_cast<quic::QuicRstStreamErrorCode>(frame.application_error_code),
+ stream->stream_bytes_written(),
+ /*close_write_side_only=*/true);
+
+ return true;
+}
+
+void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
+ QuicStreamId stream_id = frame.stream_id;
+ if (stream_id ==
+ QuicUtils::GetInvalidStreamId(connection()->transport_version())) {
+ connection()->CloseConnection(
+ QUIC_INVALID_STREAM_ID, "Recevied data for an invalid stream",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return;
+ }
+
+ if (QuicContainsKey(static_stream_map_, stream_id)) {
+ connection()->CloseConnection(
+ QUIC_INVALID_STREAM_ID, "Attempt to reset a static stream",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return;
+ }
+
+ if (visitor_) {
+ visitor_->OnRstStreamReceived(frame);
+ }
+
+ // may_buffer is true here to allow subclasses to buffer streams until the
+ // first byte of payload arrives which would allow sessions to delay
+ // creation of the stream until the type is known.
+ StreamHandler handler = GetOrCreateStreamImpl(stream_id, /*may_buffer=*/true);
+ if (handler.is_pending) {
+ handler.pending->OnRstStreamFrame(frame);
+ ClosePendingStream(stream_id);
+ return;
+ }
+ if (!handler.stream) {
+ HandleRstOnValidNonexistentStream(frame);
+ return; // Errors are handled by GetOrCreateStream.
+ }
+ handler.stream->OnStreamReset(frame);
+}
+
+void QuicSession::OnGoAway(const QuicGoAwayFrame& frame) {
+ goaway_received_ = true;
+}
+
+void QuicSession::OnMessageReceived(QuicStringPiece message) {
+ QUIC_DVLOG(1) << ENDPOINT << "Received message, length: " << message.length()
+ << ", " << message;
+}
+
+void QuicSession::OnConnectionClosed(QuicErrorCode error,
+ const QuicString& error_details,
+ ConnectionCloseSource source) {
+ DCHECK(!connection_->connected());
+ if (error_ == QUIC_NO_ERROR) {
+ error_ = error;
+ }
+
+ while (!dynamic_stream_map_.empty()) {
+ DynamicStreamMap::iterator it = dynamic_stream_map_.begin();
+ QuicStreamId id = it->first;
+ it->second->OnConnectionClosed(error, source);
+ // The stream should call CloseStream as part of OnConnectionClosed.
+ if (dynamic_stream_map_.find(id) != dynamic_stream_map_.end()) {
+ QUIC_BUG << ENDPOINT << "Stream failed to close under OnConnectionClosed";
+ CloseStream(id);
+ }
+ }
+
+ // Cleanup zombie stream map on connection close.
+ while (!zombie_streams_.empty()) {
+ ZombieStreamMap::iterator it = zombie_streams_.begin();
+ closed_streams_.push_back(std::move(it->second));
+ zombie_streams_.erase(it);
+ }
+
+ closed_streams_clean_up_alarm_->Cancel();
+
+ if (visitor_) {
+ visitor_->OnConnectionClosed(connection_->connection_id(), error,
+ error_details, source);
+ }
+}
+
+void QuicSession::OnWriteBlocked() {
+ if (GetQuicReloadableFlag(
+ quic_connection_do_not_add_to_write_blocked_list_if_disconnected) &&
+ !connection_->connected()) {
+ QUIC_RELOADABLE_FLAG_COUNT_N(
+ quic_connection_do_not_add_to_write_blocked_list_if_disconnected, 1, 2);
+ return;
+ }
+ if (visitor_) {
+ visitor_->OnWriteBlocked(connection_);
+ }
+}
+
+void QuicSession::OnSuccessfulVersionNegotiation(
+ const ParsedQuicVersion& version) {
+ GetMutableCryptoStream()->OnSuccessfulVersionNegotiation(version);
+}
+
+void QuicSession::OnConnectivityProbeReceived(
+ const QuicSocketAddress& self_address,
+ const QuicSocketAddress& peer_address) {
+ if (perspective() == Perspective::IS_SERVER) {
+ // Server only sends back a connectivity probe after received a
+ // connectivity probe from a new peer address.
+ connection_->SendConnectivityProbingResponsePacket(peer_address);
+ }
+}
+
+void QuicSession::OnPathDegrading() {}
+
+bool QuicSession::AllowSelfAddressChange() const {
+ return false;
+}
+
+void QuicSession::OnForwardProgressConfirmed() {}
+
+void QuicSession::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
+ // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't
+ // assume that it still exists.
+ QuicStreamId stream_id = frame.stream_id;
+ if (stream_id ==
+ QuicUtils::GetInvalidStreamId(connection_->transport_version())) {
+ // This is a window update that applies to the connection, rather than an
+ // individual stream.
+ QUIC_DLOG(INFO) << ENDPOINT
+ << "Received connection level flow control window "
+ "update with byte offset: "
+ << frame.byte_offset;
+ flow_controller_.UpdateSendWindowOffset(frame.byte_offset);
+ return;
+ }
+ QuicStream* stream = GetOrCreateStream(stream_id);
+ if (stream != nullptr) {
+ stream->OnWindowUpdateFrame(frame);
+ }
+}
+
+void QuicSession::OnBlockedFrame(const QuicBlockedFrame& frame) {
+ // TODO(rjshade): Compare our flow control receive windows for specified
+ // streams: if we have a large window then maybe something
+ // had gone wrong with the flow control accounting.
+ QUIC_DLOG(INFO) << ENDPOINT << "Received BLOCKED frame with stream id: "
+ << frame.stream_id;
+}
+
+bool QuicSession::CheckStreamNotBusyLooping(QuicStream* stream,
+ uint64_t previous_bytes_written,
+ bool previous_fin_sent) {
+ if ( // Stream should not be closed.
+ !stream->write_side_closed() &&
+ // Not connection flow control blocked.
+ !flow_controller_.IsBlocked() &&
+ // Detect lack of forward progress.
+ previous_bytes_written == stream->stream_bytes_written() &&
+ previous_fin_sent == stream->fin_sent()) {
+ stream->set_busy_counter(stream->busy_counter() + 1);
+ QUIC_DVLOG(1) << "Suspected busy loop on stream id " << stream->id()
+ << " stream_bytes_written " << stream->stream_bytes_written()
+ << " fin " << stream->fin_sent() << " count "
+ << stream->busy_counter();
+ // Wait a few iterations before firing, the exact count is
+ // arbitrary, more than a few to cover a few test-only false
+ // positives.
+ if (stream->busy_counter() > 20) {
+ QUIC_LOG(ERROR) << "Detected busy loop on stream id " << stream->id()
+ << " stream_bytes_written "
+ << stream->stream_bytes_written() << " fin "
+ << stream->fin_sent();
+ return false;
+ }
+ } else {
+ stream->set_busy_counter(0);
+ }
+ return true;
+}
+
+bool QuicSession::CheckStreamWriteBlocked(QuicStream* stream) const {
+ if (!stream->write_side_closed() && stream->HasBufferedData() &&
+ !stream->flow_controller()->IsBlocked() &&
+ !write_blocked_streams_.IsStreamBlocked(stream->id())) {
+ QUIC_DLOG(ERROR) << "stream " << stream->id() << " has buffered "
+ << stream->BufferedDataBytes()
+ << " bytes, and is not flow control blocked, "
+ "but it is not in the write block list.";
+ return false;
+ }
+ return true;
+}
+
+void QuicSession::OnCanWrite() {
+ if (!RetransmitLostData()) {
+ // Cannot finish retransmitting lost data, connection is write blocked.
+ QUIC_DVLOG(1) << ENDPOINT
+ << "Cannot finish retransmitting lost data, connection is "
+ "write blocked.";
+ return;
+ }
+ if (session_decides_what_to_write()) {
+ SetTransmissionType(NOT_RETRANSMISSION);
+ }
+ // We limit the number of writes to the number of pending streams. If more
+ // streams become pending, WillingAndAbleToWrite will be true, which will
+ // cause the connection to request resumption before yielding to other
+ // connections.
+ // If we are connection level flow control blocked, then only allow the
+ // crypto and headers streams to try writing as all other streams will be
+ // blocked.
+ size_t num_writes = flow_controller_.IsBlocked()
+ ? write_blocked_streams_.NumBlockedSpecialStreams()
+ : write_blocked_streams_.NumBlockedStreams();
+ if (num_writes == 0 && !control_frame_manager_.WillingToWrite()) {
+ return;
+ }
+
+ QuicConnection::ScopedPacketFlusher flusher(
+ connection_, QuicConnection::SEND_ACK_IF_QUEUED);
+ if (control_frame_manager_.WillingToWrite()) {
+ control_frame_manager_.OnCanWrite();
+ }
+ for (size_t i = 0; i < num_writes; ++i) {
+ if (!(write_blocked_streams_.HasWriteBlockedSpecialStream() ||
+ write_blocked_streams_.HasWriteBlockedDataStreams())) {
+ // Writing one stream removed another!? Something's broken.
+ QUIC_BUG << "WriteBlockedStream is missing";
+ connection_->CloseConnection(QUIC_INTERNAL_ERROR,
+ "WriteBlockedStream is missing",
+ ConnectionCloseBehavior::SILENT_CLOSE);
+ return;
+ }
+ if (!connection_->CanWriteStreamData()) {
+ return;
+ }
+ currently_writing_stream_id_ = write_blocked_streams_.PopFront();
+ QuicStream* stream = GetOrCreateStream(currently_writing_stream_id_);
+ if (stream != nullptr && !stream->flow_controller()->IsBlocked()) {
+ // If the stream can't write all bytes it'll re-add itself to the blocked
+ // list.
+ uint64_t previous_bytes_written = stream->stream_bytes_written();
+ bool previous_fin_sent = stream->fin_sent();
+ QUIC_DVLOG(1) << "stream " << stream->id() << " bytes_written "
+ << previous_bytes_written << " fin " << previous_fin_sent;
+ stream->OnCanWrite();
+ DCHECK(CheckStreamWriteBlocked(stream));
+ DCHECK(CheckStreamNotBusyLooping(stream, previous_bytes_written,
+ previous_fin_sent));
+ }
+ currently_writing_stream_id_ = 0;
+ }
+}
+
+bool QuicSession::WillingAndAbleToWrite() const {
+ // Schedule a write when:
+ // 1) control frame manager has pending or new control frames, or
+ // 2) any stream has pending retransmissions, or
+ // 3) If the crypto or headers streams are blocked, or
+ // 4) connection is not flow control blocked and there are write blocked
+ // streams.
+ return control_frame_manager_.WillingToWrite() ||
+ !streams_with_pending_retransmission_.empty() ||
+ write_blocked_streams_.HasWriteBlockedSpecialStream() ||
+ (!flow_controller_.IsBlocked() &&
+ write_blocked_streams_.HasWriteBlockedDataStreams());
+}
+
+bool QuicSession::HasPendingHandshake() const {
+ return QuicContainsKey(
+ streams_with_pending_retransmission_,
+ QuicUtils::GetCryptoStreamId(connection_->transport_version())) ||
+ write_blocked_streams_.IsStreamBlocked(
+ QuicUtils::GetCryptoStreamId(connection_->transport_version()));
+}
+
+uint64_t QuicSession::GetNumOpenDynamicStreams() const {
+ return dynamic_stream_map_.size() - draining_streams_.size() +
+ locally_closed_streams_highest_offset_.size();
+}
+
+void QuicSession::ProcessUdpPacket(const QuicSocketAddress& self_address,
+ const QuicSocketAddress& peer_address,
+ const QuicReceivedPacket& packet) {
+ connection_->ProcessUdpPacket(self_address, peer_address, packet);
+}
+
+QuicConsumedData QuicSession::WritevData(QuicStream* stream,
+ QuicStreamId id,
+ size_t write_length,
+ QuicStreamOffset offset,
+ StreamSendingState state) {
+ // This check is an attempt to deal with potential memory corruption
+ // in which |id| ends up set to 1 (the crypto stream id). If this happen
+ // it might end up resulting in unencrypted stream data being sent.
+ // While this is impossible to avoid given sufficient corruption, this
+ // seems like a reasonable mitigation.
+ if (id == QuicUtils::GetCryptoStreamId(connection_->transport_version()) &&
+ stream != GetMutableCryptoStream()) {
+ QUIC_BUG << "Stream id mismatch";
+ connection_->CloseConnection(
+ QUIC_INTERNAL_ERROR,
+ "Non-crypto stream attempted to write data as crypto stream.",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return QuicConsumedData(0, false);
+ }
+ if (!IsEncryptionEstablished() &&
+ id != QuicUtils::GetCryptoStreamId(connection_->transport_version())) {
+ // Do not let streams write without encryption. The calling stream will end
+ // up write blocked until OnCanWrite is next called.
+ return QuicConsumedData(0, false);
+ }
+
+ QuicConsumedData data =
+ connection_->SendStreamData(id, write_length, offset, state);
+ if (offset >= stream->stream_bytes_written()) {
+ // This is new stream data.
+ write_blocked_streams_.UpdateBytesForStream(id, data.bytes_consumed);
+ }
+ return data;
+}
+
+bool QuicSession::WriteControlFrame(const QuicFrame& frame) {
+ return connection_->SendControlFrame(frame);
+}
+
+void QuicSession::SendRstStream(QuicStreamId id,
+ QuicRstStreamErrorCode error,
+ QuicStreamOffset bytes_written) {
+ SendRstStreamInner(id, error, bytes_written, /*close_write_side_only=*/false);
+}
+
+void QuicSession::SendRstStreamInner(QuicStreamId id,
+ QuicRstStreamErrorCode error,
+ QuicStreamOffset bytes_written,
+ bool close_write_side_only) {
+ if (connection()->connected()) {
+ // Only send if still connected.
+ if (close_write_side_only) {
+ DCHECK_EQ(QUIC_VERSION_99, connection_->transport_version());
+ // Send a RST_STREAM frame.
+ control_frame_manager_.WriteOrBufferRstStream(id, error, bytes_written);
+ } else {
+ // Send a RST_STREAM frame plus, if version 99, an IETF
+ // QUIC STOP_SENDING frame. Both sre sent to emulate
+ // the two-way close that Google QUIC's RST_STREAM does.
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ QuicConnection::ScopedPacketFlusher flusher(
+ connection(), QuicConnection::SEND_ACK_IF_QUEUED);
+ control_frame_manager_.WriteOrBufferRstStream(id, error, bytes_written);
+ control_frame_manager_.WriteOrBufferStopSending(error, id);
+ } else {
+ control_frame_manager_.WriteOrBufferRstStream(id, error, bytes_written);
+ }
+ }
+ connection_->OnStreamReset(id, error);
+ }
+ if (error != QUIC_STREAM_NO_ERROR && QuicContainsKey(zombie_streams_, id)) {
+ OnStreamDoneWaitingForAcks(id);
+ return;
+ }
+
+ if (!close_write_side_only) {
+ CloseStreamInner(id, true);
+ return;
+ }
+ DCHECK_EQ(QUIC_VERSION_99, connection_->transport_version());
+
+ DynamicStreamMap::iterator it = dynamic_stream_map_.find(id);
+ if (it != dynamic_stream_map_.end()) {
+ QuicStream* stream = it->second.get();
+ if (stream) {
+ stream->set_rst_sent(true);
+ stream->CloseWriteSide();
+ }
+ }
+}
+
+void QuicSession::SendGoAway(QuicErrorCode error_code,
+ const QuicString& reason) {
+ // GOAWAY frame is not supported in v99.
+ DCHECK_NE(QUIC_VERSION_99, connection_->transport_version());
+ if (goaway_sent_) {
+ return;
+ }
+ goaway_sent_ = true;
+ control_frame_manager_.WriteOrBufferGoAway(
+ error_code, stream_id_manager_.largest_peer_created_stream_id(), reason);
+}
+
+void QuicSession::SendBlocked(QuicStreamId id) {
+ control_frame_manager_.WriteOrBufferBlocked(id);
+}
+
+void QuicSession::SendWindowUpdate(QuicStreamId id,
+ QuicStreamOffset byte_offset) {
+ control_frame_manager_.WriteOrBufferWindowUpdate(id, byte_offset);
+}
+
+void QuicSession::SendMaxStreamId(QuicStreamId max_allowed_incoming_id) {
+ control_frame_manager_.WriteOrBufferMaxStreamId(max_allowed_incoming_id);
+}
+
+void QuicSession::SendStreamIdBlocked(QuicStreamId max_allowed_outgoing_id) {
+ control_frame_manager_.WriteOrBufferStreamIdBlocked(max_allowed_outgoing_id);
+}
+
+void QuicSession::CloseStream(QuicStreamId stream_id) {
+ CloseStreamInner(stream_id, false);
+}
+
+void QuicSession::InsertLocallyClosedStreamsHighestOffset(
+ const QuicStreamId id,
+ QuicStreamOffset offset) {
+ locally_closed_streams_highest_offset_[id] = offset;
+ if (IsIncomingStream(id)) {
+ ++num_locally_closed_incoming_streams_highest_offset_;
+ }
+}
+
+void QuicSession::CloseStreamInner(QuicStreamId stream_id, bool locally_reset) {
+ QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
+
+ DynamicStreamMap::iterator it = dynamic_stream_map_.find(stream_id);
+ if (it == dynamic_stream_map_.end()) {
+ // When CloseStreamInner has been called recursively (via
+ // QuicStream::OnClose), the stream will already have been deleted
+ // from stream_map_, so return immediately.
+ QUIC_DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
+ return;
+ }
+ QuicStream* stream = it->second.get();
+
+ // Tell the stream that a RST has been sent.
+ if (locally_reset) {
+ stream->set_rst_sent(true);
+ }
+
+ if (stream->IsWaitingForAcks()) {
+ zombie_streams_[stream->id()] = std::move(it->second);
+ } else {
+ closed_streams_.push_back(std::move(it->second));
+ // Do not retransmit data of a closed stream.
+ streams_with_pending_retransmission_.erase(stream_id);
+ if (!closed_streams_clean_up_alarm_->IsSet()) {
+ closed_streams_clean_up_alarm_->Set(
+ connection_->clock()->ApproximateNow());
+ }
+ }
+
+ // If we haven't received a FIN or RST for this stream, we need to keep track
+ // of the how many bytes the stream's flow controller believes it has
+ // received, for accurate connection level flow control accounting.
+ const bool had_fin_or_rst = stream->HasFinalReceivedByteOffset();
+ if (!had_fin_or_rst) {
+ InsertLocallyClosedStreamsHighestOffset(
+ stream_id, stream->flow_controller()->highest_received_byte_offset());
+ }
+ dynamic_stream_map_.erase(it);
+ if (IsIncomingStream(stream_id)) {
+ --num_dynamic_incoming_streams_;
+ }
+
+ const bool stream_was_draining =
+ draining_streams_.find(stream_id) != draining_streams_.end();
+ if (stream_was_draining) {
+ if (IsIncomingStream(stream_id)) {
+ --num_draining_incoming_streams_;
+ }
+ draining_streams_.erase(stream_id);
+ } else if (connection_->transport_version() == QUIC_VERSION_99) {
+ // Stream was not draining, but we did have a fin or rst, so we can now
+ // free the stream ID if version 99.
+ if (had_fin_or_rst) {
+ v99_streamid_manager_.OnStreamClosed(stream_id);
+ }
+ }
+
+ stream->OnClose();
+
+ if (!stream_was_draining && !IsIncomingStream(stream_id) && had_fin_or_rst &&
+ connection_->transport_version() != QUIC_VERSION_99) {
+ // Streams that first became draining already called OnCanCreate...
+ // This covers the case where the stream went directly to being closed.
+ OnCanCreateNewOutgoingStream();
+ }
+}
+
+void QuicSession::ClosePendingStream(QuicStreamId stream_id) {
+ QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
+
+ if (pending_stream_map_.find(stream_id) == pending_stream_map_.end()) {
+ QUIC_BUG << ENDPOINT << "Stream is already closed: " << stream_id;
+ return;
+ }
+
+ SendRstStream(stream_id, QUIC_RST_ACKNOWLEDGEMENT, 0);
+
+ // The pending stream may have been deleted and removed during SendRstStream.
+ // Remove the stream from pending stream map iff it is still in the map.
+ if (pending_stream_map_.find(stream_id) != pending_stream_map_.end()) {
+ pending_stream_map_.erase(stream_id);
+ }
+
+ --num_dynamic_incoming_streams_;
+
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ v99_streamid_manager_.OnStreamClosed(stream_id);
+ }
+
+ OnCanCreateNewOutgoingStream();
+}
+
+void QuicSession::OnFinalByteOffsetReceived(
+ QuicStreamId stream_id,
+ QuicStreamOffset final_byte_offset) {
+ auto it = locally_closed_streams_highest_offset_.find(stream_id);
+ if (it == locally_closed_streams_highest_offset_.end()) {
+ return;
+ }
+
+ QUIC_DVLOG(1) << ENDPOINT << "Received final byte offset "
+ << final_byte_offset << " for stream " << stream_id;
+ QuicByteCount offset_diff = final_byte_offset - it->second;
+ if (flow_controller_.UpdateHighestReceivedOffset(
+ flow_controller_.highest_received_byte_offset() + offset_diff)) {
+ // If the final offset violates flow control, close the connection now.
+ if (flow_controller_.FlowControlViolation()) {
+ connection_->CloseConnection(
+ QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
+ "Connection level flow control violation",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return;
+ }
+ }
+
+ flow_controller_.AddBytesConsumed(offset_diff);
+ locally_closed_streams_highest_offset_.erase(it);
+ if (IsIncomingStream(stream_id)) {
+ --num_locally_closed_incoming_streams_highest_offset_;
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ v99_streamid_manager_.OnStreamClosed(stream_id);
+ }
+ } else if (connection_->transport_version() != QUIC_VERSION_99) {
+ OnCanCreateNewOutgoingStream();
+ }
+}
+
+bool QuicSession::IsEncryptionEstablished() const {
+ // Once the handshake is confirmed, it never becomes un-confirmed.
+ if (is_handshake_confirmed_) {
+ return true;
+ }
+ return GetCryptoStream()->encryption_established();
+}
+
+bool QuicSession::IsCryptoHandshakeConfirmed() const {
+ return GetCryptoStream()->handshake_confirmed();
+}
+
+void QuicSession::OnConfigNegotiated() {
+ connection_->SetFromConfig(config_);
+
+ uint32_t max_streams = 0;
+ if (config_.HasReceivedMaxIncomingDynamicStreams()) {
+ max_streams = config_.ReceivedMaxIncomingDynamicStreams();
+ }
+ QUIC_DVLOG(1) << "Setting max_open_outgoing_streams_ to " << max_streams;
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ v99_streamid_manager_.SetMaxOpenOutgoingStreams(max_streams);
+ } else {
+ stream_id_manager_.set_max_open_outgoing_streams(max_streams);
+ }
+ if (perspective() == Perspective::IS_SERVER) {
+ if (config_.HasReceivedConnectionOptions()) {
+ // The following variations change the initial receive flow control
+ // window sizes.
+ if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW6)) {
+ AdjustInitialFlowControlWindows(64 * 1024);
+ }
+ if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW7)) {
+ AdjustInitialFlowControlWindows(128 * 1024);
+ }
+ if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW8)) {
+ AdjustInitialFlowControlWindows(256 * 1024);
+ }
+ if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW9)) {
+ AdjustInitialFlowControlWindows(512 * 1024);
+ }
+ if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFWA)) {
+ AdjustInitialFlowControlWindows(1024 * 1024);
+ }
+ }
+
+ config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
+ }
+
+ // A small number of additional incoming streams beyond the limit should be
+ // allowed. This helps avoid early connection termination when FIN/RSTs for
+ // old streams are lost or arrive out of order.
+ // Use a minimum number of additional streams, or a percentage increase,
+ // whichever is larger.
+ uint32_t max_incoming_streams_to_send =
+ config_.GetMaxIncomingDynamicStreamsToSend();
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ v99_streamid_manager_.SetMaxOpenIncomingStreams(
+ max_incoming_streams_to_send);
+ } else {
+ uint32_t max_incoming_streams =
+ std::max(max_incoming_streams_to_send + kMaxStreamsMinimumIncrement,
+ static_cast<uint32_t>(max_incoming_streams_to_send *
+ kMaxStreamsMultiplier));
+ stream_id_manager_.set_max_open_incoming_streams(max_incoming_streams);
+ }
+
+ if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) {
+ // Streams which were created before the SHLO was received (0-RTT
+ // requests) are now informed of the peer's initial flow control window.
+ OnNewStreamFlowControlWindow(
+ config_.ReceivedInitialStreamFlowControlWindowBytes());
+ }
+ if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) {
+ OnNewSessionFlowControlWindow(
+ config_.ReceivedInitialSessionFlowControlWindowBytes());
+ }
+}
+
+void QuicSession::AdjustInitialFlowControlWindows(size_t stream_window) {
+ const float session_window_multiplier =
+ config_.GetInitialStreamFlowControlWindowToSend()
+ ? static_cast<float>(
+ config_.GetInitialSessionFlowControlWindowToSend()) /
+ config_.GetInitialStreamFlowControlWindowToSend()
+ : 1.5;
+
+ QUIC_DVLOG(1) << ENDPOINT << "Set stream receive window to " << stream_window;
+ config_.SetInitialStreamFlowControlWindowToSend(stream_window);
+
+ size_t session_window = session_window_multiplier * stream_window;
+ QUIC_DVLOG(1) << ENDPOINT << "Set session receive window to "
+ << session_window;
+ config_.SetInitialSessionFlowControlWindowToSend(session_window);
+ flow_controller_.UpdateReceiveWindowSize(session_window);
+ // Inform all existing streams about the new window.
+ for (auto const& kv : static_stream_map_) {
+ kv.second->flow_controller()->UpdateReceiveWindowSize(stream_window);
+ }
+ for (auto const& kv : dynamic_stream_map_) {
+ kv.second->flow_controller()->UpdateReceiveWindowSize(stream_window);
+ }
+}
+
+void QuicSession::HandleFrameOnNonexistentOutgoingStream(
+ QuicStreamId stream_id) {
+ DCHECK(!IsClosedStream(stream_id));
+ // Received a frame for a locally-created stream that is not currently
+ // active. This is an error.
+ connection()->CloseConnection(
+ QUIC_INVALID_STREAM_ID, "Data for nonexistent stream",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+}
+
+void QuicSession::HandleRstOnValidNonexistentStream(
+ const QuicRstStreamFrame& frame) {
+ // If the stream is neither originally in active streams nor created in
+ // GetOrCreateDynamicStream(), it could be a closed stream in which case its
+ // final received byte offset need to be updated.
+ if (IsClosedStream(frame.stream_id)) {
+ // The RST frame contains the final byte offset for the stream: we can now
+ // update the connection level flow controller if needed.
+ OnFinalByteOffsetReceived(frame.stream_id, frame.byte_offset);
+ }
+}
+
+void QuicSession::OnNewStreamFlowControlWindow(QuicStreamOffset new_window) {
+ if (new_window < kMinimumFlowControlSendWindow) {
+ QUIC_LOG_FIRST_N(ERROR, 1)
+ << "Peer sent us an invalid stream flow control send window: "
+ << new_window << ", below default: " << kMinimumFlowControlSendWindow;
+ if (connection_->connected()) {
+ connection_->CloseConnection(
+ QUIC_FLOW_CONTROL_INVALID_WINDOW, "New stream window too low",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ }
+ return;
+ }
+
+ // Inform all existing streams about the new window.
+ for (auto const& kv : static_stream_map_) {
+ kv.second->UpdateSendWindowOffset(new_window);
+ }
+ for (auto const& kv : dynamic_stream_map_) {
+ kv.second->UpdateSendWindowOffset(new_window);
+ }
+}
+
+void QuicSession::OnNewSessionFlowControlWindow(QuicStreamOffset new_window) {
+ if (new_window < kMinimumFlowControlSendWindow) {
+ QUIC_LOG_FIRST_N(ERROR, 1)
+ << "Peer sent us an invalid session flow control send window: "
+ << new_window << ", below default: " << kMinimumFlowControlSendWindow;
+ if (connection_->connected()) {
+ connection_->CloseConnection(
+ QUIC_FLOW_CONTROL_INVALID_WINDOW, "New connection window too low",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ }
+ return;
+ }
+
+ flow_controller_.UpdateSendWindowOffset(new_window);
+}
+
+void QuicSession::OnCryptoHandshakeEvent(CryptoHandshakeEvent event) {
+ switch (event) {
+ // TODO(satyamshekhar): Move the logic of setting the encrypter/decrypter
+ // to QuicSession since it is the glue.
+ case ENCRYPTION_FIRST_ESTABLISHED:
+ // Given any streams blocked by encryption a chance to write.
+ OnCanWrite();
+ break;
+
+ case ENCRYPTION_REESTABLISHED:
+ // Retransmit originally packets that were sent, since they can't be
+ // decrypted by the peer.
+ connection_->RetransmitUnackedPackets(ALL_INITIAL_RETRANSMISSION);
+ // Given any streams blocked by encryption a chance to write.
+ OnCanWrite();
+ break;
+
+ case HANDSHAKE_CONFIRMED:
+ QUIC_BUG_IF(!config_.negotiated())
+ << ENDPOINT << "Handshake confirmed without parameter negotiation.";
+ // Discard originally encrypted packets, since they can't be decrypted by
+ // the peer.
+ NeuterUnencryptedData();
+ is_handshake_confirmed_ = true;
+ break;
+
+ default:
+ QUIC_LOG(ERROR) << ENDPOINT << "Got unknown handshake event: " << event;
+ }
+}
+
+void QuicSession::OnCryptoHandshakeMessageSent(
+ const CryptoHandshakeMessage& /*message*/) {}
+
+void QuicSession::OnCryptoHandshakeMessageReceived(
+ const CryptoHandshakeMessage& /*message*/) {}
+
+void QuicSession::RegisterStreamPriority(QuicStreamId id,
+ bool is_static,
+ SpdyPriority priority) {
+ write_blocked_streams()->RegisterStream(id, is_static, priority);
+}
+
+void QuicSession::UnregisterStreamPriority(QuicStreamId id, bool is_static) {
+ write_blocked_streams()->UnregisterStream(id, is_static);
+}
+
+void QuicSession::UpdateStreamPriority(QuicStreamId id,
+ SpdyPriority new_priority) {
+ write_blocked_streams()->UpdateStreamPriority(id, new_priority);
+}
+
+QuicConfig* QuicSession::config() {
+ return &config_;
+}
+
+void QuicSession::ActivateStream(std::unique_ptr<QuicStream> stream) {
+ QuicStreamId stream_id = stream->id();
+ QUIC_DVLOG(1) << ENDPOINT << "num_streams: " << dynamic_stream_map_.size()
+ << ". activating " << stream_id;
+ DCHECK(!QuicContainsKey(dynamic_stream_map_, stream_id));
+ DCHECK(!QuicContainsKey(static_stream_map_, stream_id));
+ dynamic_stream_map_[stream_id] = std::move(stream);
+ if (IsIncomingStream(stream_id)) {
+ ++num_dynamic_incoming_streams_;
+ }
+}
+
+QuicStreamId QuicSession::GetNextOutgoingBidirectionalStreamId() {
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ return v99_streamid_manager_.GetNextOutgoingBidirectionalStreamId();
+ }
+ return stream_id_manager_.GetNextOutgoingStreamId();
+}
+
+QuicStreamId QuicSession::GetNextOutgoingUnidirectionalStreamId() {
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ return v99_streamid_manager_.GetNextOutgoingUnidirectionalStreamId();
+ }
+ return stream_id_manager_.GetNextOutgoingStreamId();
+}
+
+bool QuicSession::CanOpenNextOutgoingBidirectionalStream() {
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ return v99_streamid_manager_.CanOpenNextOutgoingBidirectionalStream();
+ }
+ return stream_id_manager_.CanOpenNextOutgoingStream(
+ GetNumOpenOutgoingStreams());
+}
+
+bool QuicSession::CanOpenNextOutgoingUnidirectionalStream() {
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ return v99_streamid_manager_.CanOpenNextOutgoingUnidirectionalStream();
+ }
+ return stream_id_manager_.CanOpenNextOutgoingStream(
+ GetNumOpenOutgoingStreams());
+}
+
+QuicStream* QuicSession::GetOrCreateStream(const QuicStreamId stream_id) {
+ StreamHandler handler =
+ GetOrCreateStreamImpl(stream_id, /*may_buffer=*/false);
+ DCHECK(!handler.is_pending);
+ return handler.stream;
+}
+
+QuicSession::StreamHandler QuicSession::GetOrCreateStreamImpl(
+ QuicStreamId stream_id,
+ bool may_buffer) {
+ StaticStreamMap::iterator it = static_stream_map_.find(stream_id);
+ if (it != static_stream_map_.end()) {
+ return StreamHandler(it->second);
+ }
+ return GetOrCreateDynamicStreamImpl(stream_id, may_buffer);
+}
+
+void QuicSession::StreamDraining(QuicStreamId stream_id) {
+ DCHECK(QuicContainsKey(dynamic_stream_map_, stream_id));
+ if (!QuicContainsKey(draining_streams_, stream_id)) {
+ draining_streams_.insert(stream_id);
+ if (IsIncomingStream(stream_id)) {
+ ++num_draining_incoming_streams_;
+ }
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ v99_streamid_manager_.OnStreamClosed(stream_id);
+ }
+ }
+ if (!IsIncomingStream(stream_id)) {
+ // Inform application that a stream is available.
+ OnCanCreateNewOutgoingStream();
+ }
+}
+
+bool QuicSession::MaybeIncreaseLargestPeerStreamId(
+ const QuicStreamId stream_id) {
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ return v99_streamid_manager_.MaybeIncreaseLargestPeerStreamId(stream_id);
+ }
+ return stream_id_manager_.MaybeIncreaseLargestPeerStreamId(stream_id);
+}
+
+bool QuicSession::ShouldYield(QuicStreamId stream_id) {
+ if (stream_id == currently_writing_stream_id_) {
+ return false;
+ }
+ return write_blocked_streams()->ShouldYield(stream_id);
+}
+
+QuicStream* QuicSession::GetOrCreateDynamicStream(
+ const QuicStreamId stream_id) {
+ StreamHandler handler =
+ GetOrCreateDynamicStreamImpl(stream_id, /*may_buffer=*/false);
+ DCHECK(!handler.is_pending);
+ return handler.stream;
+}
+
+QuicSession::StreamHandler QuicSession::GetOrCreateDynamicStreamImpl(
+ QuicStreamId stream_id,
+ bool may_buffer) {
+ DCHECK(!QuicContainsKey(static_stream_map_, stream_id))
+ << "Attempt to call GetOrCreateDynamicStream for a static stream";
+
+ DynamicStreamMap::iterator it = dynamic_stream_map_.find(stream_id);
+ if (it != dynamic_stream_map_.end()) {
+ return StreamHandler(it->second.get());
+ }
+
+ if (IsClosedStream(stream_id)) {
+ return StreamHandler();
+ }
+
+ if (!IsIncomingStream(stream_id)) {
+ HandleFrameOnNonexistentOutgoingStream(stream_id);
+ return StreamHandler();
+ }
+
+ auto pending_it = pending_stream_map_.find(stream_id);
+ if (pending_it != pending_stream_map_.end()) {
+ DCHECK_EQ(QUIC_VERSION_99, connection_->transport_version());
+ if (may_buffer) {
+ return StreamHandler(pending_it->second.get());
+ }
+ // The stream limit accounting has already been taken care of
+ // when the PendingStream was created, so there is no need to
+ // do so here. Now we can create the actual stream from the
+ // PendingStream.
+ StreamHandler handler(CreateIncomingStream(std::move(*pending_it->second)));
+ pending_stream_map_.erase(pending_it);
+ return handler;
+ }
+
+ // TODO(fkastenholz): If we are creating a new stream and we have
+ // sent a goaway, we should ignore the stream creation. Need to
+ // add code to A) test if goaway was sent ("if (goaway_sent_)") and
+ // B) reject stream creation ("return nullptr")
+
+ if (!MaybeIncreaseLargestPeerStreamId(stream_id)) {
+ return StreamHandler();
+ }
+
+ if (connection_->transport_version() != QUIC_VERSION_99) {
+ // TODO(fayang): Let LegacyQuicStreamIdManager count open streams and make
+ // CanOpenIncomingStream interface cosistent with that of v99.
+ if (!stream_id_manager_.CanOpenIncomingStream(
+ GetNumOpenIncomingStreams())) {
+ // Refuse to open the stream.
+ SendRstStream(stream_id, QUIC_REFUSED_STREAM, 0);
+ return StreamHandler();
+ }
+ }
+
+ if (connection_->transport_version() == QUIC_VERSION_99 && may_buffer &&
+ ShouldBufferIncomingStream(stream_id)) {
+ ++num_dynamic_incoming_streams_;
+ // Since STREAM frames may arrive out of order, delay creating the
+ // stream object until the first byte arrives. Buffer the frames and
+ // handle flow control accounting in the PendingStream.
+ auto pending = QuicMakeUnique<PendingStream>(stream_id, this);
+ StreamHandler handler(pending.get());
+ pending_stream_map_[stream_id] = std::move(pending);
+ return handler;
+ }
+
+ return StreamHandler(CreateIncomingStream(stream_id));
+}
+
+void QuicSession::set_largest_peer_created_stream_id(
+ QuicStreamId largest_peer_created_stream_id) {
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ v99_streamid_manager_.SetLargestPeerCreatedStreamId(
+ largest_peer_created_stream_id);
+ return;
+ }
+ stream_id_manager_.set_largest_peer_created_stream_id(
+ largest_peer_created_stream_id);
+}
+
+bool QuicSession::IsClosedStream(QuicStreamId id) {
+ DCHECK_NE(QuicUtils::GetInvalidStreamId(connection_->transport_version()),
+ id);
+ if (IsOpenStream(id)) {
+ // Stream is active
+ return false;
+ }
+
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ return !v99_streamid_manager_.IsAvailableStream(id);
+ }
+
+ return !stream_id_manager_.IsAvailableStream(id);
+}
+
+bool QuicSession::IsOpenStream(QuicStreamId id) {
+ DCHECK_NE(QuicUtils::GetInvalidStreamId(connection_->transport_version()),
+ id);
+ if (QuicContainsKey(static_stream_map_, id) ||
+ QuicContainsKey(dynamic_stream_map_, id) ||
+ QuicContainsKey(pending_stream_map_, id)) {
+ // Stream is active
+ return true;
+ }
+ return false;
+}
+
+size_t QuicSession::GetNumOpenIncomingStreams() const {
+ return num_dynamic_incoming_streams_ - num_draining_incoming_streams_ +
+ num_locally_closed_incoming_streams_highest_offset_;
+}
+
+size_t QuicSession::GetNumOpenOutgoingStreams() const {
+ DCHECK_GE(GetNumDynamicOutgoingStreams() +
+ GetNumLocallyClosedOutgoingStreamsHighestOffset(),
+ GetNumDrainingOutgoingStreams());
+ return GetNumDynamicOutgoingStreams() +
+ GetNumLocallyClosedOutgoingStreamsHighestOffset() -
+ GetNumDrainingOutgoingStreams();
+}
+
+size_t QuicSession::GetNumActiveStreams() const {
+ return dynamic_stream_map_.size() - draining_streams_.size();
+}
+
+size_t QuicSession::GetNumDrainingStreams() const {
+ return draining_streams_.size();
+}
+
+void QuicSession::MarkConnectionLevelWriteBlocked(QuicStreamId id) {
+ if (GetOrCreateStream(id) == nullptr) {
+ QUIC_BUG << "Marking unknown stream " << id << " blocked.";
+ QUIC_LOG_FIRST_N(ERROR, 2) << QuicStackTrace();
+ }
+
+ write_blocked_streams_.AddStream(id);
+}
+
+bool QuicSession::HasDataToWrite() const {
+ return write_blocked_streams_.HasWriteBlockedSpecialStream() ||
+ write_blocked_streams_.HasWriteBlockedDataStreams() ||
+ connection_->HasQueuedData() ||
+ !streams_with_pending_retransmission_.empty() ||
+ control_frame_manager_.WillingToWrite();
+}
+
+void QuicSession::OnAckNeedsRetransmittableFrame() {
+ flow_controller_.SendWindowUpdate();
+}
+
+void QuicSession::SendPing() {
+ control_frame_manager_.WritePing();
+}
+
+size_t QuicSession::GetNumDynamicOutgoingStreams() const {
+ DCHECK_GE(dynamic_stream_map_.size() + pending_stream_map_.size(),
+ num_dynamic_incoming_streams_);
+ return dynamic_stream_map_.size() + pending_stream_map_.size() -
+ num_dynamic_incoming_streams_;
+}
+
+size_t QuicSession::GetNumDrainingOutgoingStreams() const {
+ DCHECK_GE(draining_streams_.size(), num_draining_incoming_streams_);
+ return draining_streams_.size() - num_draining_incoming_streams_;
+}
+
+size_t QuicSession::GetNumLocallyClosedOutgoingStreamsHighestOffset() const {
+ DCHECK_GE(locally_closed_streams_highest_offset_.size(),
+ num_locally_closed_incoming_streams_highest_offset_);
+ return locally_closed_streams_highest_offset_.size() -
+ num_locally_closed_incoming_streams_highest_offset_;
+}
+
+bool QuicSession::IsConnectionFlowControlBlocked() const {
+ return flow_controller_.IsBlocked();
+}
+
+bool QuicSession::IsStreamFlowControlBlocked() {
+ for (auto const& kv : static_stream_map_) {
+ if (kv.second->flow_controller()->IsBlocked()) {
+ return true;
+ }
+ }
+ for (auto const& kv : dynamic_stream_map_) {
+ if (kv.second->flow_controller()->IsBlocked()) {
+ return true;
+ }
+ }
+ return false;
+}
+
+size_t QuicSession::MaxAvailableBidirectionalStreams() const {
+ if (connection()->transport_version() == QUIC_VERSION_99) {
+ return v99_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
+ }
+ return stream_id_manager_.MaxAvailableStreams();
+}
+
+size_t QuicSession::MaxAvailableUnidirectionalStreams() const {
+ if (connection()->transport_version() == QUIC_VERSION_99) {
+ return v99_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
+ }
+ return stream_id_manager_.MaxAvailableStreams();
+}
+
+bool QuicSession::IsIncomingStream(QuicStreamId id) const {
+ if (connection()->transport_version() == QUIC_VERSION_99) {
+ return v99_streamid_manager_.IsIncomingStream(id);
+ }
+ return stream_id_manager_.IsIncomingStream(id);
+}
+
+void QuicSession::OnStreamDoneWaitingForAcks(QuicStreamId id) {
+ auto it = zombie_streams_.find(id);
+ if (it == zombie_streams_.end()) {
+ return;
+ }
+
+ closed_streams_.push_back(std::move(it->second));
+ if (!closed_streams_clean_up_alarm_->IsSet()) {
+ closed_streams_clean_up_alarm_->Set(connection_->clock()->ApproximateNow());
+ }
+ zombie_streams_.erase(it);
+ // Do not retransmit data of a closed stream.
+ streams_with_pending_retransmission_.erase(id);
+}
+
+QuicStream* QuicSession::GetStream(QuicStreamId id) const {
+ if (id <= largest_static_stream_id_) {
+ auto static_stream = static_stream_map_.find(id);
+ if (static_stream != static_stream_map_.end()) {
+ return static_stream->second;
+ }
+ }
+
+ auto active_stream = dynamic_stream_map_.find(id);
+ if (active_stream != dynamic_stream_map_.end()) {
+ return active_stream->second.get();
+ }
+ auto zombie_stream = zombie_streams_.find(id);
+ if (zombie_stream != zombie_streams_.end()) {
+ return zombie_stream->second.get();
+ }
+ return nullptr;
+}
+
+bool QuicSession::OnFrameAcked(const QuicFrame& frame,
+ QuicTime::Delta ack_delay_time) {
+ if (frame.type == MESSAGE_FRAME) {
+ OnMessageAcked(frame.message_frame->message_id);
+ return true;
+ }
+ if (frame.type == CRYPTO_FRAME) {
+ return GetMutableCryptoStream()->OnCryptoFrameAcked(*frame.crypto_frame,
+ ack_delay_time);
+ }
+ if (frame.type != STREAM_FRAME) {
+ return control_frame_manager_.OnControlFrameAcked(frame);
+ }
+ bool new_stream_data_acked = false;
+ QuicStream* stream = GetStream(frame.stream_frame.stream_id);
+ // Stream can already be reset when sent frame gets acked.
+ if (stream != nullptr) {
+ QuicByteCount newly_acked_length = 0;
+ new_stream_data_acked = stream->OnStreamFrameAcked(
+ frame.stream_frame.offset, frame.stream_frame.data_length,
+ frame.stream_frame.fin, ack_delay_time, &newly_acked_length);
+ if (!stream->HasPendingRetransmission()) {
+ streams_with_pending_retransmission_.erase(stream->id());
+ }
+ }
+ return new_stream_data_acked;
+}
+
+void QuicSession::OnStreamFrameRetransmitted(const QuicStreamFrame& frame) {
+ QuicStream* stream = GetStream(frame.stream_id);
+ if (stream == nullptr) {
+ QUIC_BUG << "Stream: " << frame.stream_id << " is closed when " << frame
+ << " is retransmitted.";
+ connection()->CloseConnection(
+ QUIC_INTERNAL_ERROR, "Attempt to retransmit frame of a closed stream",
+ ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
+ return;
+ }
+ stream->OnStreamFrameRetransmitted(frame.offset, frame.data_length,
+ frame.fin);
+}
+
+void QuicSession::OnFrameLost(const QuicFrame& frame) {
+ if (frame.type == MESSAGE_FRAME) {
+ OnMessageLost(frame.message_frame->message_id);
+ return;
+ }
+ if (frame.type == CRYPTO_FRAME) {
+ GetMutableCryptoStream()->OnCryptoFrameLost(frame.crypto_frame);
+ return;
+ }
+ if (frame.type != STREAM_FRAME) {
+ control_frame_manager_.OnControlFrameLost(frame);
+ return;
+ }
+ QuicStream* stream = GetStream(frame.stream_frame.stream_id);
+ if (stream == nullptr) {
+ return;
+ }
+ stream->OnStreamFrameLost(frame.stream_frame.offset,
+ frame.stream_frame.data_length,
+ frame.stream_frame.fin);
+ if (stream->HasPendingRetransmission() &&
+ !QuicContainsKey(streams_with_pending_retransmission_,
+ frame.stream_frame.stream_id)) {
+ streams_with_pending_retransmission_.insert(
+ std::make_pair(frame.stream_frame.stream_id, true));
+ }
+}
+
+void QuicSession::RetransmitFrames(const QuicFrames& frames,
+ TransmissionType type) {
+ QuicConnection::ScopedPacketFlusher retransmission_flusher(
+ connection_, QuicConnection::NO_ACK);
+ SetTransmissionType(type);
+ for (const QuicFrame& frame : frames) {
+ if (frame.type == MESSAGE_FRAME) {
+ // Do not retransmit MESSAGE frames.
+ continue;
+ }
+ if (frame.type == CRYPTO_FRAME) {
+ GetMutableCryptoStream()->RetransmitData(frame.crypto_frame);
+ continue;
+ }
+ if (frame.type != STREAM_FRAME) {
+ if (!control_frame_manager_.RetransmitControlFrame(frame)) {
+ break;
+ }
+ continue;
+ }
+ QuicStream* stream = GetStream(frame.stream_frame.stream_id);
+ if (stream != nullptr &&
+ !stream->RetransmitStreamData(frame.stream_frame.offset,
+ frame.stream_frame.data_length,
+ frame.stream_frame.fin)) {
+ break;
+ }
+ }
+}
+
+bool QuicSession::IsFrameOutstanding(const QuicFrame& frame) const {
+ if (frame.type == MESSAGE_FRAME) {
+ return false;
+ }
+ if (frame.type == CRYPTO_FRAME) {
+ return GetCryptoStream()->IsFrameOutstanding(
+ frame.crypto_frame->level, frame.crypto_frame->offset,
+ frame.crypto_frame->data_length);
+ }
+ if (frame.type != STREAM_FRAME) {
+ return control_frame_manager_.IsControlFrameOutstanding(frame);
+ }
+ QuicStream* stream = GetStream(frame.stream_frame.stream_id);
+ return stream != nullptr &&
+ stream->IsStreamFrameOutstanding(frame.stream_frame.offset,
+ frame.stream_frame.data_length,
+ frame.stream_frame.fin);
+}
+
+bool QuicSession::HasUnackedCryptoData() const {
+ const QuicCryptoStream* crypto_stream = GetCryptoStream();
+ if (crypto_stream->IsWaitingForAcks()) {
+ return true;
+ }
+ if (GetQuicReloadableFlag(quic_fix_has_pending_crypto_data) &&
+ crypto_stream->HasBufferedData()) {
+ QUIC_RELOADABLE_FLAG_COUNT(quic_fix_has_pending_crypto_data);
+ return true;
+ }
+ return false;
+}
+
+WriteStreamDataResult QuicSession::WriteStreamData(QuicStreamId id,
+ QuicStreamOffset offset,
+ QuicByteCount data_length,
+ QuicDataWriter* writer) {
+ QuicStream* stream = GetStream(id);
+ if (stream == nullptr) {
+ // This causes the connection to be closed because of failed to serialize
+ // packet.
+ QUIC_BUG << "Stream " << id << " does not exist when trying to write data.";
+ return STREAM_MISSING;
+ }
+ if (stream->WriteStreamData(offset, data_length, writer)) {
+ return WRITE_SUCCESS;
+ }
+ return WRITE_FAILED;
+}
+
+bool QuicSession::WriteCryptoData(EncryptionLevel level,
+ QuicStreamOffset offset,
+ QuicByteCount data_length,
+ QuicDataWriter* writer) {
+ return GetMutableCryptoStream()->WriteCryptoFrame(level, offset, data_length,
+ writer);
+}
+
+QuicUint128 QuicSession::GetStatelessResetToken() const {
+ return QuicUtils::GenerateStatelessResetToken(connection_->connection_id());
+}
+
+bool QuicSession::RetransmitLostData() {
+ QuicConnection::ScopedPacketFlusher retransmission_flusher(
+ connection_, QuicConnection::SEND_ACK_IF_QUEUED);
+ // Retransmit crypto data first.
+ bool uses_crypto_frames = connection_->transport_version() >= QUIC_VERSION_47;
+ QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
+ if (uses_crypto_frames && crypto_stream->HasPendingCryptoRetransmission()) {
+ SetTransmissionType(HANDSHAKE_RETRANSMISSION);
+ crypto_stream->WritePendingCryptoRetransmission();
+ }
+ // Retransmit crypto data in stream 1 frames (version < 47).
+ if (!uses_crypto_frames &&
+ QuicContainsKey(
+ streams_with_pending_retransmission_,
+ QuicUtils::GetCryptoStreamId(connection_->transport_version()))) {
+ SetTransmissionType(HANDSHAKE_RETRANSMISSION);
+ // Retransmit crypto data first.
+ QuicStream* crypto_stream = GetStream(
+ QuicUtils::GetCryptoStreamId(connection_->transport_version()));
+ crypto_stream->OnCanWrite();
+ DCHECK(CheckStreamWriteBlocked(crypto_stream));
+ if (crypto_stream->HasPendingRetransmission()) {
+ // Connection is write blocked.
+ return false;
+ } else {
+ streams_with_pending_retransmission_.erase(
+ QuicUtils::GetCryptoStreamId(connection_->transport_version()));
+ }
+ }
+ if (control_frame_manager_.HasPendingRetransmission()) {
+ SetTransmissionType(LOSS_RETRANSMISSION);
+ control_frame_manager_.OnCanWrite();
+ if (control_frame_manager_.HasPendingRetransmission()) {
+ return false;
+ }
+ }
+ while (!streams_with_pending_retransmission_.empty()) {
+ if (!connection_->CanWriteStreamData()) {
+ break;
+ }
+ // Retransmit lost data on headers and data streams.
+ const QuicStreamId id = streams_with_pending_retransmission_.begin()->first;
+ QuicStream* stream = GetStream(id);
+ if (stream != nullptr) {
+ SetTransmissionType(LOSS_RETRANSMISSION);
+ stream->OnCanWrite();
+ DCHECK(CheckStreamWriteBlocked(stream));
+ if (stream->HasPendingRetransmission()) {
+ // Connection is write blocked.
+ break;
+ } else if (!streams_with_pending_retransmission_.empty() &&
+ streams_with_pending_retransmission_.begin()->first == id) {
+ // Retransmit lost data may cause connection close. If this stream
+ // has not yet sent fin, a RST_STREAM will be sent and it will be
+ // removed from streams_with_pending_retransmission_.
+ streams_with_pending_retransmission_.pop_front();
+ }
+ } else {
+ QUIC_BUG << "Try to retransmit data of a closed stream";
+ streams_with_pending_retransmission_.pop_front();
+ }
+ }
+
+ return streams_with_pending_retransmission_.empty();
+}
+
+void QuicSession::NeuterUnencryptedData() {
+ if (connection_->session_decides_what_to_write()) {
+ QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
+ crypto_stream->NeuterUnencryptedStreamData();
+ if (!crypto_stream->HasPendingRetransmission()) {
+ streams_with_pending_retransmission_.erase(
+ QuicUtils::GetCryptoStreamId(connection_->transport_version()));
+ }
+ }
+ connection_->NeuterUnencryptedPackets();
+}
+
+void QuicSession::SetTransmissionType(TransmissionType type) {
+ connection_->SetTransmissionType(type);
+}
+
+MessageResult QuicSession::SendMessage(QuicMemSliceSpan message) {
+ if (!IsEncryptionEstablished()) {
+ return {MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED, 0};
+ }
+ MessageStatus result =
+ connection_->SendMessage(last_message_id_ + 1, message);
+ if (result == MESSAGE_STATUS_SUCCESS) {
+ return {result, ++last_message_id_};
+ }
+ return {result, 0};
+}
+
+void QuicSession::OnMessageAcked(QuicMessageId message_id) {
+ QUIC_DVLOG(1) << ENDPOINT << "message " << message_id << " gets acked.";
+}
+
+void QuicSession::OnMessageLost(QuicMessageId message_id) {
+ QUIC_DVLOG(1) << ENDPOINT << "message " << message_id
+ << " is considered lost";
+}
+
+void QuicSession::CleanUpClosedStreams() {
+ closed_streams_.clear();
+}
+
+bool QuicSession::session_decides_what_to_write() const {
+ return connection_->session_decides_what_to_write();
+}
+
+QuicPacketLength QuicSession::GetLargestMessagePayload() const {
+ return connection_->GetLargestMessagePayload();
+}
+
+void QuicSession::SendStopSending(uint16_t code, QuicStreamId stream_id) {
+ control_frame_manager_.WriteOrBufferStopSending(code, stream_id);
+}
+
+void QuicSession::OnCanCreateNewOutgoingStream() {}
+
+QuicStreamId QuicSession::next_outgoing_bidirectional_stream_id() const {
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ return v99_streamid_manager_.next_outgoing_bidirectional_stream_id();
+ }
+ return stream_id_manager_.next_outgoing_stream_id();
+}
+
+QuicStreamId QuicSession::next_outgoing_unidirectional_stream_id() const {
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ return v99_streamid_manager_.next_outgoing_unidirectional_stream_id();
+ }
+ return stream_id_manager_.next_outgoing_stream_id();
+}
+
+bool QuicSession::OnMaxStreamIdFrame(const QuicMaxStreamIdFrame& frame) {
+ return v99_streamid_manager_.OnMaxStreamIdFrame(frame);
+}
+
+bool QuicSession::OnStreamIdBlockedFrame(
+ const QuicStreamIdBlockedFrame& frame) {
+ return v99_streamid_manager_.OnStreamIdBlockedFrame(frame);
+}
+
+size_t QuicSession::max_open_incoming_bidirectional_streams() const {
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ return v99_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
+ }
+ return stream_id_manager_.max_open_incoming_streams();
+}
+
+size_t QuicSession::max_open_incoming_unidirectional_streams() const {
+ if (connection_->transport_version() == QUIC_VERSION_99) {
+ return v99_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
+ }
+ return stream_id_manager_.max_open_incoming_streams();
+}
+
+#undef ENDPOINT // undef for jumbo builds
+} // namespace quic