// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/third_party/quiche/src/quic/core/quic_session.h"

#include <cstdint>
#include <string>
#include <utility>

#include "net/third_party/quiche/src/quic/core/quic_connection.h"
#include "net/third_party/quiche/src/quic/core/quic_error_codes.h"
#include "net/third_party/quiche/src/quic/core/quic_flow_controller.h"
#include "net/third_party/quiche/src/quic/core/quic_types.h"
#include "net/third_party/quiche/src/quic/core/quic_utils.h"
#include "net/third_party/quiche/src/quic/core/quic_versions.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_bug_tracker.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_flag_utils.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_flags.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_map_util.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_ptr_util.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_server_stats.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_stack_trace.h"
#include "net/third_party/quiche/src/common/platform/api/quiche_str_cat.h"
#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"

using spdy::SpdyPriority;

namespace quic {

namespace {

class ClosedStreamsCleanUpDelegate : public QuicAlarm::Delegate {
 public:
  explicit ClosedStreamsCleanUpDelegate(QuicSession* session)
      : session_(session) {}
  ClosedStreamsCleanUpDelegate(const ClosedStreamsCleanUpDelegate&) = delete;
  ClosedStreamsCleanUpDelegate& operator=(const ClosedStreamsCleanUpDelegate&) =
      delete;

  void OnAlarm() override { session_->CleanUpClosedStreams(); }

 private:
  QuicSession* session_;
};

}  // namespace

// Log prefix naming the endpoint that emits a message ("Server: " or
// "Client: "). Expands to a call to perspective(), so it can only be used
// where such an accessor is in scope.
#define ENDPOINT \
  (perspective() == Perspective::IS_SERVER ? "Server: " : "Client: ")

// Builds a session on top of |connection|.
//
// |owner| is stored as the session's visitor, |config| is copied into
// |config_|, and |supported_versions| is copied into |supported_versions_|.
// |num_expected_unidirectional_static_streams| is forwarded to the IETF stream
// id manager and, for versions with IETF QUIC frames, added to the
// unidirectional stream limit advertised through |config_|.
QuicSession::QuicSession(
    QuicConnection* connection,
    Visitor* owner,
    const QuicConfig& config,
    const ParsedQuicVersionVector& supported_versions,
    QuicStreamCount num_expected_unidirectional_static_streams)
    : connection_(connection),
      perspective_(connection->perspective()),
      visitor_(owner),
      write_blocked_streams_(connection->transport_version()),
      config_(config),
      stream_id_manager_(perspective(),
                         connection->transport_version(),
                         kDefaultMaxStreamsPerConnection,
                         config_.GetMaxBidirectionalStreamsToSend()),
      v99_streamid_manager_(perspective(),
                            connection->version(),
                            this,
                            0,
                            num_expected_unidirectional_static_streams,
                            config_.GetMaxBidirectionalStreamsToSend(),
                            config_.GetMaxUnidirectionalStreamsToSend() +
                                num_expected_unidirectional_static_streams),
      num_draining_streams_(0),
      num_outgoing_draining_streams_(0),
      num_static_streams_(0),
      // Connection-level flow controller: it is keyed by the invalid stream id
      // and its minimum send window is 0 only when the version allows low
      // flow control limits.
      flow_controller_(
          this,
          QuicUtils::GetInvalidStreamId(connection->transport_version()),
          /*is_connection_flow_controller*/ true,
          connection->version().AllowsLowFlowControlLimits()
              ? 0
              : kMinimumFlowControlSendWindow,
          config_.GetInitialSessionFlowControlWindowToSend(),
          kSessionReceiveWindowLimit,
          perspective() == Perspective::IS_SERVER,
          nullptr),
      currently_writing_stream_id_(0),
      goaway_sent_(false),
      goaway_received_(false),
      control_frame_manager_(this),
      last_message_id_(0),
      datagram_queue_(this),
      closed_streams_clean_up_alarm_(nullptr),
      supported_versions_(supported_versions),
      use_http2_priority_write_scheduler_(false),
      is_configured_(false),
      enable_round_robin_scheduling_(false),
      was_zero_rtt_rejected_(false),
      fix_gquic_stream_type_(
          GetQuicReloadableFlag(quic_fix_gquic_stream_type)) {
  // The alarm is created here rather than in the initializer list; the new
  // delegate is handed to the alarm factory's CreateAlarm().
  closed_streams_clean_up_alarm_ =
      QuicWrapUnique<QuicAlarm>(connection_->alarm_factory()->CreateAlarm(
          new ClosedStreamsCleanUpDelegate(this)));
  // A TLS 1.3 server puts its stateless reset token into the config to send.
  if (perspective() == Perspective::IS_SERVER &&
      connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
    config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
  }
  // With IETF QUIC frames, raise the unidirectional stream limit in the config
  // by the number of expected static unidirectional streams.
  if (VersionHasIetfQuicFrames(transport_version())) {
    config_.SetMaxUnidirectionalStreamsToSend(
        config_.GetMaxUnidirectionalStreamsToSend() +
        num_expected_unidirectional_static_streams);
  }
}

// Wires this session into its connection (as visitor, session notifier and
// data producer), applies the config to the connection, and performs the
// perspective-specific setup below.
void QuicSession::Initialize() {
  connection_->set_visitor(this);
  connection_->SetSessionNotifier(this);
  connection_->SetDataProducer(this);
  connection_->SetFromConfig(config_);

  // Client-only config tweaks for TLS versions.
  if (perspective() == Perspective::IS_CLIENT && version().UsesTls()) {
    config_.SetMinAckDelayMs(kDefaultMinAckDelayTimeMs);
    const bool advertise_handshake_done =
        GetQuicReloadableFlag(quic_support_handshake_done_in_t050) &&
        !version().HasHandshakeDone();
    if (advertise_handshake_done) {
      config_.SetSupportHandshakeDone();
    }
  }

  // A server session is created with the right version because the dispatcher
  // has already completed version negotiation.
  if (perspective() == Perspective::IS_SERVER) {
    connection_->OnSuccessfulVersionNegotiation();
  }

  // Versions without CRYPTO frames carry the handshake on a crypto stream,
  // whose id must match the well-known crypto stream id.
  if (!QuicVersionUsesCryptoFrames(transport_version())) {
    DCHECK_EQ(QuicUtils::GetCryptoStreamId(transport_version()),
              GetMutableCryptoStream()->id());
  }
}

// Logs a warning when zombie streams are still present at destruction time.
QuicSession::~QuicSession() {
  const bool has_zombies = !zombie_streams_.empty();
  QUIC_LOG_IF(WARNING, has_zombies) << "Still have zombie streams";
}

void QuicSession::PendingStreamOnStreamFrame(const QuicStreamFrame& frame) {
  DCHECK(VersionUsesHttp3(transport_version()));
  QuicStreamId stream_id = frame.stream_id;

  PendingStream* pending = GetOrCreatePendingStream(stream_id);

  if (!pending) {
    if (frame.fin) {
      QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
      OnFinalByteOffsetReceived(stream_id, final_byte_offset);
    }
    return;
  }

  pending->OnStreamFrame(frame);
  if (!connection()->connected()) {
    return;
  }
  if (ProcessPendingStream(pending)) {
    // The pending stream should now be in the scope of normal streams.
    DCHECK(IsClosedStream(stream_id) || IsOpenStream(stream_id))
        << "Stream " << stream_id << " not created";
    pending_stream_map_.erase(stream_id);
    return;
  }
  if (pending->sequencer()->IsClosed()) {
    ClosePendingStream(stream_id);
  }
}

void QuicSession::OnStreamFrame(const QuicStreamFrame& frame) {
  QuicStreamId stream_id = frame.stream_id;
  if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
    connection()->CloseConnection(
        QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
    return;
  }

  if (UsesPendingStreams() &&
      QuicUtils::GetStreamType(stream_id, perspective(),
                               IsIncomingStream(stream_id),
                               version()) == READ_UNIDIRECTIONAL &&
      stream_map_.find(stream_id) == stream_map_.end()) {
    PendingStreamOnStreamFrame(frame);
    return;
  }

  QuicStream* stream = GetOrCreateStream(stream_id);

  if (!stream) {
    // The stream no longer exists, but we may still be interested in the
    // final stream byte offset sent by the peer. A frame with a FIN can give
    // us this offset.
    if (frame.fin) {
      QuicStreamOffset final_byte_offset = frame.offset + frame.data_length;
      OnFinalByteOffsetReceived(stream_id, final_byte_offset);
    }
    return;
  }
  stream->OnStreamFrame(frame);
}

void QuicSession::OnCryptoFrame(const QuicCryptoFrame& frame) {
  GetMutableCryptoStream()->OnCryptoFrame(frame);
}

void QuicSession::OnStopSendingFrame(const QuicStopSendingFrame& frame) {
  // STOP_SENDING is in IETF QUIC only.
  DCHECK(VersionHasIetfQuicFrames(transport_version()));
  DCHECK(QuicVersionUsesCryptoFrames(transport_version()));

  QuicStreamId stream_id = frame.stream_id;
  // If Stream ID is invalid then close the connection.
  // TODO(ianswett): This check is redundant to checks for IsClosedStream,
  // but removing it requires removing multiple DCHECKs.
  // TODO(ianswett): Multiple QUIC_DVLOGs could be QUIC_PEER_BUGs.
  if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
    QUIC_DVLOG(1) << ENDPOINT
                  << "Received STOP_SENDING with invalid stream_id: "
                  << stream_id << " Closing connection";
    connection()->CloseConnection(
        QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for an invalid stream",
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
    return;
  }

  // If stream_id is READ_UNIDIRECTIONAL, close the connection.
  if (QuicUtils::GetStreamType(stream_id, perspective(),
                               IsIncomingStream(stream_id),
                               version()) == READ_UNIDIRECTIONAL) {
    QUIC_DVLOG(1) << ENDPOINT
                  << "Received STOP_SENDING for a read-only stream_id: "
                  << stream_id << ".";
    connection()->CloseConnection(
        QUIC_INVALID_STREAM_ID, "Received STOP_SENDING for a read-only stream",
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
    return;
  }

  if (visitor_) {
    visitor_->OnStopSendingReceived(frame);
  }

  QuicStream* stream = GetOrCreateStream(stream_id);
  if (!stream) {
    // Errors are handled by GetOrCreateStream.
    return;
  }

  if (!stream->OnStopSending(frame.application_error_code)) {
    return;
  }

  // TODO(renjietang): Consider moving those code into the stream.
  if (connection()->connected()) {
    MaybeSendRstStreamFrame(
        stream->id(),
        static_cast<quic::QuicRstStreamErrorCode>(frame.application_error_code),
        stream->stream_bytes_written());
    connection_->OnStreamReset(stream->id(),
                               static_cast<quic::QuicRstStreamErrorCode>(
                                   frame.application_error_code));
  }
  stream->set_rst_sent(true);
  stream->CloseWriteSide();
}

// Tells the crypto stream that a packet was decrypted at |level|.
void QuicSession::OnPacketDecrypted(EncryptionLevel level) {
  QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
  crypto_stream->OnPacketDecrypted(level);
}

void QuicSession::OnOneRttPacketAcknowledged() {
  GetMutableCryptoStream()->OnOneRttPacketAcknowledged();
}

void QuicSession::OnHandshakePacketSent() {
  GetMutableCryptoStream()->OnHandshakePacketSent();
}

void QuicSession::PendingStreamOnRstStream(const QuicRstStreamFrame& frame) {
  DCHECK(VersionUsesHttp3(transport_version()));
  QuicStreamId stream_id = frame.stream_id;

  PendingStream* pending = GetOrCreatePendingStream(stream_id);

  if (!pending) {
    HandleRstOnValidNonexistentStream(frame);
    return;
  }

  pending->OnRstStreamFrame(frame);
  // Pending stream is currently read only. We can safely close the stream.
  DCHECK_EQ(READ_UNIDIRECTIONAL,
            QuicUtils::GetStreamType(pending->id(), perspective(),
                                     /*peer_initiated = */ true, version()));
  ClosePendingStream(stream_id);
}

void QuicSession::OnRstStream(const QuicRstStreamFrame& frame) {
  QuicStreamId stream_id = frame.stream_id;
  if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
    connection()->CloseConnection(
        QUIC_INVALID_STREAM_ID, "Received data for an invalid stream",
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
    return;
  }

  if (VersionHasIetfQuicFrames(transport_version()) &&
      QuicUtils::GetStreamType(stream_id, perspective(),
                               IsIncomingStream(stream_id),
                               version()) == WRITE_UNIDIRECTIONAL) {
    connection()->CloseConnection(
        QUIC_INVALID_STREAM_ID, "Received RESET_STREAM for a write-only stream",
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
    return;
  }

  if (visitor_) {
    visitor_->OnRstStreamReceived(frame);
  }

  if (UsesPendingStreams() &&
      QuicUtils::GetStreamType(stream_id, perspective(),
                               IsIncomingStream(stream_id),
                               version()) == READ_UNIDIRECTIONAL &&
      stream_map_.find(stream_id) == stream_map_.end()) {
    PendingStreamOnRstStream(frame);
    return;
  }

  QuicStream* stream = GetOrCreateStream(stream_id);

  if (!stream) {
    HandleRstOnValidNonexistentStream(frame);
    return;  // Errors are handled by GetOrCreateStream.
  }
  stream->OnStreamReset(frame);
}

// Records that a GOAWAY frame has been received; the frame's contents are not
// inspected here.
void QuicSession::OnGoAway(const QuicGoAwayFrame& /*frame*/) {
  goaway_received_ = true;
}

// Verbose-logs the length and contents of a received message; no other action
// is taken here.
void QuicSession::OnMessageReceived(quiche::QuicheStringPiece message) {
  QUIC_DVLOG(1) << ENDPOINT
                << "Received message, length: "
                << message.length()
                << ", "
                << message;
}

void QuicSession::OnHandshakeDoneReceived() {
  QUIC_DVLOG(1) << ENDPOINT << "OnHandshakeDoneReceived";
  GetMutableCryptoStream()->OnHandshakeDoneReceived();
}

// static
// Records |error| in a server-side histogram chosen by |source|. Nothing is
// recorded for QUIC_NO_ERROR.
void QuicSession::RecordConnectionCloseAtServer(QuicErrorCode error,
                                                ConnectionCloseSource source) {
  if (error == QUIC_NO_ERROR) {
    return;
  }
  if (source != ConnectionCloseSource::FROM_SELF) {
    QUIC_SERVER_HISTOGRAM_ENUM(
        "quic_client_connection_close_errors", error, QUIC_LAST_ERROR,
        "QuicErrorCode for client-closed connections.");
    return;
  }
  QUIC_SERVER_HISTOGRAM_ENUM(
      "quic_server_connection_close_errors", error, QUIC_LAST_ERROR,
      "QuicErrorCode for server-closed connections.");
}

// Handles closure of the connection described by |frame|; |source| says which
// side initiated the close. In order: records the close at servers, stores
// the close frame, notifies the crypto stream and every non-static stream,
// moves all zombie streams to |closed_streams_|, cancels the clean-up alarm
// and finally notifies the visitor.
void QuicSession::OnConnectionClosed(const QuicConnectionCloseFrame& frame,
                                     ConnectionCloseSource source) {
  DCHECK(!connection_->connected());
  if (perspective() == Perspective::IS_SERVER) {
    RecordConnectionCloseAtServer(frame.quic_error_code, source);
  }

  // Overwrite the stored frame only while it still carries QUIC_NO_ERROR.
  if (on_closed_frame_.quic_error_code == QUIC_NO_ERROR) {
    // Save all of the connection close information
    on_closed_frame_ = frame;
  }

  GetMutableCryptoStream()->OnConnectionClosed(frame.quic_error_code, source);

  // Copy all non static streams in a new map for the ease of deleting.
  QuicSmallMap<QuicStreamId, QuicStream*, 10> non_static_streams;
  for (const auto& it : stream_map_) {
    if (!it.second->is_static()) {
      non_static_streams[it.first] = it.second.get();
    }
  }
  for (const auto& it : non_static_streams) {
    QuicStreamId id = it.first;
    it.second->OnConnectionClosed(frame.quic_error_code, source);
    QUIC_RELOADABLE_FLAG_COUNT(
        quic_do_not_close_stream_again_on_connection_close);
    // The stream is expected to have left |stream_map_| by now; if it is still
    // there, report a bug and close it explicitly only when the flag is off.
    if (stream_map_.find(id) != stream_map_.end()) {
      QUIC_BUG << ENDPOINT << "Stream " << id
               << " failed to close under OnConnectionClosed";
      if (!GetQuicReloadableFlag(
              quic_do_not_close_stream_again_on_connection_close)) {
        CloseStream(id);
      }
    }
  }

  // Cleanup zombie stream map on connection close.
  while (!zombie_streams_.empty()) {
    ZombieStreamMap::iterator it = zombie_streams_.begin();
    closed_streams_.push_back(std::move(it->second));
    zombie_streams_.erase(it);
  }

  closed_streams_clean_up_alarm_->Cancel();

  if (visitor_) {
    visitor_->OnConnectionClosed(connection_->connection_id(),
                                 frame.quic_error_code, frame.error_details,
                                 source);
  }
}

// Tells the visitor, if there is one, that the connection is write blocked.
// Does nothing once the connection is no longer connected.
void QuicSession::OnWriteBlocked() {
  if (connection_->connected() && visitor_) {
    visitor_->OnWriteBlocked(connection_);
  }
}

// Intentionally empty: this implementation does nothing when version
// negotiation succeeds.
void QuicSession::OnSuccessfulVersionNegotiation(
    const ParsedQuicVersion& /*version*/) {
}

// On a server, answers a connectivity probe by sending a probing response to
// |peer_address|. All other packets are ignored here.
void QuicSession::OnPacketReceived(const QuicSocketAddress& /*self_address*/,
                                   const QuicSocketAddress& peer_address,
                                   bool is_connectivity_probe) {
  if (!is_connectivity_probe || perspective() != Perspective::IS_SERVER) {
    return;
  }
  // Server only sends back a connectivity probe after received a
  // connectivity probe from a new peer address.
  connection_->SendConnectivityProbingResponsePacket(peer_address);
}

// Intentionally empty: this implementation does nothing on path degrading.
void QuicSession::OnPathDegrading() {
}

// Intentionally empty: this implementation does nothing when forward progress
// resumes after path degrading.
void QuicSession::OnForwardProgressMadeAfterPathDegrading() {
}

// This implementation never allows a self address change.
bool QuicSession::AllowSelfAddressChange() const {
  constexpr bool kAllowed = false;
  return kAllowed;
}

void QuicSession::OnWindowUpdateFrame(const QuicWindowUpdateFrame& frame) {
  // Stream may be closed by the time we receive a WINDOW_UPDATE, so we can't
  // assume that it still exists.
  QuicStreamId stream_id = frame.stream_id;
  if (stream_id == QuicUtils::GetInvalidStreamId(transport_version())) {
    // This is a window update that applies to the connection, rather than an
    // individual stream.
    QUIC_DVLOG(1) << ENDPOINT
                  << "Received connection level flow control window "
                     "update with max data: "
                  << frame.max_data;
    flow_controller_.UpdateSendWindowOffset(frame.max_data);
    return;
  }

  if (VersionHasIetfQuicFrames(transport_version()) &&
      QuicUtils::GetStreamType(stream_id, perspective(),
                               IsIncomingStream(stream_id),
                               version()) == READ_UNIDIRECTIONAL) {
    connection()->CloseConnection(
        QUIC_WINDOW_UPDATE_RECEIVED_ON_READ_UNIDIRECTIONAL_STREAM,
        "WindowUpdateFrame received on READ_UNIDIRECTIONAL stream.",
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
    return;
  }

  QuicStream* stream = GetOrCreateStream(stream_id);
  if (stream != nullptr) {
    stream->OnWindowUpdateFrame(frame);
  }
}

// Logs the stream id of a received BLOCKED frame; no other action is taken.
void QuicSession::OnBlockedFrame(const QuicBlockedFrame& frame) {
  // TODO(rjshade): Compare our flow control receive windows for specified
  //                streams: if we have a large window then maybe something
  //                had gone wrong with the flow control accounting.
  QUIC_DLOG(INFO) << ENDPOINT
                  << "Received BLOCKED frame with stream id: "
                  << frame.stream_id;
}

// Tracks whether |stream| keeps being asked to write without making progress.
// |previous_bytes_written| and |previous_fin_sent| are the stream's values
// from before the write attempt. Returns false once the stream's busy counter
// exceeds 20; returns true otherwise.
bool QuicSession::CheckStreamNotBusyLooping(QuicStream* stream,
                                            uint64_t previous_bytes_written,
                                            bool previous_fin_sent) {
  // The counter is reset when the write side is closed, when the connection
  // is flow control blocked, or when the stream made forward progress.
  if (stream->write_side_closed() || flow_controller_.IsBlocked() ||
      previous_bytes_written != stream->stream_bytes_written() ||
      previous_fin_sent != stream->fin_sent()) {
    stream->set_busy_counter(0);
    return true;
  }

  stream->set_busy_counter(stream->busy_counter() + 1);
  QUIC_DVLOG(1) << ENDPOINT << "Suspected busy loop on stream id "
                << stream->id() << " stream_bytes_written "
                << stream->stream_bytes_written() << " fin "
                << stream->fin_sent() << " count " << stream->busy_counter();

  // Wait a few iterations before firing, the exact count is
  // arbitrary, more than a few to cover a few test-only false
  // positives.
  if (!(stream->busy_counter() > 20)) {
    return true;
  }
  QUIC_LOG(ERROR) << ENDPOINT << "Detected busy loop on stream id "
                  << stream->id() << " stream_bytes_written "
                  << stream->stream_bytes_written() << " fin "
                  << stream->fin_sent();
  return false;
}

// Returns false (and logs) when |stream| has buffered data, an open write
// side and no stream-level flow control block, yet is absent from the
// write-blocked list. Returns true in every other case.
bool QuicSession::CheckStreamWriteBlocked(QuicStream* stream) const {
  if (stream->write_side_closed() || !stream->HasBufferedData() ||
      stream->flow_controller()->IsBlocked() ||
      write_blocked_streams_.IsStreamBlocked(stream->id())) {
    return true;
  }
  QUIC_DLOG(ERROR) << ENDPOINT << "stream " << stream->id()
                   << " has buffered " << stream->BufferedDataBytes()
                   << " bytes, and is not flow control blocked, "
                      "but it is not in the write block list.";
  return false;
}

// Writes as much pending data as possible, in this order: lost-data
// retransmissions, buffered crypto frames (for versions using CRYPTO frames),
// control frames, queued datagrams, and finally up to |num_writes|
// write-blocked streams. Returns early whenever one of the earlier stages
// cannot complete.
void QuicSession::OnCanWrite() {
  if (!RetransmitLostData()) {
    // Cannot finish retransmitting lost data, connection is write blocked.
    QUIC_DVLOG(1) << ENDPOINT
                  << "Cannot finish retransmitting lost data, connection is "
                     "write blocked.";
    return;
  }
  // We limit the number of writes to the number of pending streams. If more
  // streams become pending, WillingAndAbleToWrite will be true, which will
  // cause the connection to request resumption before yielding to other
  // connections.
  // If we are connection level flow control blocked, then only allow the
  // crypto and headers streams to try writing as all other streams will be
  // blocked.
  size_t num_writes = flow_controller_.IsBlocked()
                          ? write_blocked_streams_.NumBlockedSpecialStreams()
                          : write_blocked_streams_.NumBlockedStreams();
  // Nothing to write at all: no blocked streams, no control frames, no
  // datagrams and no buffered crypto frames.
  if (num_writes == 0 && !control_frame_manager_.WillingToWrite() &&
      datagram_queue_.empty() &&
      (!QuicVersionUsesCryptoFrames(transport_version()) ||
       !GetCryptoStream()->HasBufferedCryptoFrames())) {
    return;
  }

  // The flusher stays in scope for the rest of the function, covering every
  // write below.
  QuicConnection::ScopedPacketFlusher flusher(connection_);
  if (QuicVersionUsesCryptoFrames(transport_version())) {
    QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
    if (crypto_stream->HasBufferedCryptoFrames()) {
      crypto_stream->WriteBufferedCryptoFrames();
    }
    if (crypto_stream->HasBufferedCryptoFrames()) {
      // Cannot finish writing buffered crypto frames, connection is write
      // blocked.
      return;
    }
  }
  if (control_frame_manager_.WillingToWrite()) {
    control_frame_manager_.OnCanWrite();
  }
  // TODO(b/147146815): this makes all datagrams go before stream data.  We
  // should have a better priority scheme for this.
  if (!datagram_queue_.empty()) {
    size_t written = datagram_queue_.SendDatagrams();
    QUIC_DVLOG(1) << ENDPOINT << "Sent " << written << " datagrams";
    if (!datagram_queue_.empty()) {
      return;
    }
  }
  // Ids popped so far in this call; only used for diagnostics if the
  // write-blocked list turns out to be inconsistent.
  std::vector<QuicStreamId> last_writing_stream_ids;
  for (size_t i = 0; i < num_writes; ++i) {
    if (!(write_blocked_streams_.HasWriteBlockedSpecialStream() ||
          write_blocked_streams_.HasWriteBlockedDataStreams())) {
      // Writing one stream removed another!? Something's broken.
      QUIC_BUG << "WriteBlockedStream is missing, num_writes: " << num_writes
               << ", finished_writes: " << i
               << ", connected: " << connection_->connected()
               << ", connection level flow control blocked: "
               << flow_controller_.IsBlocked() << ", scheduler type: "
               << spdy::WriteSchedulerTypeToString(
                      write_blocked_streams_.scheduler_type());
      for (QuicStreamId id : last_writing_stream_ids) {
        QUIC_LOG(WARNING) << "last_writing_stream_id: " << id;
      }
      connection_->CloseConnection(QUIC_INTERNAL_ERROR,
                                   "WriteBlockedStream is missing",
                                   ConnectionCloseBehavior::SILENT_CLOSE);
      return;
    }
    if (!CanWriteStreamData()) {
      return;
    }
    currently_writing_stream_id_ = write_blocked_streams_.PopFront();
    last_writing_stream_ids.push_back(currently_writing_stream_id_);
    QUIC_DVLOG(1) << ENDPOINT << "Removing stream "
                  << currently_writing_stream_id_ << " from write-blocked list";
    QuicStream* stream = GetOrCreateStream(currently_writing_stream_id_);
    if (stream != nullptr && !stream->flow_controller()->IsBlocked()) {
      // If the stream can't write all bytes it'll re-add itself to the blocked
      // list.
      uint64_t previous_bytes_written = stream->stream_bytes_written();
      bool previous_fin_sent = stream->fin_sent();
      QUIC_DVLOG(1) << ENDPOINT << "stream " << stream->id()
                    << " bytes_written " << previous_bytes_written << " fin "
                    << previous_fin_sent;
      stream->OnCanWrite();
      // Both checks are wrapped in DCHECK, so they (including the busy counter
      // update inside CheckStreamNotBusyLooping) run only where DCHECK
      // evaluates its argument.
      DCHECK(CheckStreamWriteBlocked(stream));
      DCHECK(CheckStreamNotBusyLooping(stream, previous_bytes_written,
                                       previous_fin_sent));
    }
    currently_writing_stream_id_ = 0;
  }
}

// Asks the sent packet manager to retransmit its oldest packet as a probing
// retransmission and returns whether it did so.
bool QuicSession::SendProbingData() {
  const bool retransmitted =
      connection()->sent_packet_manager().MaybeRetransmitOldestPacket(
          PROBING_RETRANSMISSION);
  return retransmitted;
}

// Returns true when the session has something it can write right now.
bool QuicSession::WillingAndAbleToWrite() const {
  // Schedule a write when:
  // 1) control frame manager has pending or new control frames, or
  // 2) any stream has pending retransmissions, or
  // 3) If the crypto or headers streams are blocked, or
  // 4) connection is not flow control blocked and there are write blocked
  // streams.
  if (QuicVersionUsesCryptoFrames(transport_version()) &&
      HasPendingHandshake()) {
    return true;
  }
  if (control_frame_manager_.WillingToWrite()) {
    return true;
  }
  if (!streams_with_pending_retransmission_.empty()) {
    return true;
  }

  if (!flow_controller_.IsBlocked()) {
    return write_blocked_streams_.HasWriteBlockedSpecialStream() ||
           write_blocked_streams_.HasWriteBlockedDataStreams();
  }

  // Connection-level flow control is blocked. For HTTP/3 versions nothing can
  // be written; otherwise crypto and headers streams are not blocked by
  // connection level flow control.
  return !VersionUsesHttp3(transport_version()) &&
         write_blocked_streams_.HasWriteBlockedSpecialStream();
}

bool QuicSession::HasPendingHandshake() const {
  if (QuicVersionUsesCryptoFrames(transport_version())) {
    return GetCryptoStream()->HasPendingCryptoRetransmission() ||
           GetCryptoStream()->HasBufferedCryptoFrames();
  }
  return QuicContainsKey(streams_with_pending_retransmission_,
                         QuicUtils::GetCryptoStreamId(transport_version())) ||
         write_blocked_streams_.IsStreamBlocked(
             QuicUtils::GetCryptoStreamId(transport_version()));
}

// Passes a received UDP packet, together with both addresses, straight to the
// connection.
void QuicSession::ProcessUdpPacket(const QuicSocketAddress& self_address,
                                   const QuicSocketAddress& peer_address,
                                   const QuicReceivedPacket& packet) {
  connection_->ProcessUdpPacket(
      self_address,
      peer_address,
      packet);
}

// Sends |write_length| bytes of stream |id| starting at |offset| through the
// connection and returns what the connection consumed. If |level| has a
// value, it is used as the default encryption level for this write and the
// previous level is restored afterwards. Returns (0, false) without writing
// when encryption is not established and |id| is not the crypto stream.
QuicConsumedData QuicSession::WritevData(
    QuicStreamId id,
    size_t write_length,
    QuicStreamOffset offset,
    StreamSendingState state,
    TransmissionType type,
    quiche::QuicheOptional<EncryptionLevel> level) {
  DCHECK(connection_->connected())
      << ENDPOINT << "Try to write stream data when connection is closed.";

  if (!IsEncryptionEstablished()) {
    if (!QuicUtils::IsCryptoStreamId(transport_version(), id)) {
      // Do not let streams write without encryption. The calling stream will
      // end up write blocked until OnCanWrite is next called.
      QUIC_BUG << ENDPOINT << "Try to send data of stream " << id
               << " before encryption is established.";
      return QuicConsumedData(0, false);
    }
  }

  SetTransmissionType(type);
  const auto saved_level = connection()->encryption_level();
  const bool override_level = level.has_value();
  if (override_level) {
    connection()->SetDefaultEncryptionLevel(level.value());
  }

  QuicConsumedData consumed =
      connection_->SendStreamData(id, write_length, offset, state);
  // Only new stream data counts towards the per-stream byte accounting.
  if (type == NOT_RETRANSMISSION) {
    write_blocked_streams_.UpdateBytesForStream(id, consumed.bytes_consumed);
  }

  // Restore the encryption level.
  if (override_level) {
    connection()->SetDefaultEncryptionLevel(saved_level);
  }
  return consumed;
}

// Sends crypto data at |level| through the connection and returns the number
// of bytes consumed. The connection's default encryption level is switched to
// |level| for the write and restored afterwards.
size_t QuicSession::SendCryptoData(EncryptionLevel level,
                                   size_t write_length,
                                   QuicStreamOffset offset,
                                   TransmissionType type) {
  DCHECK(QuicVersionUsesCryptoFrames(transport_version()));
  SetTransmissionType(type);

  const auto saved_level = connection()->encryption_level();
  connection_->SetDefaultEncryptionLevel(level);
  const auto consumed =
      connection_->SendCryptoData(level, write_length, offset);
  // Restores encryption level.
  connection_->SetDefaultEncryptionLevel(saved_level);

  return consumed;
}

// Closes the connection with |error_code| and |error_details|, sending a
// connection close packet.
void QuicSession::OnControlFrameManagerError(QuicErrorCode error_code,
                                             std::string error_details) {
  constexpr auto kBehavior =
      ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET;
  connection_->CloseConnection(error_code, error_details, kBehavior);
}

// Sets the transmission type to |type| and hands |frame| to the connection.
// Returns whatever the connection's SendControlFrame() returns.
bool QuicSession::WriteControlFrame(const QuicFrame& frame,
                                    TransmissionType type) {
  DCHECK(connection()->connected())
      << ENDPOINT << "Try to write control frames when connection is closed.";
  SetTransmissionType(type);
  const bool sent = connection_->SendControlFrame(frame);
  return sent;
}

// While the connection is connected: possibly sends RST_STREAM and
// STOP_SENDING frames for stream |id| inside a single packet flusher scope,
// then tells the connection the stream was reset. Does nothing otherwise.
void QuicSession::SendRstStream(QuicStreamId id,
                                QuicRstStreamErrorCode error,
                                QuicStreamOffset bytes_written) {
  if (!connection()->connected()) {
    return;
  }

  QuicConnection::ScopedPacketFlusher flusher(connection());
  MaybeSendRstStreamFrame(id, error, bytes_written);
  MaybeSendStopSendingFrame(id, error);

  connection_->OnStreamReset(id, error);
}

// Resets stream |id| with |error|. If GetStream() finds no stream, a reset is
// sent with a written-bytes offset of 0. Trying to reset a static stream
// closes the connection instead.
void QuicSession::ResetStream(QuicStreamId id, QuicRstStreamErrorCode error) {
  QuicStream* const stream = GetStream(id);
  if (stream == nullptr) {
    SendRstStream(id, error, 0);
    return;
  }

  if (stream->is_static()) {
    connection()->CloseConnection(
        QUIC_INVALID_STREAM_ID, "Try to reset a static stream",
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
    return;
  }

  stream->Reset(error);
}

// Writes or buffers a RST_STREAM frame for stream |id|, except when the
// version has IETF QUIC frames and the stream is read-unidirectional.
void QuicSession::MaybeSendRstStreamFrame(QuicStreamId id,
                                          QuicRstStreamErrorCode error,
                                          QuicStreamOffset bytes_written) {
  DCHECK(connection()->connected());
  if (VersionHasIetfQuicFrames(transport_version()) &&
      QuicUtils::GetStreamType(id, perspective(), IsIncomingStream(id),
                               version()) == READ_UNIDIRECTIONAL) {
    return;
  }
  control_frame_manager_.WriteOrBufferRstStream(id, error, bytes_written);
}

// Writes or buffers a STOP_SENDING frame for stream |id|, but only when the
// version has IETF QUIC frames and the stream is not write-unidirectional.
void QuicSession::MaybeSendStopSendingFrame(QuicStreamId id,
                                            QuicRstStreamErrorCode error) {
  DCHECK(connection()->connected());
  if (!VersionHasIetfQuicFrames(transport_version())) {
    return;
  }
  const bool is_write_only =
      QuicUtils::GetStreamType(id, perspective(), IsIncomingStream(id),
                               version()) == WRITE_UNIDIRECTIONAL;
  if (is_write_only) {
    return;
  }
  control_frame_manager_.WriteOrBufferStopSending(error, id);
}

// Writes or buffers a GOAWAY frame carrying |error_code|, the largest
// peer-created stream id and |reason|. Only the first call has any effect;
// later calls return without sending.
void QuicSession::SendGoAway(QuicErrorCode error_code,
                             const std::string& reason) {
  // GOAWAY frame is not supported in v99.
  DCHECK(!VersionHasIetfQuicFrames(transport_version()));
  if (!goaway_sent_) {
    goaway_sent_ = true;
    control_frame_manager_.WriteOrBufferGoAway(
        error_code, stream_id_manager_.largest_peer_created_stream_id(),
        reason);
  }
}

// Hands a BLOCKED frame for stream |id| to the control frame manager, which
// writes or buffers it.
void QuicSession::SendBlocked(QuicStreamId id) {
  control_frame_manager_
      .WriteOrBufferBlocked(id);
}

// Hands a window update for stream |id| with |byte_offset| to the control
// frame manager, which writes or buffers it.
void QuicSession::SendWindowUpdate(QuicStreamId id,
                                   QuicStreamOffset byte_offset) {
  control_frame_manager_
      .WriteOrBufferWindowUpdate(id, byte_offset);
}

// Closes the connection with |error_code| and |error_details|, sending a
// connection close packet.
void QuicSession::OnStreamError(QuicErrorCode error_code,
                                std::string error_details) {
  constexpr auto kBehavior =
      ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET;
  connection_->CloseConnection(error_code, error_details, kBehavior);
}

// Writes or buffers a MAX_STREAMS frame once the session is configured.
// Before that, reports a bug and sends nothing.
void QuicSession::SendMaxStreams(QuicStreamCount stream_count,
                                 bool unidirectional) {
  if (is_configured_) {
    control_frame_manager_.WriteOrBufferMaxStreams(stream_count,
                                                   unidirectional);
    return;
  }
  QUIC_BUG << "Try to send max streams before config negotiated.";
}

// Closes both the read and write side of the stream identified by
// |stream_id|. Unknown ids are ignored. Attempting to close a static stream
// closes the whole connection with QUIC_INVALID_STREAM_ID instead.
void QuicSession::CloseStream(QuicStreamId stream_id) {
  QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;

  const auto entry = stream_map_.find(stream_id);
  if (entry == stream_map_.end()) {
    // A recursive call (via QuicStream::OnClose) finds the stream already
    // removed from stream_map_; there is nothing left to do.
    QUIC_DVLOG(1) << ENDPOINT << "Stream is already closed: " << stream_id;
    return;
  }

  QuicStream* const target = entry->second.get();
  if (!target->is_static()) {
    target->CloseReadSide();
    target->CloseWriteSide();
    return;
  }

  // Static streams must never be closed individually.
  QUIC_DVLOG(1) << ENDPOINT
                << "Try to close a static stream, id: " << stream_id
                << " Closing connection";
  connection()->CloseConnection(
      QUIC_INVALID_STREAM_ID, "Try to close a static stream",
      ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
}

// Records |offset| as the highest received byte offset of the locally closed
// stream |id|, overwriting any value previously stored for that id.
void QuicSession::InsertLocallyClosedStreamsHighestOffset(
    const QuicStreamId id,
    QuicStreamOffset offset) {
  locally_closed_streams_highest_offset_[id] = offset;
}

// Removes the stream |stream_id| from stream_map_ and performs the associated
// bookkeeping:
//  - the stream object is moved to zombie_streams_ if it is still waiting for
//    acks, otherwise to closed_streams_ (and the clean-up alarm is armed);
//  - if no final offset was received, the highest received offset is recorded
//    for later connection-level flow control accounting and nothing else is
//    done;
//  - otherwise draining counters and the stream id managers are updated, and
//    for non-IETF versions OnCanCreateNewOutgoingStream() is invoked when an
//    outgoing stream was closed on a still-connected connection.
// Calling this for an id that is not in stream_map_ is reported as a bug.
void QuicSession::OnStreamClosed(QuicStreamId stream_id) {
  QUIC_DVLOG(1) << ENDPOINT << "Closing stream: " << stream_id;
  StreamMap::iterator it = stream_map_.find(stream_id);
  if (it == stream_map_.end()) {
    QUIC_BUG << ENDPOINT << "Stream is already closed: " << stream_id;
    return;
  }
  // |stream| and |type| are captured before the owning pointer is moved out of
  // the map entry below; |stream| keeps being used afterwards (presumably the
  // object stays alive in the container it was moved into).
  QuicStream* stream = it->second.get();
  StreamType type = stream->type();

  if (stream->IsWaitingForAcks()) {
    zombie_streams_[stream_id] = std::move(it->second);
  } else {
    // Clean up the stream since it is no longer waiting for acks.
    streams_waiting_for_acks_.erase(stream_id);
    closed_streams_.push_back(std::move(it->second));
    // Do not retransmit data of a closed stream.
    streams_with_pending_retransmission_.erase(stream_id);
    if (!closed_streams_clean_up_alarm_->IsSet()) {
      closed_streams_clean_up_alarm_->Set(
          connection_->clock()->ApproximateNow());
    }
  }

  if (!stream->HasReceivedFinalOffset()) {
    // If we haven't received a FIN or RST for this stream, we need to keep
    // track of the how many bytes the stream's flow controller believes it has
    // received, for accurate connection level flow control accounting.
    // If this is an outgoing stream, it is technically open from peer's
    // perspective. Do not inform stream Id manager yet.
    DCHECK(!stream->was_draining());
    InsertLocallyClosedStreamsHighestOffset(
        stream_id, stream->flow_controller()->highest_received_byte_offset());
    stream_map_.erase(it);
    return;
  }

  // Read the draining state before the map entry is erased.
  const bool stream_was_draining = stream->was_draining();
  QUIC_DVLOG_IF(1, stream_was_draining)
      << ENDPOINT << "Stream " << stream_id << " was draining";
  stream_map_.erase(it);
  if (stream_was_draining) {
    QUIC_BUG_IF(num_draining_streams_ == 0);
    --num_draining_streams_;
    if (!IsIncomingStream(stream_id)) {
      QUIC_BUG_IF(num_outgoing_draining_streams_ == 0);
      --num_outgoing_draining_streams_;
    }
    // Stream Id manager has been informed with draining streams.
    return;
  }
  // The non-IETF stream id manager is informed even if the connection has
  // already been disconnected; everything below requires a live connection.
  if (!VersionHasIetfQuicFrames(transport_version())) {
    stream_id_manager_.OnStreamClosed(
        /*is_incoming=*/IsIncomingStream(stream_id));
  }
  if (!connection_->connected()) {
    return;
  }
  if (IsIncomingStream(stream_id)) {
    // Stream Id manager is only interested in peer initiated stream IDs.
    if (VersionHasIetfQuicFrames(transport_version())) {
      v99_streamid_manager_.OnStreamClosed(stream_id);
    }
    return;
  }
  // An outgoing stream was closed: for non-IETF versions, signal that a new
  // outgoing stream may be created. The argument is true for any stream type
  // other than BIDIRECTIONAL.
  if (!VersionHasIetfQuicFrames(transport_version())) {
    OnCanCreateNewOutgoingStream(type != BIDIRECTIONAL);
  }
}

// Drops the pending stream |stream_id| from pending_stream_map_ and, while the
// connection is still connected, reports the closure to the IETF stream id
// manager. Only valid on versions that use IETF QUIC frames.
void QuicSession::ClosePendingStream(QuicStreamId stream_id) {
  QUIC_DVLOG(1) << ENDPOINT << "Closing stream " << stream_id;
  DCHECK(VersionHasIetfQuicFrames(transport_version()));
  pending_stream_map_.erase(stream_id);
  if (!connection_->connected()) {
    return;
  }
  v99_streamid_manager_.OnStreamClosed(stream_id);
}

// Called when the final byte offset of |stream_id| becomes known. Has no
// effect unless the stream was recorded as locally closed. Otherwise the
// difference between |final_byte_offset| and the recorded highest offset is
// folded into the connection-level flow controller, the record is removed and
// the stream id managers are informed. If the update results in a flow control
// violation, the connection is closed and the record is left in place.
void QuicSession::OnFinalByteOffsetReceived(
    QuicStreamId stream_id,
    QuicStreamOffset final_byte_offset) {
  auto recorded = locally_closed_streams_highest_offset_.find(stream_id);
  if (recorded == locally_closed_streams_highest_offset_.end()) {
    return;
  }

  QUIC_DVLOG(1) << ENDPOINT << "Received final byte offset "
                << final_byte_offset << " for stream " << stream_id;
  // NOTE(review): nothing here checks that |final_byte_offset| is at least the
  // recorded offset; assuming QuicByteCount is unsigned, a smaller value would
  // wrap around. Confirm that callers rule this out.
  QuicByteCount offset_diff = final_byte_offset - recorded->second;
  const bool highest_offset_advanced =
      flow_controller_.UpdateHighestReceivedOffset(
          flow_controller_.highest_received_byte_offset() + offset_diff);
  // If the final offset violates flow control, close the connection now.
  if (highest_offset_advanced && flow_controller_.FlowControlViolation()) {
    connection_->CloseConnection(
        QUIC_FLOW_CONTROL_RECEIVED_TOO_MUCH_DATA,
        "Connection level flow control violation",
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
    return;
  }

  flow_controller_.AddBytesConsumed(offset_diff);
  locally_closed_streams_highest_offset_.erase(recorded);

  if (VersionHasIetfQuicFrames(transport_version())) {
    // Only peer-initiated streams are reported to the IETF stream id manager.
    if (IsIncomingStream(stream_id)) {
      v99_streamid_manager_.OnStreamClosed(stream_id);
    }
    return;
  }

  stream_id_manager_.OnStreamClosed(
      /*is_incoming=*/IsIncomingStream(stream_id));
  if (!IsIncomingStream(stream_id)) {
    OnCanCreateNewOutgoingStream(false);
  }
}

// Returns the crypto stream's encryption_established() state, or false when
// there is no crypto stream.
bool QuicSession::IsEncryptionEstablished() const {
  return GetCryptoStream() != nullptr &&
         GetCryptoStream()->encryption_established();
}

// Returns the crypto stream's one_rtt_keys_available() state, or false when
// there is no crypto stream.
bool QuicSession::OneRttKeysAvailable() const {
  return GetCryptoStream() != nullptr &&
         GetCryptoStream()->one_rtt_keys_available();
}

// Applies the negotiated |config_| to this session. In order, it:
//  1. rejects a second configuration on TLS versions when the connection is
//     not yet at ENCRYPTION_FORWARD_SECURE;
//  2. passes the config to the connection;
//  3. installs the peer's outgoing stream limits, closing the connection if
//     they are incompatible with streams already opened or limits already
//     known;
//  4. on the server, honors the received connection options (initial flow
//     control windows, write scheduler) and sets the stateless reset token;
//  5. installs the incoming stream limits this endpoint advertises;
//  6. informs existing streams and the session flow controller of the peer's
//     flow control windows;
//  7. marks the session configured, notifies the connection and possibly
//     calls OnCanWrite().
// Every early return below follows a CloseConnection() call and leaves
// |is_configured_| untouched.
void QuicSession::OnConfigNegotiated() {
  // In versions with TLS, the configs will be set twice if 0-RTT is available.
  // In the second config setting, 1-RTT keys are guaranteed to be available.
  if (GetQuicReloadableFlag(quic_enable_zero_rtt_for_tls) &&
      version().UsesTls() && is_configured_ &&
      connection_->encryption_level() != ENCRYPTION_FORWARD_SECURE) {
    QUIC_BUG
        << ENDPOINT
        << "1-RTT keys missing when config is negotiated for the second time.";
    connection_->CloseConnection(
        QUIC_INTERNAL_ERROR,
        "1-RTT keys missing when config is negotiated for the second time.",
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
    return;
  }

  QUIC_DVLOG(1) << ENDPOINT << "OnConfigNegotiated";
  connection_->SetFromConfig(config_);

  if (VersionHasIetfQuicFrames(transport_version())) {
    // Bidirectional outgoing stream limit. A limit that was not received is
    // treated as 0.
    uint32_t max_streams = 0;
    if (config_.HasReceivedMaxBidirectionalStreams()) {
      max_streams = config_.ReceivedMaxBidirectionalStreams();
    }
    // After a 0-RTT rejection the new limit must still cover the outgoing
    // bidirectional streams counted so far.
    if (was_zero_rtt_rejected_ &&
        max_streams <
            v99_streamid_manager_.outgoing_bidirectional_stream_count()) {
      connection_->CloseConnection(
          QUIC_ZERO_RTT_UNRETRANSMITTABLE,
          quiche::QuicheStrCat(
              "Server rejected 0-RTT, aborting because new bidirectional "
              "initial stream limit ",
              max_streams, " is less than current open streams: ",
              v99_streamid_manager_.outgoing_bidirectional_stream_count()),
          ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
      return;
    }
    QUIC_DVLOG(1) << ENDPOINT
                  << "Setting Bidirectional outgoing_max_streams_ to "
                  << max_streams;
    // A client refuses a limit lower than the one it currently holds.
    // NOTE(review): this check is restricted to the client perspective while
    // the corresponding unidirectional check further down is not - confirm
    // that the asymmetry is intentional.
    if (perspective_ == Perspective::IS_CLIENT &&
        max_streams <
            v99_streamid_manager_.max_outgoing_bidirectional_streams()) {
      connection_->CloseConnection(
          was_zero_rtt_rejected_ ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
                                 : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
          quiche::QuicheStrCat(
              was_zero_rtt_rejected_
                  ? "Server rejected 0-RTT, aborting because "
                  : "",
              "new bidirectional limit ", max_streams,
              " decreases the current limit: ",
              v99_streamid_manager_.max_outgoing_bidirectional_streams()),
          ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
      return;
    }
    if (v99_streamid_manager_.MaybeAllowNewOutgoingBidirectionalStreams(
            max_streams)) {
      OnCanCreateNewOutgoingStream(/*unidirectional = */ false);
    }

    // Unidirectional outgoing stream limit, handled like the bidirectional
    // one above.
    max_streams = 0;
    if (config_.HasReceivedMaxUnidirectionalStreams()) {
      max_streams = config_.ReceivedMaxUnidirectionalStreams();
    }

    if (was_zero_rtt_rejected_ &&
        max_streams <
            v99_streamid_manager_.outgoing_unidirectional_stream_count()) {
      connection_->CloseConnection(
          QUIC_ZERO_RTT_UNRETRANSMITTABLE,
          quiche::QuicheStrCat(
              "Server rejected 0-RTT, aborting because new unidirectional "
              "initial stream limit ",
              max_streams, " is less than current open streams: ",
              v99_streamid_manager_.outgoing_unidirectional_stream_count()),
          ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
      return;
    }

    if (max_streams <
        v99_streamid_manager_.max_outgoing_unidirectional_streams()) {
      connection_->CloseConnection(
          was_zero_rtt_rejected_ ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
                                 : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
          quiche::QuicheStrCat(
              was_zero_rtt_rejected_
                  ? "Server rejected 0-RTT, aborting because "
                  : "",
              "new unidirectional limit ", max_streams,
              " decreases the current limit: ",
              v99_streamid_manager_.max_outgoing_unidirectional_streams()),
          ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
      return;
    }
    QUIC_DVLOG(1) << ENDPOINT
                  << "Setting Unidirectional outgoing_max_streams_ to "
                  << max_streams;
    if (v99_streamid_manager_.MaybeAllowNewOutgoingUnidirectionalStreams(
            max_streams)) {
      OnCanCreateNewOutgoingStream(/*unidirectional = */ true);
    }
  } else {
    // Versions without IETF frames use a single outgoing stream limit, taken
    // from the received bidirectional value.
    uint32_t max_streams = 0;
    if (config_.HasReceivedMaxBidirectionalStreams()) {
      max_streams = config_.ReceivedMaxBidirectionalStreams();
    }
    QUIC_DVLOG(1) << ENDPOINT << "Setting max_open_outgoing_streams_ to "
                  << max_streams;
    if (was_zero_rtt_rejected_ &&
        max_streams < stream_id_manager_.num_open_outgoing_streams()) {
      connection_->CloseConnection(
          QUIC_INTERNAL_ERROR,
          quiche::QuicheStrCat(
              "Server rejected 0-RTT, aborting because new stream limit ",
              max_streams, " is less than current open streams: ",
              stream_id_manager_.num_open_outgoing_streams()),
          ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
      return;
    }
    stream_id_manager_.set_max_open_outgoing_streams(max_streams);
  }

  if (perspective() == Perspective::IS_SERVER) {
    if (config_.HasReceivedConnectionOptions()) {
      // The following variations change the initial receive flow control
      // window sizes.
      // These are independent checks, so when several IFW tags are present
      // each matching adjustment is applied in the order listed.
      if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW6)) {
        AdjustInitialFlowControlWindows(64 * 1024);
      }
      if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW7)) {
        AdjustInitialFlowControlWindows(128 * 1024);
      }
      if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW8)) {
        AdjustInitialFlowControlWindows(256 * 1024);
      }
      if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFW9)) {
        AdjustInitialFlowControlWindows(512 * 1024);
      }
      if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kIFWA)) {
        AdjustInitialFlowControlWindows(1024 * 1024);
      }
      // Write scheduler selection: at most one of the following branches is
      // taken, in this priority order.
      if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kH2PR) &&
          !VersionHasIetfQuicFrames(transport_version())) {
        // Enable HTTP2 (tree-style) priority write scheduler.
        use_http2_priority_write_scheduler_ =
            write_blocked_streams_.SwitchWriteScheduler(
                spdy::WriteSchedulerType::HTTP2, transport_version());
      } else if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kFIFO)) {
        // Enable FIFO write scheduler.
        write_blocked_streams_.SwitchWriteScheduler(
            spdy::WriteSchedulerType::FIFO, transport_version());
      } else if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kLIFO)) {
        // Enable LIFO write scheduler.
        write_blocked_streams_.SwitchWriteScheduler(
            spdy::WriteSchedulerType::LIFO, transport_version());
      } else if (ContainsQuicTag(config_.ReceivedConnectionOptions(), kRRWS) &&
                 write_blocked_streams_.scheduler_type() ==
                     spdy::WriteSchedulerType::SPDY) {
        // Round-robin scheduling is only enabled on top of the SPDY scheduler.
        enable_round_robin_scheduling_ = true;
      }
    }

    config_.SetStatelessResetTokenToSend(GetStatelessResetToken());
  }

  // Install the limits on peer-initiated streams that this endpoint sends.
  if (VersionHasIetfQuicFrames(transport_version())) {
    v99_streamid_manager_.SetMaxOpenIncomingBidirectionalStreams(
        config_.GetMaxBidirectionalStreamsToSend());
    v99_streamid_manager_.SetMaxOpenIncomingUnidirectionalStreams(
        config_.GetMaxUnidirectionalStreamsToSend());
  } else {
    // A small number of additional incoming streams beyond the limit should be
    // allowed. This helps avoid early connection termination when FIN/RSTs for
    // old streams are lost or arrive out of order.
    // Use a minimum number of additional streams, or a percentage increase,
    // whichever is larger.
    uint32_t max_incoming_streams_to_send =
        config_.GetMaxBidirectionalStreamsToSend();
    uint32_t max_incoming_streams =
        std::max(max_incoming_streams_to_send + kMaxStreamsMinimumIncrement,
                 static_cast<uint32_t>(max_incoming_streams_to_send *
                                       kMaxStreamsMultiplier));
    stream_id_manager_.set_max_open_incoming_streams(max_incoming_streams);
  }

  if (connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
    // When using IETF-style TLS transport parameters, inform existing streams
    // of new flow-control limits.
    if (config_.HasReceivedInitialMaxStreamDataBytesOutgoingBidirectional()) {
      OnNewStreamOutgoingBidirectionalFlowControlWindow(
          config_.ReceivedInitialMaxStreamDataBytesOutgoingBidirectional());
    }
    if (config_.HasReceivedInitialMaxStreamDataBytesIncomingBidirectional()) {
      OnNewStreamIncomingBidirectionalFlowControlWindow(
          config_.ReceivedInitialMaxStreamDataBytesIncomingBidirectional());
    }
    if (config_.HasReceivedInitialMaxStreamDataBytesUnidirectional()) {
      OnNewStreamUnidirectionalFlowControlWindow(
          config_.ReceivedInitialMaxStreamDataBytesUnidirectional());
    }
  } else {  // The version uses Google QUIC Crypto.
    if (config_.HasReceivedInitialStreamFlowControlWindowBytes()) {
      // Streams which were created before the SHLO was received (0-RTT
      // requests) are now informed of the peer's initial flow control window.
      OnNewStreamFlowControlWindow(
          config_.ReceivedInitialStreamFlowControlWindowBytes());
    }
  }

  if (config_.HasReceivedInitialSessionFlowControlWindowBytes()) {
    OnNewSessionFlowControlWindow(
        config_.ReceivedInitialSessionFlowControlWindowBytes());
  }

  is_configured_ = true;
  connection()->OnConfigNegotiated();

  // Ask flow controllers to try again since the config could have unblocked us.
  // Or if this session is configured on TLS enabled QUIC versions,
  // attempt to retransmit 0-RTT data if there's any.
  if (connection_->version().AllowsLowFlowControlLimits() ||
      (GetQuicReloadableFlag(quic_enable_zero_rtt_for_tls) &&
       version().UsesTls())) {
    OnCanWrite();
  }
}

// Sets the stream receive window to send to |stream_window| and scales the
// session receive window so that the previously configured session-to-stream
// ratio is kept (1.5 when the configured stream window is zero). The new sizes
// are stored in |config_| and pushed to the session flow controller, to every
// stream in stream_map_ and, for versions that do not use CRYPTO frames, to
// the crypto stream.
void QuicSession::AdjustInitialFlowControlWindows(size_t stream_window) {
  // Capture the ratio before the stream window in |config_| is overwritten.
  float session_window_multiplier = 1.5f;
  if (config_.GetInitialStreamFlowControlWindowToSend()) {
    session_window_multiplier =
        static_cast<float>(
            config_.GetInitialSessionFlowControlWindowToSend()) /
        config_.GetInitialStreamFlowControlWindowToSend();
  }

  QUIC_DVLOG(1) << ENDPOINT << "Set stream receive window to " << stream_window;
  config_.SetInitialStreamFlowControlWindowToSend(stream_window);

  size_t session_window = session_window_multiplier * stream_window;
  QUIC_DVLOG(1) << ENDPOINT << "Set session receive window to "
                << session_window;
  config_.SetInitialSessionFlowControlWindowToSend(session_window);
  flow_controller_.UpdateReceiveWindowSize(session_window);

  // Propagate the stream-level window to all existing streams.
  for (const auto& entry : stream_map_) {
    entry.second->flow_controller()->UpdateReceiveWindowSize(stream_window);
  }
  if (QuicVersionUsesCryptoFrames(transport_version())) {
    return;
  }
  GetMutableCryptoStream()->flow_controller()->UpdateReceiveWindowSize(
      stream_window);
}

// Handles a frame received for a locally-created stream that is not currently
// active by closing the connection. The error code is
// QUIC_HTTP_STREAM_WRONG_DIRECTION on versions with IETF QUIC frames and
// QUIC_INVALID_STREAM_ID otherwise.
void QuicSession::HandleFrameOnNonexistentOutgoingStream(
    QuicStreamId stream_id) {
  DCHECK(!IsClosedStream(stream_id));
  const auto error_code = VersionHasIetfQuicFrames(transport_version())
                              ? QUIC_HTTP_STREAM_WRONG_DIRECTION
                              : QUIC_INVALID_STREAM_ID;
  connection()->CloseConnection(
      error_code, "Data for nonexistent stream",
      ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
}

// Handles a RST_STREAM |frame| for a stream that is not active. If the stream
// is a closed one, the final byte offset carried by the frame is forwarded so
// that connection-level flow control accounting can be updated; otherwise the
// frame is ignored.
void QuicSession::HandleRstOnValidNonexistentStream(
    const QuicRstStreamFrame& frame) {
  if (!IsClosedStream(frame.stream_id)) {
    return;
  }
  // The RST frame supplies the stream's final byte offset.
  OnFinalByteOffsetReceived(frame.stream_id, frame.byte_offset);
}

// Applies the peer's stream-level send window |new_window| (QUIC Crypto
// handshake only) to every stream in stream_map_ and, for versions that do not
// use CRYPTO frames, to the crypto stream. A window below
// kMinimumFlowControlSendWindow closes the connection unless the version
// allows low flow control limits. Processing stops at the first stream whose
// ConfigSendWindowOffset() call fails.
void QuicSession::OnNewStreamFlowControlWindow(QuicStreamOffset new_window) {
  DCHECK_EQ(connection_->version().handshake_protocol, PROTOCOL_QUIC_CRYPTO);
  QUIC_DVLOG(1) << ENDPOINT << "OnNewStreamFlowControlWindow " << new_window;

  const bool window_too_small =
      new_window < kMinimumFlowControlSendWindow &&
      !connection_->version().AllowsLowFlowControlLimits();
  if (window_too_small) {
    QUIC_LOG_FIRST_N(ERROR, 1)
        << "Peer sent us an invalid stream flow control send window: "
        << new_window << ", below minimum: " << kMinimumFlowControlSendWindow;
    connection_->CloseConnection(
        QUIC_FLOW_CONTROL_INVALID_WINDOW, "New stream window too low",
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
    return;
  }

  // Propagate the window to all existing streams.
  for (const auto& entry : stream_map_) {
    QUIC_DVLOG(1) << ENDPOINT << "Informing stream " << entry.first
                  << " of new stream flow control window " << new_window;
    if (!entry.second->ConfigSendWindowOffset(new_window)) {
      return;
    }
  }

  if (QuicVersionUsesCryptoFrames(transport_version())) {
    return;
  }
  QUIC_DVLOG(1) << ENDPOINT
                << "Informing crypto stream of new stream flow control window "
                << new_window;
  GetMutableCryptoStream()->ConfigSendWindowOffset(new_window);
}

// Applies the peer's send window |new_window| (TLS handshake only) to every
// existing outgoing unidirectional stream. Processing stops at the first
// stream for which validation or ConfigSendWindowOffset() fails.
void QuicSession::OnNewStreamUnidirectionalFlowControlWindow(
    QuicStreamOffset new_window) {
  DCHECK_EQ(connection_->version().handshake_protocol, PROTOCOL_TLS1_3);
  QUIC_DVLOG(1) << ENDPOINT << "OnNewStreamUnidirectionalFlowControlWindow "
                << new_window;
  for (const auto& entry : stream_map_) {
    const QuicStreamId id = entry.first;
    // With the gQUIC stream type fix on a non-IETF-frame version the stream's
    // own type is consulted; otherwise directionality is derived from the id.
    bool is_bidirectional;
    if (fix_gquic_stream_type_ && !version().HasIetfQuicFrames()) {
      QUIC_RELOADABLE_FLAG_COUNT_N(quic_fix_gquic_stream_type, 1, 3);
      is_bidirectional = entry.second->type() == BIDIRECTIONAL;
    } else {
      is_bidirectional = QuicUtils::IsBidirectionalStreamId(id, version());
    }
    // Skip everything except outgoing unidirectional streams.
    if (is_bidirectional ||
        !QuicUtils::IsOutgoingStreamId(connection_->version(), id,
                                       perspective())) {
      continue;
    }
    QUIC_DVLOG(1) << ENDPOINT << "Informing unidirectional stream " << id
                  << " of new stream flow control window " << new_window;
    if (!ValidateStreamFlowControlLimit(new_window, entry.second.get()) ||
        !entry.second->ConfigSendWindowOffset(new_window)) {
      return;
    }
  }
}

// Applies the peer's send window |new_window| (TLS handshake only) to every
// existing outgoing bidirectional stream. Processing stops at the first stream
// for which validation or ConfigSendWindowOffset() fails.
void QuicSession::OnNewStreamOutgoingBidirectionalFlowControlWindow(
    QuicStreamOffset new_window) {
  DCHECK_EQ(connection_->version().handshake_protocol, PROTOCOL_TLS1_3);
  QUIC_DVLOG(1) << ENDPOINT
                << "OnNewStreamOutgoingBidirectionalFlowControlWindow "
                << new_window;
  for (const auto& entry : stream_map_) {
    const QuicStreamId id = entry.first;
    // With the gQUIC stream type fix on a non-IETF-frame version the stream's
    // own type is consulted; otherwise directionality is derived from the id.
    bool is_bidirectional;
    if (fix_gquic_stream_type_ && !version().HasIetfQuicFrames()) {
      QUIC_RELOADABLE_FLAG_COUNT_N(quic_fix_gquic_stream_type, 2, 3);
      is_bidirectional = entry.second->type() == BIDIRECTIONAL;
    } else {
      is_bidirectional = QuicUtils::IsBidirectionalStreamId(id, version());
    }
    // Skip everything except outgoing bidirectional streams.
    if (!is_bidirectional ||
        !QuicUtils::IsOutgoingStreamId(connection_->version(), id,
                                       perspective())) {
      continue;
    }
    QUIC_DVLOG(1) << ENDPOINT << "Informing outgoing bidirectional stream "
                  << id << " of new stream flow control window " << new_window;
    if (!ValidateStreamFlowControlLimit(new_window, entry.second.get()) ||
        !entry.second->ConfigSendWindowOffset(new_window)) {
      return;
    }
  }
}

// Applies the peer's send window |new_window| (TLS handshake only) to every
// existing incoming bidirectional stream. Processing stops at the first stream
// for which validation or ConfigSendWindowOffset() fails.
void QuicSession::OnNewStreamIncomingBidirectionalFlowControlWindow(
    QuicStreamOffset new_window) {
  DCHECK_EQ(connection_->version().handshake_protocol, PROTOCOL_TLS1_3);
  QUIC_DVLOG(1) << ENDPOINT
                << "OnNewStreamIncomingBidirectionalFlowControlWindow "
                << new_window;
  for (const auto& entry : stream_map_) {
    const QuicStreamId id = entry.first;
    // With the gQUIC stream type fix on a non-IETF-frame version the stream's
    // own type is consulted; otherwise directionality is derived from the id.
    bool is_bidirectional;
    if (fix_gquic_stream_type_ && !version().HasIetfQuicFrames()) {
      QUIC_RELOADABLE_FLAG_COUNT_N(quic_fix_gquic_stream_type, 3, 3);
      is_bidirectional = entry.second->type() == BIDIRECTIONAL;
    } else {
      is_bidirectional = QuicUtils::IsBidirectionalStreamId(id, version());
    }
    // Skip everything except incoming bidirectional streams.
    if (!is_bidirectional ||
        QuicUtils::IsOutgoingStreamId(connection_->version(), id,
                                      perspective())) {
      continue;
    }
    QUIC_DVLOG(1) << ENDPOINT << "Informing incoming bidirectional stream "
                  << id << " of new stream flow control window " << new_window;
    if (!ValidateStreamFlowControlLimit(new_window, entry.second.get()) ||
        !entry.second->ConfigSendWindowOffset(new_window)) {
      return;
    }
  }
}

// Checks whether |new_window| is an acceptable send limit for |stream|.
// Returns true if it is. Otherwise closes the connection and returns false:
//  - after a 0-RTT rejection, when |new_window| is below the bytes the stream
//    has already sent;
//  - on versions allowing low flow control limits, when |new_window| is below
//    the stream's current send window offset.
bool QuicSession::ValidateStreamFlowControlLimit(QuicStreamOffset new_window,
                                                 const QuicStream* stream) {
  if (was_zero_rtt_rejected_ &&
      new_window < stream->flow_controller()->bytes_sent()) {
    QUIC_BUG_IF(perspective() == Perspective::IS_SERVER)
        << "Server should never receive configs twice.";
    const std::string details = quiche::QuicheStrCat(
        "Server rejected 0-RTT, aborting because new stream max data ",
        new_window, " for stream ", stream->id(),
        " is less than currently used: ",
        stream->flow_controller()->bytes_sent());
    connection_->CloseConnection(
        QUIC_ZERO_RTT_UNRETRANSMITTABLE, details,
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
    return false;
  }

  const bool limit_reduced =
      version().AllowsLowFlowControlLimits() &&
      new_window < stream->flow_controller()->send_window_offset();
  if (!limit_reduced) {
    return true;
  }

  QUIC_BUG_IF(perspective() == Perspective::IS_SERVER)
      << "Server should never receive configs twice.";
  const std::string details = quiche::QuicheStrCat(
      was_zero_rtt_rejected_ ? "Server rejected 0-RTT, aborting because " : "",
      "new stream max data ", new_window, " decreases current limit: ",
      stream->flow_controller()->send_window_offset());
  connection_->CloseConnection(
      was_zero_rtt_rejected_ ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
                             : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED,
      details, ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
  return false;
}

// Validates the peer's connection-level send window |new_window| and, when it
// is acceptable, applies it to the session flow controller. The connection is
// closed instead when:
//  - 0-RTT was rejected and |new_window| is below the bytes already sent;
//  - |new_window| is below kMinimumFlowControlSendWindow and the version does
//    not allow low flow control limits;
//  - this endpoint is a client and |new_window| is below the current send
//    window offset.
void QuicSession::OnNewSessionFlowControlWindow(QuicStreamOffset new_window) {
  QUIC_DVLOG(1) << ENDPOINT << "OnNewSessionFlowControlWindow " << new_window;

  const ConnectionCloseBehavior close_behavior =
      ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET;

  const bool below_bytes_sent =
      was_zero_rtt_rejected_ && new_window < flow_controller_.bytes_sent();
  if (below_bytes_sent) {
    const std::string message = quiche::QuicheStrCat(
        "Server rejected 0-RTT. Aborting because the client received session "
        "flow control send window: ",
        new_window,
        ", which is below currently used: ", flow_controller_.bytes_sent());
    QUIC_LOG(ERROR) << message;
    connection_->CloseConnection(QUIC_ZERO_RTT_UNRETRANSMITTABLE, message,
                                 close_behavior);
    return;
  }

  const bool below_minimum =
      !connection_->version().AllowsLowFlowControlLimits() &&
      new_window < kMinimumFlowControlSendWindow;
  if (below_minimum) {
    const std::string message = quiche::QuicheStrCat(
        "Peer sent us an invalid session flow control send window: ",
        new_window, ", below minimum: ", kMinimumFlowControlSendWindow);
    QUIC_LOG_FIRST_N(ERROR, 1) << message;
    connection_->CloseConnection(QUIC_FLOW_CONTROL_INVALID_WINDOW, message,
                                 close_behavior);
    return;
  }

  const bool client_limit_reduced =
      perspective_ == Perspective::IS_CLIENT &&
      new_window < flow_controller_.send_window_offset();
  if (client_limit_reduced) {
    // The client receives a lower limit than remembered, violating
    // https://tools.ietf.org/html/draft-ietf-quic-transport-27#section-7.3.1
    const std::string message = quiche::QuicheStrCat(
        was_zero_rtt_rejected_ ? "Server rejected 0-RTT, aborting because "
                               : "",
        "new session max data ", new_window,
        " decreases current limit: ", flow_controller_.send_window_offset());
    QUIC_LOG(ERROR) << message;
    const auto limit_reduced_code = was_zero_rtt_rejected_
                                        ? QUIC_ZERO_RTT_REJECTION_LIMIT_REDUCED
                                        : QUIC_ZERO_RTT_RESUMPTION_LIMIT_REDUCED;
    connection_->CloseConnection(limit_reduced_code, message, close_behavior);
    return;
  }

  flow_controller_.UpdateSendWindowOffset(new_window);
}

// Installs |decrypter| for encryption level |level| on the connection.
// Returns false without installing anything when, under the TLS handshake, no
// encrypter exists for the encryption level matching |level|'s packet number
// space; returns true otherwise. How the decrypter is installed depends on
// whether the version knows which decrypter to use and on
// |set_alternative_decrypter| (|latch_once_used| is forwarded in that case).
bool QuicSession::OnNewDecryptionKeyAvailable(
    EncryptionLevel level,
    std::unique_ptr<QuicDecrypter> decrypter,
    bool set_alternative_decrypter,
    bool latch_once_used) {
  if (connection_->version().handshake_protocol == PROTOCOL_TLS1_3 &&
      !connection()->framer().HasEncrypterOfEncryptionLevel(
          QuicUtils::GetEncryptionLevel(
              QuicUtils::GetPacketNumberSpace(level)))) {
    // This should never happen because connection should never decrypt a packet
    // while an ACK for it cannot be encrypted.
    return false;
  }

  if (connection()->version().KnowsWhichDecrypterToUse()) {
    connection()->InstallDecrypter(level, std::move(decrypter));
  } else if (set_alternative_decrypter) {
    connection()->SetAlternativeDecrypter(level, std::move(decrypter),
                                          latch_once_used);
  } else {
    connection()->SetDecrypter(level, std::move(decrypter));
  }
  return true;
}

// Installs |encrypter| for |level| on the connection. Under the TLS handshake
// the connection's default encryption level is also switched to |level|.
void QuicSession::OnNewEncryptionKeyAvailable(
    EncryptionLevel level,
    std::unique_ptr<QuicEncrypter> encrypter) {
  connection()->SetEncrypter(level, std::move(encrypter));
  if (connection_->version().handshake_protocol == PROTOCOL_TLS1_3) {
    QUIC_DVLOG(1) << ENDPOINT << "Set default encryption level to " << level;
    connection()->SetDefaultEncryptionLevel(level);
  }
}

// Sets the connection's default encryption level to |level| (QUIC Crypto
// handshake only) and performs the level-specific follow-up work: a client
// switching to 0-RTT marks 0-RTT packets for retransmission and calls
// OnCanWrite(); switching to forward-secure checks that the config has been
// negotiated.
void QuicSession::SetDefaultEncryptionLevel(EncryptionLevel level) {
  DCHECK_EQ(PROTOCOL_QUIC_CRYPTO, connection_->version().handshake_protocol);
  QUIC_DVLOG(1) << ENDPOINT << "Set default encryption level to " << level;
  connection()->SetDefaultEncryptionLevel(level);

  switch (level) {
    case ENCRYPTION_INITIAL:
    case ENCRYPTION_HANDSHAKE:
      // No additional work for these levels.
      break;
    case ENCRYPTION_ZERO_RTT:
      if (perspective() != Perspective::IS_CLIENT) {
        break;
      }
      // Retransmit old 0-RTT data (if any) with the new 0-RTT keys, since
      // they can't be decrypted by the server.
      connection_->MarkZeroRttPacketsForRetransmission();
      // Give any streams blocked by encryption a chance to write.
      OnCanWrite();
      break;
    case ENCRYPTION_FORWARD_SECURE:
      QUIC_BUG_IF(!config_.negotiated())
          << ENDPOINT << "Handshake confirmed without parameter negotiation.";
      break;
    default:
      QUIC_BUG << "Unknown encryption level: " << level;
  }
}

// Called when 1-RTT keys become available (TLS handshake only). Reports a bug
// if no cipher suite or no config has been negotiated. A server then queues a
// HANDSHAKE_DONE frame when the version has HANDSHAKE_DONE, or when the
// quic_support_handshake_done_in_t050 flag is set and the peer supports it.
void QuicSession::OnOneRttKeysAvailable() {
  DCHECK_EQ(PROTOCOL_TLS1_3, connection_->version().handshake_protocol);
  QUIC_BUG_IF(!GetCryptoStream()->crypto_negotiated_params().cipher_suite)
      << ENDPOINT << "Handshake completes without cipher suite negotiation.";
  QUIC_BUG_IF(!config_.negotiated())
      << ENDPOINT << "Handshake completes without parameter negotiation.";

  const bool handshake_done_supported =
      connection()->version().HasHandshakeDone() ||
      (GetQuicReloadableFlag(quic_support_handshake_done_in_t050) &&
       config_.PeerSupportsHandshakeDone());
  if (!handshake_done_supported || perspective_ != Perspective::IS_SERVER) {
    return;
  }

  if (!connection()->version().HasHandshakeDone()) {
    QUIC_RELOADABLE_FLAG_COUNT(quic_support_handshake_done_in_t050);
  }
  // Server sends HANDSHAKE_DONE to signal confirmation of the handshake
  // to the client.
  control_frame_manager_.WriteOrBufferHandshakeDone();
}

// Removes the decrypter for |level| from the connection. Does nothing on
// versions that do not know which decrypter to use.
void QuicSession::DiscardOldDecryptionKey(EncryptionLevel level) {
  if (connection()->version().KnowsWhichDecrypterToUse()) {
    connection()->RemoveDecrypter(level);
  }
}

// Discards the encryption keys for |level|. Under the TLS handshake the
// encrypter is removed from the connection. Independently of the handshake
// protocol, discarding initial keys neuters unencrypted data and discarding
// handshake keys neuters handshake data. Discarding forward-secure keys, or
// keys of an unknown level, is reported as a bug.
void QuicSession::DiscardOldEncryptionKey(EncryptionLevel level) {
  QUIC_DLOG(INFO) << ENDPOINT << "Discarding " << level << " keys";
  const bool uses_tls =
      connection()->version().handshake_protocol == PROTOCOL_TLS1_3;
  if (uses_tls) {
    connection()->RemoveEncrypter(level);
  }

  switch (level) {
    case ENCRYPTION_ZERO_RTT:
      // No additional work for 0-RTT keys.
      break;
    case ENCRYPTION_INITIAL:
      NeuterUnencryptedData();
      break;
    case ENCRYPTION_HANDSHAKE:
      NeuterHandshakeData();
      break;
    case ENCRYPTION_FORWARD_SECURE:
      QUIC_BUG << ENDPOINT << "Discarding 1-RTT keys is not allowed";
      break;
    default:
      QUIC_BUG << ENDPOINT
               << "Cannot discard keys for unknown encryption level: " << level;
  }
}

// Forwards to the connection's OnHandshakeComplete(). Called from
// DiscardOldEncryptionKey() when ENCRYPTION_HANDSHAKE keys are discarded.
void QuicSession::NeuterHandshakeData() {
  connection()->OnHandshakeComplete();
}

// Records that 0-RTT was rejected and marks 0-RTT packets for retransmission.
// If the connection is already at ENCRYPTION_FORWARD_SECURE this is reported
// as a bug and the connection is closed with QUIC_INTERNAL_ERROR.
void QuicSession::OnZeroRttRejected() {
  was_zero_rtt_rejected_ = true;
  connection_->MarkZeroRttPacketsForRetransmission();
  if (connection_->encryption_level() != ENCRYPTION_FORWARD_SECURE) {
    return;
  }
  QUIC_BUG << "1-RTT keys already available when 0-RTT is rejected.";
  connection_->CloseConnection(
      QUIC_INTERNAL_ERROR,
      "1-RTT keys already available when 0-RTT is rejected.",
      ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
}

// Populates |params| from config_. When the version authenticates handshake
// connection IDs, the relevant connection IDs are stored in config_ first:
// a server sends the original destination connection ID plus its own
// connection ID, a client sends its client connection ID.
// Returns the result of config_.FillTransportParameters().
bool QuicSession::FillTransportParameters(TransportParameters* params) {
  const bool authenticate_ids =
      version().AuthenticatesHandshakeConnectionIds();
  if (authenticate_ids && perspective() == Perspective::IS_SERVER) {
    config_.SetOriginalConnectionIdToSend(
        connection_->GetOriginalDestinationConnectionId());
    config_.SetInitialSourceConnectionIdToSend(connection_->connection_id());
  } else if (authenticate_ids) {
    config_.SetInitialSourceConnectionIdToSend(
        connection_->client_connection_id());
  }
  return config_.FillTransportParameters(params);
}

// Hands the peer's transport parameters to config_. The hello type describes
// the peer: a client session processes a SERVER hello and vice versa.
// Returns the error code produced by config_; |error_details| is passed
// through to it.
QuicErrorCode QuicSession::ProcessTransportParameters(
    const TransportParameters& params,
    bool is_resumption,
    std::string* error_details) {
  const HelloType hello_type =
      (perspective_ == Perspective::IS_CLIENT) ? SERVER : CLIENT;
  return config_.ProcessTransportParameters(params, hello_type, is_resumption,
                                            error_details);
}

// Notification that a crypto handshake message was sent. This implementation
// intentionally does nothing; the message argument is unused.
void QuicSession::OnCryptoHandshakeMessageSent(
    const CryptoHandshakeMessage& /*message*/) {}

// Notification that a crypto handshake message was received. This
// implementation intentionally does nothing; the message argument is unused.
void QuicSession::OnCryptoHandshakeMessageReceived(
    const CryptoHandshakeMessage& /*message*/) {}

// Registers stream |id| with the write-blocked list. With round-robin
// scheduling enabled the supplied |precedence| is ignored and every stream is
// registered at spdy::kV3LowestPriority so they all share one bucket.
void QuicSession::RegisterStreamPriority(
    QuicStreamId id,
    bool is_static,
    const spdy::SpdyStreamPrecedence& precedence) {
  if (!enable_round_robin_scheduling_) {
    write_blocked_streams()->RegisterStream(id, is_static, precedence);
    return;
  }
  // Round-robin: same priority bucket for all streams.
  write_blocked_streams()->RegisterStream(
      id, is_static, spdy::SpdyStreamPrecedence(spdy::kV3LowestPriority));
}

// Unregisters stream |id| from the write-blocked list, forwarding |is_static|
// unchanged to write_blocked_streams()->UnregisterStream().
void QuicSession::UnregisterStreamPriority(QuicStreamId id, bool is_static) {
  write_blocked_streams()->UnregisterStream(id, is_static);
}

// Updates the precedence of stream |id| in the write-blocked list. Updates
// are dropped when round-robin scheduling is enabled, since all streams then
// share a single priority.
void QuicSession::UpdateStreamPriority(
    QuicStreamId id,
    const spdy::SpdyStreamPrecedence& new_precedence) {
  if (!enable_round_robin_scheduling_) {
    write_blocked_streams()->UpdateStreamPriority(id, new_precedence);
  }
}

// Returns a mutable pointer to the session's config_ member (never null).
QuicConfig* QuicSession::config() {
  return &config_;
}

void QuicSession::ActivateStream(std::unique_ptr<QuicStream> stream) {
  QuicStreamId stream_id = stream->id();
  bool is_static = stream->is_static();
  QUIC_DVLOG(1) << ENDPOINT << "num_streams: " << stream_map_.size()
                << ". activating stream " << stream_id;
  DCHECK(!QuicContainsKey(stream_map_, stream_id));
  stream_map_[stream_id] = std::move(stream);
  if (is_static) {
    ++num_static_streams_;
    return;
  }
  if (!VersionHasIetfQuicFrames(transport_version())) {
    // Do not inform stream ID manager of static streams.
    stream_id_manager_.ActivateStream(
        /*is_incoming=*/IsIncomingStream(stream_id));
  }
}

// Returns the next outgoing bidirectional stream ID, taken from the stream ID
// manager that matches the version in use.
QuicStreamId QuicSession::GetNextOutgoingBidirectionalStreamId() {
  if (!VersionHasIetfQuicFrames(transport_version())) {
    return stream_id_manager_.GetNextOutgoingStreamId();
  }
  return v99_streamid_manager_.GetNextOutgoingBidirectionalStreamId();
}

// Returns the next outgoing unidirectional stream ID. Versions without IETF
// QUIC frames draw from the single stream_id_manager_ ID space.
QuicStreamId QuicSession::GetNextOutgoingUnidirectionalStreamId() {
  if (!VersionHasIetfQuicFrames(transport_version())) {
    return stream_id_manager_.GetNextOutgoingStreamId();
  }
  return v99_streamid_manager_.GetNextOutgoingUnidirectionalStreamId();
}

// Returns true if another outgoing bidirectional stream may be opened. On
// IETF-frame versions, a refusal after the session is configured also queues
// a STREAMS_BLOCKED frame carrying the current bidirectional limit.
bool QuicSession::CanOpenNextOutgoingBidirectionalStream() {
  if (!VersionHasIetfQuicFrames(transport_version())) {
    return stream_id_manager_.CanOpenNextOutgoingStream();
  }
  const bool can_open =
      v99_streamid_manager_.CanOpenNextOutgoingBidirectionalStream();
  if (!can_open && is_configured_) {
    // Send STREAM_BLOCKED after config negotiated.
    control_frame_manager_.WriteOrBufferStreamsBlocked(
        v99_streamid_manager_.max_outgoing_bidirectional_streams(),
        /*unidirectional=*/false);
  }
  return can_open;
}

// Returns true if another outgoing unidirectional stream may be opened. On
// IETF-frame versions, a refusal after the session is configured also queues
// a STREAMS_BLOCKED frame carrying the current unidirectional limit.
bool QuicSession::CanOpenNextOutgoingUnidirectionalStream() {
  if (!VersionHasIetfQuicFrames(transport_version())) {
    return stream_id_manager_.CanOpenNextOutgoingStream();
  }
  const bool can_open =
      v99_streamid_manager_.CanOpenNextOutgoingUnidirectionalStream();
  if (!can_open && is_configured_) {
    // Send STREAM_BLOCKED after config negotiated.
    control_frame_manager_.WriteOrBufferStreamsBlocked(
        v99_streamid_manager_.max_outgoing_unidirectional_streams(),
        /*unidirectional=*/true);
  }
  return can_open;
}

// Returns the advertised maximum number of incoming bidirectional streams as
// tracked by v99_streamid_manager_. DCHECKs that the version uses IETF QUIC
// frames.
QuicStreamCount QuicSession::GetAdvertisedMaxIncomingBidirectionalStreams()
    const {
  DCHECK(VersionHasIetfQuicFrames(transport_version()));
  return v99_streamid_manager_.advertised_max_incoming_bidirectional_streams();
}

// Returns the stream for |stream_id|, creating an incoming stream if needed.
// Lookup order: crypto stream, then stream_map_. Returns nullptr when the
// stream is already closed, when the ID is an outgoing one that does not
// exist, when the peer stream ID cannot be accepted, or (on versions without
// IETF QUIC frames) when the incoming stream limit is reached, in which case
// the stream is reset with QUIC_REFUSED_STREAM.
QuicStream* QuicSession::GetOrCreateStream(const QuicStreamId stream_id) {
  DCHECK(!QuicContainsKey(pending_stream_map_, stream_id));
  if (QuicUtils::IsCryptoStreamId(transport_version(), stream_id)) {
    return GetMutableCryptoStream();
  }

  const auto existing = stream_map_.find(stream_id);
  if (existing != stream_map_.end()) {
    return existing->second.get();
  }

  if (IsClosedStream(stream_id)) {
    return nullptr;
  }

  const bool is_incoming = IsIncomingStream(stream_id);
  if (!is_incoming) {
    HandleFrameOnNonexistentOutgoingStream(stream_id);
    return nullptr;
  }

  // TODO(fkastenholz): If we are creating a new stream and we have
  // sent a goaway, we should ignore the stream creation. Need to
  // add code to A) test if goaway was sent ("if (goaway_sent_)") and
  // B) reject stream creation ("return nullptr")

  if (!MaybeIncreaseLargestPeerStreamId(stream_id)) {
    return nullptr;
  }

  const bool incoming_limit_reached =
      !VersionHasIetfQuicFrames(transport_version()) &&
      !stream_id_manager_.CanOpenIncomingStream();
  if (incoming_limit_reached) {
    // Refuse to open the stream.
    ResetStream(stream_id, QUIC_REFUSED_STREAM);
    return nullptr;
  }

  return CreateIncomingStream(stream_id);
}

// Marks |stream_id| as draining: the matching stream ID manager is told the
// stream closed and the draining counters are bumped. For outgoing streams
// the session is additionally notified that a new outgoing stream may be
// created.
void QuicSession::StreamDraining(QuicStreamId stream_id, bool unidirectional) {
  DCHECK(QuicContainsKey(stream_map_, stream_id));
  QUIC_DVLOG(1) << ENDPOINT << "Stream " << stream_id << " is draining";
  if (!VersionHasIetfQuicFrames(transport_version())) {
    stream_id_manager_.OnStreamClosed(
        /*is_incoming=*/IsIncomingStream(stream_id));
  } else {
    v99_streamid_manager_.OnStreamClosed(stream_id);
  }
  ++num_draining_streams_;
  if (IsIncomingStream(stream_id)) {
    return;
  }
  ++num_outgoing_draining_streams_;
  OnCanCreateNewOutgoingStream(unidirectional);
}

// Asks the version-appropriate stream ID manager to accept |stream_id| as a
// peer-created stream. Returns true on success. On failure the connection is
// closed (QUIC_INVALID_STREAM_ID for IETF-frame versions,
// QUIC_TOO_MANY_AVAILABLE_STREAMS otherwise) and false is returned.
bool QuicSession::MaybeIncreaseLargestPeerStreamId(
    const QuicStreamId stream_id) {
  if (!VersionHasIetfQuicFrames(transport_version())) {
    if (stream_id_manager_.MaybeIncreaseLargestPeerStreamId(stream_id)) {
      return true;
    }
    connection()->CloseConnection(
        QUIC_TOO_MANY_AVAILABLE_STREAMS,
        quiche::QuicheStrCat(stream_id, " exceeds available streams ",
                             stream_id_manager_.MaxAvailableStreams()),
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
    return false;
  }

  std::string error_details;
  if (!v99_streamid_manager_.MaybeIncreaseLargestPeerStreamId(
          stream_id, &error_details)) {
    connection()->CloseConnection(
        QUIC_INVALID_STREAM_ID, error_details,
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
    return false;
  }
  return true;
}

// Returns whether |stream_id| should yield to another stream. The stream that
// is currently writing never yields; otherwise the write-blocked list
// decides.
bool QuicSession::ShouldYield(QuicStreamId stream_id) {
  return stream_id != currently_writing_stream_id_ &&
         write_blocked_streams()->ShouldYield(stream_id);
}

// Returns the pending stream for |stream_id|, creating and registering one in
// pending_stream_map_ if none exists. Returns nullptr if the stream is
// already closed or the peer stream ID cannot be accepted. The returned
// pointer is owned by pending_stream_map_.
PendingStream* QuicSession::GetOrCreatePendingStream(QuicStreamId stream_id) {
  const auto existing = pending_stream_map_.find(stream_id);
  if (existing != pending_stream_map_.end()) {
    return existing->second.get();
  }

  if (IsClosedStream(stream_id)) {
    return nullptr;
  }
  if (!MaybeIncreaseLargestPeerStreamId(stream_id)) {
    return nullptr;
  }

  std::unique_ptr<PendingStream> created =
      std::make_unique<PendingStream>(stream_id, this);
  PendingStream* const raw = created.get();
  pending_stream_map_[stream_id] = std::move(created);
  return raw;
}

// Sets the largest peer-created stream ID on stream_id_manager_. DCHECKs that
// the version does NOT use IETF QUIC frames.
void QuicSession::set_largest_peer_created_stream_id(
    QuicStreamId largest_peer_created_stream_id) {
  DCHECK(!VersionHasIetfQuicFrames(transport_version()));
  stream_id_manager_.set_largest_peer_created_stream_id(
      largest_peer_created_stream_id);
}

// Returns the largest peer-created stream ID of the requested directionality
// (|unidirectional|), as tracked by v99_streamid_manager_.
QuicStreamId QuicSession::GetLargestPeerCreatedStreamId(
    bool unidirectional) const {
  // This method is only used in IETF QUIC.
  DCHECK(VersionHasIetfQuicFrames(transport_version()));
  return v99_streamid_manager_.GetLargestPeerCreatedStreamId(unidirectional);
}

// Destroys the connection object owned through connection_ and clears the
// pointer. Safe to call repeatedly: deleting a null pointer is a no-op.
void QuicSession::DeleteConnection() {
  delete connection_;
  connection_ = nullptr;
}

// Applies |precedence| to the active stream |stream_id|. Returns true if the
// stream was found in stream_map_, false otherwise (nothing is changed).
bool QuicSession::MaybeSetStreamPriority(
    QuicStreamId stream_id,
    const spdy::SpdyStreamPrecedence& precedence) {
  const auto entry = stream_map_.find(stream_id);
  if (entry == stream_map_.end()) {
    return false;
  }
  entry->second->SetPriority(precedence);
  return true;
}

// Returns true if |id| refers to a stream that is neither open nor still
// available according to the version-appropriate stream ID manager.
bool QuicSession::IsClosedStream(QuicStreamId id) {
  DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
  if (IsOpenStream(id)) {
    // Stream is active
    return false;
  }

  const bool still_available =
      VersionHasIetfQuicFrames(transport_version())
          ? v99_streamid_manager_.IsAvailableStream(id)
          : stream_id_manager_.IsAvailableStream(id);
  return !still_available;
}

// Returns true if |id| is an active stream: present in stream_map_, present
// in pending_stream_map_, or the crypto stream ID for this version.
bool QuicSession::IsOpenStream(QuicStreamId id) {
  DCHECK_NE(QuicUtils::GetInvalidStreamId(transport_version()), id);
  return QuicContainsKey(stream_map_, id) ||
         QuicContainsKey(pending_stream_map_, id) ||
         QuicUtils::IsCryptoStreamId(transport_version(), id);
}

// Returns true if |id| is in stream_map_ and that stream reports itself as
// static. Unknown IDs yield false.
bool QuicSession::IsStaticStream(QuicStreamId id) const {
  const auto entry = stream_map_.find(id);
  return entry != stream_map_.end() && entry->second->is_static();
}

// Returns the number of active streams. IETF-frame versions count the
// entries of stream_map_ minus draining and static streams; other versions
// use the open-stream counters of stream_id_manager_ minus locally closed
// streams.
size_t QuicSession::GetNumActiveStreams() const {
  if (VersionHasIetfQuicFrames(transport_version())) {
    return stream_map_.size() - num_draining_streams_ - num_static_streams_;
  }
  // Exclude locally_closed_streams when determine whether to keep connection
  // alive.
  return stream_id_manager_.num_open_incoming_streams() +
         stream_id_manager_.num_open_outgoing_streams() -
         locally_closed_streams_highest_offset_.size();
}

// Adds stream |id| to the write-blocked list. If the stream cannot be found
// or created a bug is reported (with a stack trace for the first two
// occurrences), but the ID is still added to the list.
void QuicSession::MarkConnectionLevelWriteBlocked(QuicStreamId id) {
  const bool stream_known = GetOrCreateStream(id) != nullptr;
  if (!stream_known) {
    QUIC_BUG << "Marking unknown stream " << id << " blocked.";
    QUIC_LOG_FIRST_N(ERROR, 2) << QuicStackTrace();
  }

  QUIC_DVLOG(1) << ENDPOINT << "Adding stream " << id
                << " to write-blocked list";

  write_blocked_streams_.AddStream(id);
}

// Returns true if anything is waiting to be written: write-blocked special
// or data streams, data queued in the connection, streams with pending
// retransmissions, or control frames the control frame manager wants to send.
bool QuicSession::HasDataToWrite() const {
  if (write_blocked_streams_.HasWriteBlockedSpecialStream() ||
      write_blocked_streams_.HasWriteBlockedDataStreams()) {
    return true;
  }
  if (connection_->HasQueuedData()) {
    return true;
  }
  if (!streams_with_pending_retransmission_.empty()) {
    return true;
  }
  return control_frame_manager_.WillingToWrite();
}

// Responds to the notification by having the session's flow_controller_ send
// a window update.
void QuicSession::OnAckNeedsRetransmittableFrame() {
  flow_controller_.SendWindowUpdate();
}

// Asks control_frame_manager_ to write a PING via WritePing().
void QuicSession::SendPing() {
  control_frame_manager_.WritePing();
}

// Returns whether the session's own flow_controller_ reports being blocked.
bool QuicSession::IsConnectionFlowControlBlocked() const {
  return flow_controller_.IsBlocked();
}

// Returns true if any stream in stream_map_ is flow-control blocked, or, on
// versions that do not use CRYPTO frames, if the crypto stream is blocked.
bool QuicSession::IsStreamFlowControlBlocked() {
  for (const auto& entry : stream_map_) {
    if (entry.second->flow_controller()->IsBlocked()) {
      return true;
    }
  }
  return !QuicVersionUsesCryptoFrames(transport_version()) &&
         GetMutableCryptoStream()->flow_controller()->IsBlocked();
}

// Returns the maximum number of available bidirectional streams, read from
// the stream ID manager that matches the version in use.
size_t QuicSession::MaxAvailableBidirectionalStreams() const {
  if (!VersionHasIetfQuicFrames(transport_version())) {
    return stream_id_manager_.MaxAvailableStreams();
  }
  return v99_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
}

// Returns the maximum number of available unidirectional streams, read from
// the stream ID manager that matches the version in use.
size_t QuicSession::MaxAvailableUnidirectionalStreams() const {
  if (!VersionHasIetfQuicFrames(transport_version())) {
    return stream_id_manager_.MaxAvailableStreams();
  }
  return v99_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
}

// Returns true if |id| denotes a peer-initiated stream. IETF-frame versions
// derive this from the ID and the session's perspective; other versions ask
// stream_id_manager_.
bool QuicSession::IsIncomingStream(QuicStreamId id) const {
  if (!VersionHasIetfQuicFrames(transport_version())) {
    return stream_id_manager_.IsIncomingStream(id);
  }
  const bool is_outgoing =
      QuicUtils::IsOutgoingStreamId(version(), id, perspective_);
  return !is_outgoing;
}

// Called when stream |id| no longer waits for acks. The ID is removed from
// streams_waiting_for_acks_. If the stream is a zombie, it is moved to
// closed_streams_ (arming the clean-up alarm if it is not already set) and
// dropped from both zombie_streams_ and the pending-retransmission list.
void QuicSession::OnStreamDoneWaitingForAcks(QuicStreamId id) {
  streams_waiting_for_acks_.erase(id);

  auto zombie = zombie_streams_.find(id);
  if (zombie != zombie_streams_.end()) {
    closed_streams_.push_back(std::move(zombie->second));
    if (!closed_streams_clean_up_alarm_->IsSet()) {
      closed_streams_clean_up_alarm_->Set(
          connection_->clock()->ApproximateNow());
    }
    zombie_streams_.erase(zombie);
    // Do not retransmit data of a closed stream.
    streams_with_pending_retransmission_.erase(id);
  }
}

// Records that stream |id| is waiting for acks. The crypto stream is skipped
// because its status is accounted for by HasUnackedCryptoData(). Reports a
// bug if more streams are waiting for acks than streams exist.
void QuicSession::OnStreamWaitingForAcks(QuicStreamId id) {
  // Exclude crypto stream's status since it is counted in HasUnackedCryptoData.
  const bool is_crypto_stream =
      GetCryptoStream() != nullptr && id == GetCryptoStream()->id();
  if (is_crypto_stream) {
    return;
  }

  streams_waiting_for_acks_.insert(id);

  // The number of the streams waiting for acks should not be larger than the
  // number of streams.
  const size_t known_streams =
      static_cast<size_t>(stream_map_.size() + zombie_streams_.size());
  if (known_streams < streams_waiting_for_acks_.size()) {
    QUIC_BUG << "More streams are waiting for acks than the number of streams. "
             << "Sizes: streams: " << stream_map_.size()
             << ", zombie streams: " << zombie_streams_.size()
             << ", vs streams waiting for acks: "
             << streams_waiting_for_acks_.size();
  }
}

// Looks up stream |id| without creating it. Search order: active streams,
// zombie streams, then the crypto stream if |id| is the crypto stream ID.
// Returns nullptr if none matches.
QuicStream* QuicSession::GetStream(QuicStreamId id) const {
  const auto active = stream_map_.find(id);
  if (active != stream_map_.end()) {
    return active->second.get();
  }

  const auto zombie = zombie_streams_.find(id);
  if (zombie != zombie_streams_.end()) {
    return zombie->second.get();
  }

  if (!QuicUtils::IsCryptoStreamId(transport_version(), id)) {
    return nullptr;
  }
  // GetCryptoStream() is const-qualified; callers expect a mutable stream.
  return const_cast<QuicCryptoStream*>(GetCryptoStream());
}

// Dispatches an acked |frame| by type: MESSAGE frames go to OnMessageAcked()
// (always returns true), CRYPTO frames to the crypto stream, STREAM frames to
// the owning stream, everything else to the control frame manager.
// For STREAM frames the return value is whether new stream data was acked;
// it is false when the stream no longer exists.
bool QuicSession::OnFrameAcked(const QuicFrame& frame,
                               QuicTime::Delta ack_delay_time,
                               QuicTime receive_timestamp) {
  switch (frame.type) {
    case MESSAGE_FRAME:
      OnMessageAcked(frame.message_frame->message_id, receive_timestamp);
      return true;
    case CRYPTO_FRAME:
      return GetMutableCryptoStream()->OnCryptoFrameAcked(*frame.crypto_frame,
                                                          ack_delay_time);
    case STREAM_FRAME:
      break;
    default:
      return control_frame_manager_.OnControlFrameAcked(frame);
  }

  QuicStream* stream = GetStream(frame.stream_frame.stream_id);
  // Stream can already be reset when sent frame gets acked.
  if (stream == nullptr) {
    return false;
  }
  QuicByteCount newly_acked_length = 0;
  const bool new_stream_data_acked = stream->OnStreamFrameAcked(
      frame.stream_frame.offset, frame.stream_frame.data_length,
      frame.stream_frame.fin, ack_delay_time, receive_timestamp,
      &newly_acked_length);
  if (!stream->HasPendingRetransmission()) {
    streams_with_pending_retransmission_.erase(stream->id());
  }
  return new_stream_data_acked;
}

// Informs the owning stream that |frame| was retransmitted. If the stream no
// longer exists this is a bug and the connection is closed with
// QUIC_INTERNAL_ERROR.
void QuicSession::OnStreamFrameRetransmitted(const QuicStreamFrame& frame) {
  QuicStream* stream = GetStream(frame.stream_id);
  if (stream != nullptr) {
    stream->OnStreamFrameRetransmitted(frame.offset, frame.data_length,
                                       frame.fin);
    return;
  }
  QUIC_BUG << "Stream: " << frame.stream_id << " is closed when " << frame
           << " is retransmitted.";
  connection()->CloseConnection(
      QUIC_INTERNAL_ERROR, "Attempt to retransmit frame of a closed stream",
      ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
}

void QuicSession::OnFrameLost(const QuicFrame& frame) {
  if (frame.type == MESSAGE_FRAME) {
    OnMessageLost(frame.message_frame->message_id);
    return;
  }
  if (frame.type == CRYPTO_FRAME) {
    GetMutableCryptoStream()->OnCryptoFrameLost(frame.crypto_frame);
    return;
  }
  if (frame.type != STREAM_FRAME) {
    control_frame_manager_.OnControlFrameLost(frame);
    return;
  }
  QuicStream* stream = GetStream(frame.stream_frame.stream_id);
  if (stream == nullptr) {
    return;
  }
  stream->OnStreamFrameLost(frame.stream_frame.offset,
                            frame.stream_frame.data_length,
                            frame.stream_frame.fin);
  if (stream->HasPendingRetransmission() &&
      !QuicContainsKey(streams_with_pending_retransmission_,
                       frame.stream_frame.stream_id)) {
    streams_with_pending_retransmission_.insert(
        std::make_pair(frame.stream_frame.stream_id, true));
  }
}

// Retransmits |frames| with transmission type |type| while a packet flusher
// is held on the connection. MESSAGE frames are never retransmitted. Frames
// of streams that no longer exist are skipped. Processing stops at the first
// control or stream frame whose retransmission fails.
void QuicSession::RetransmitFrames(const QuicFrames& frames,
                                   TransmissionType type) {
  QuicConnection::ScopedPacketFlusher retransmission_flusher(connection_);
  for (const QuicFrame& frame : frames) {
    if (frame.type == MESSAGE_FRAME) {
      // Do not retransmit MESSAGE frames.
      continue;
    }
    if (frame.type == CRYPTO_FRAME) {
      GetMutableCryptoStream()->RetransmitData(frame.crypto_frame, type);
      continue;
    }
    if (frame.type == STREAM_FRAME) {
      QuicStream* stream = GetStream(frame.stream_frame.stream_id);
      if (stream == nullptr) {
        continue;
      }
      if (!stream->RetransmitStreamData(frame.stream_frame.offset,
                                        frame.stream_frame.data_length,
                                        frame.stream_frame.fin, type)) {
        return;
      }
      continue;
    }
    // Any remaining frame type is a control frame.
    if (!control_frame_manager_.RetransmitControlFrame(frame, type)) {
      return;
    }
  }
}

// Returns whether |frame| is still outstanding. MESSAGE frames never are.
// CRYPTO frames are answered by the crypto stream, STREAM frames by the
// owning stream (false if it no longer exists), and all other frames by the
// control frame manager.
bool QuicSession::IsFrameOutstanding(const QuicFrame& frame) const {
  switch (frame.type) {
    case MESSAGE_FRAME:
      return false;
    case CRYPTO_FRAME:
      return GetCryptoStream()->IsFrameOutstanding(
          frame.crypto_frame->level, frame.crypto_frame->offset,
          frame.crypto_frame->data_length);
    case STREAM_FRAME: {
      QuicStream* stream = GetStream(frame.stream_frame.stream_id);
      if (stream == nullptr) {
        return false;
      }
      return stream->IsStreamFrameOutstanding(frame.stream_frame.offset,
                                              frame.stream_frame.data_length,
                                              frame.stream_frame.fin);
    }
    default:
      return control_frame_manager_.IsControlFrameOutstanding(frame);
  }
}

bool QuicSession::HasUnackedCryptoData() const {
  const QuicCryptoStream* crypto_stream = GetCryptoStream();
  return crypto_stream->IsWaitingForAcks() || crypto_stream->HasBufferedData();
}

// Returns true while streams_waiting_for_acks_ is non-empty. The crypto
// stream is never in that set (see OnStreamWaitingForAcks()).
bool QuicSession::HasUnackedStreamData() const {
  return !streams_waiting_for_acks_.empty();
}

// Returns the handshake state reported by the crypto stream. Note that
// GetCryptoStream() is dereferenced without a null check.
HandshakeState QuicSession::GetHandshakeState() const {
  return GetCryptoStream()->GetHandshakeState();
}

// Writes |data_length| bytes of stream |id| starting at |offset| into
// |writer|. Returns WRITE_SUCCESS or WRITE_FAILED depending on the stream's
// result, or STREAM_MISSING (after reporting a bug) if the stream is unknown.
WriteStreamDataResult QuicSession::WriteStreamData(QuicStreamId id,
                                                   QuicStreamOffset offset,
                                                   QuicByteCount data_length,
                                                   QuicDataWriter* writer) {
  QuicStream* const stream = GetStream(id);
  if (stream != nullptr) {
    return stream->WriteStreamData(offset, data_length, writer) ? WRITE_SUCCESS
                                                                : WRITE_FAILED;
  }
  // This causes the connection to be closed because of failed to serialize
  // packet.
  QUIC_BUG << "Stream " << id << " does not exist when trying to write data."
           << " version:" << transport_version();
  return STREAM_MISSING;
}

// Writes crypto data for |level| (|data_length| bytes starting at |offset|)
// into |writer| by delegating to the crypto stream's WriteCryptoFrame().
// Returns that call's result.
bool QuicSession::WriteCryptoData(EncryptionLevel level,
                                  QuicStreamOffset offset,
                                  QuicByteCount data_length,
                                  QuicDataWriter* writer) {
  return GetMutableCryptoStream()->WriteCryptoFrame(level, offset, data_length,
                                                    writer);
}

// Returns the stateless reset token generated by
// QuicUtils::GenerateStatelessResetToken() from the connection's
// connection_id().
QuicUint128 QuicSession::GetStatelessResetToken() const {
  return QuicUtils::GenerateStatelessResetToken(connection_->connection_id());
}

// Returns whether stream data may be written now. Writing is refused while
// the connection has queued packets. Otherwise it is allowed if a handshake
// is pending, or if the connection can write retransmittable data.
bool QuicSession::CanWriteStreamData() const {
  // Queued packets block stream data; pending handshake data is written
  // immediately without consulting the connection.
  return !connection_->HasQueuedPackets() &&
         (HasPendingHandshake() ||
          connection_->CanWrite(HAS_RETRANSMITTABLE_DATA));
}

// Retransmits data previously marked as lost, in this order: crypto data,
// control frames, then the streams listed in
// streams_with_pending_retransmission_. All writes happen while a
// ScopedPacketFlusher is held on the connection.
// Returns true iff streams_with_pending_retransmission_ is empty on exit.
// Returns false early if the crypto stream (pre-CRYPTO-frame versions) or the
// control frame manager still has pending retransmissions after being given
// a chance to write.
bool QuicSession::RetransmitLostData() {
  QuicConnection::ScopedPacketFlusher retransmission_flusher(connection_);
  // Retransmit crypto data first.
  bool uses_crypto_frames = QuicVersionUsesCryptoFrames(transport_version());
  QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
  if (uses_crypto_frames && crypto_stream->HasPendingCryptoRetransmission()) {
    crypto_stream->WritePendingCryptoRetransmission();
  }
  // Retransmit crypto data in stream 1 frames (version < 47).
  if (!uses_crypto_frames &&
      QuicContainsKey(streams_with_pending_retransmission_,
                      QuicUtils::GetCryptoStreamId(transport_version()))) {
    // Retransmit crypto data first.
    // Note: this local intentionally shadows the outer |crypto_stream|; it is
    // looked up through GetStream() as a plain QuicStream.
    QuicStream* crypto_stream =
        GetStream(QuicUtils::GetCryptoStreamId(transport_version()));
    crypto_stream->OnCanWrite();
    DCHECK(CheckStreamWriteBlocked(crypto_stream));
    if (crypto_stream->HasPendingRetransmission()) {
      // Connection is write blocked.
      return false;
    } else {
      streams_with_pending_retransmission_.erase(
          QuicUtils::GetCryptoStreamId(transport_version()));
    }
  }
  // Control frames come next; stop if they could not all be written.
  if (control_frame_manager_.HasPendingRetransmission()) {
    control_frame_manager_.OnCanWrite();
    if (control_frame_manager_.HasPendingRetransmission()) {
      return false;
    }
  }
  // Finally drain the per-stream retransmission list from the front, for as
  // long as stream data can be written.
  while (!streams_with_pending_retransmission_.empty()) {
    if (!CanWriteStreamData()) {
      break;
    }
    // Retransmit lost data on headers and data streams.
    const QuicStreamId id = streams_with_pending_retransmission_.begin()->first;
    QuicStream* stream = GetStream(id);
    if (stream != nullptr) {
      stream->OnCanWrite();
      DCHECK(CheckStreamWriteBlocked(stream));
      if (stream->HasPendingRetransmission()) {
        // Connection is write blocked.
        break;
      } else if (!streams_with_pending_retransmission_.empty() &&
                 streams_with_pending_retransmission_.begin()->first == id) {
        // Retransmit lost data may cause connection close. If this stream
        // has not yet sent fin, a RST_STREAM will be sent and it will be
        // removed from streams_with_pending_retransmission_.
        // The front entry is therefore re-checked before it is popped.
        streams_with_pending_retransmission_.pop_front();
      }
    } else {
      QUIC_BUG << "Try to retransmit data of a closed stream";
      streams_with_pending_retransmission_.pop_front();
    }
  }

  return streams_with_pending_retransmission_.empty();
}

void QuicSession::NeuterUnencryptedData() {
  QuicCryptoStream* crypto_stream = GetMutableCryptoStream();
  crypto_stream->NeuterUnencryptedStreamData();
  if (!crypto_stream->HasPendingRetransmission() &&
      !QuicVersionUsesCryptoFrames(transport_version())) {
    streams_with_pending_retransmission_.erase(
        QuicUtils::GetCryptoStreamId(transport_version()));
  }
  connection_->NeuterUnencryptedPackets();
}

// Forwards |type| to the connection's SetTransmissionType().
void QuicSession::SetTransmissionType(TransmissionType type) {
  connection_->SetTransmissionType(type);
}

// Convenience overload: sends |message| with flush set to false. See the
// two-argument SendMessage() for the result semantics.
MessageResult QuicSession::SendMessage(QuicMemSliceSpan message) {
  return SendMessage(message, /*flush=*/false);
}

// Attempts to send |message| on the connection using the next message ID
// (last_message_id_ + 1). On success the ID is committed and returned with
// the status. On any failure, including encryption not being established,
// the status is returned with a message ID of 0 and last_message_id_ is left
// unchanged.
MessageResult QuicSession::SendMessage(QuicMemSliceSpan message, bool flush) {
  DCHECK(connection_->connected())
      << ENDPOINT << "Try to write messages when connection is closed.";
  if (!IsEncryptionEstablished()) {
    return {MESSAGE_STATUS_ENCRYPTION_NOT_ESTABLISHED, 0};
  }
  const MessageStatus status =
      connection_->SendMessage(last_message_id_ + 1, message, flush);
  if (status != MESSAGE_STATUS_SUCCESS) {
    return {status, 0};
  }
  return {status, ++last_message_id_};
}

// Called by OnFrameAcked() for MESSAGE frames. This implementation only logs
// the acked |message_id| at DVLOG(1); the receive timestamp is unused.
void QuicSession::OnMessageAcked(QuicMessageId message_id,
                                 QuicTime /*receive_timestamp*/) {
  QUIC_DVLOG(1) << ENDPOINT << "message " << message_id << " gets acked.";
}

// Called by OnFrameLost() for MESSAGE frames. This implementation only logs
// the lost |message_id| at DVLOG(1).
void QuicSession::OnMessageLost(QuicMessageId message_id) {
  QUIC_DVLOG(1) << ENDPOINT << "message " << message_id
                << " is considered lost";
}

// Empties closed_streams_. Invoked by ClosedStreamsCleanUpDelegate::OnAlarm().
void QuicSession::CleanUpClosedStreams() {
  closed_streams_.clear();
}

// Returns the connection's current largest message payload, unchanged.
QuicPacketLength QuicSession::GetCurrentLargestMessagePayload() const {
  return connection_->GetCurrentLargestMessagePayload();
}

// Returns the connection's guaranteed largest message payload, unchanged.
QuicPacketLength QuicSession::GetGuaranteedLargestMessagePayload() const {
  return connection_->GetGuaranteedLargestMessagePayload();
}

// Queues a STOP_SENDING frame carrying |code| for |stream_id| through
// control_frame_manager_.WriteOrBufferStopSending().
void QuicSession::SendStopSending(uint16_t code, QuicStreamId stream_id) {
  control_frame_manager_.WriteOrBufferStopSending(code, stream_id);
}

// Returns the next outgoing bidirectional stream ID as reported by the
// stream ID manager that matches the version in use.
QuicStreamId QuicSession::next_outgoing_bidirectional_stream_id() const {
  if (!VersionHasIetfQuicFrames(transport_version())) {
    return stream_id_manager_.next_outgoing_stream_id();
  }
  return v99_streamid_manager_.next_outgoing_bidirectional_stream_id();
}

// Returns the next outgoing unidirectional stream ID as reported by the
// stream ID manager that matches the version in use.
QuicStreamId QuicSession::next_outgoing_unidirectional_stream_id() const {
  if (!VersionHasIetfQuicFrames(transport_version())) {
    return stream_id_manager_.next_outgoing_stream_id();
  }
  return v99_streamid_manager_.next_outgoing_unidirectional_stream_id();
}

// Handles a MAX_STREAMS frame: offers the new stream count to the stream ID
// manager for the frame's directionality and, if that allows new outgoing
// streams, notifies the session. Always returns true.
bool QuicSession::OnMaxStreamsFrame(const QuicMaxStreamsFrame& frame) {
  bool newly_allowed = false;
  if (frame.unidirectional) {
    newly_allowed =
        v99_streamid_manager_.MaybeAllowNewOutgoingUnidirectionalStreams(
            frame.stream_count);
  } else {
    newly_allowed =
        v99_streamid_manager_.MaybeAllowNewOutgoingBidirectionalStreams(
            frame.stream_count);
  }
  if (newly_allowed) {
    OnCanCreateNewOutgoingStream(frame.unidirectional);
  }

  return true;
}

// Handles a STREAMS_BLOCKED frame via v99_streamid_manager_. Returns true if
// the manager accepts it. Otherwise the connection is closed with
// QUIC_STREAMS_BLOCKED_ERROR and the manager's error details, and false is
// returned.
bool QuicSession::OnStreamsBlockedFrame(const QuicStreamsBlockedFrame& frame) {
  std::string error_details;
  const bool accepted =
      v99_streamid_manager_.OnStreamsBlockedFrame(frame, &error_details);
  if (!accepted) {
    connection_->CloseConnection(
        QUIC_STREAMS_BLOCKED_ERROR, error_details,
        ConnectionCloseBehavior::SEND_CONNECTION_CLOSE_PACKET);
  }
  return accepted;
}

// Returns the limit on open incoming bidirectional streams, read from the
// stream ID manager that matches the version in use.
size_t QuicSession::max_open_incoming_bidirectional_streams() const {
  if (!VersionHasIetfQuicFrames(transport_version())) {
    return stream_id_manager_.max_open_incoming_streams();
  }
  return v99_streamid_manager_.GetMaxAllowdIncomingBidirectionalStreams();
}

// Returns the limit on open incoming unidirectional streams, read from the
// stream ID manager that matches the version in use.
size_t QuicSession::max_open_incoming_unidirectional_streams() const {
  if (!VersionHasIetfQuicFrames(transport_version())) {
    return stream_id_manager_.max_open_incoming_streams();
  }
  return v99_streamid_manager_.GetMaxAllowdIncomingUnidirectionalStreams();
}

// Picks the entry of |alpns| that equals the ALPN string for the connection's
// version. Returns an iterator to the first match, or alpns.cend() if the
// list does not contain it.
std::vector<quiche::QuicheStringPiece>::const_iterator QuicSession::SelectAlpn(
    const std::vector<quiche::QuicheStringPiece>& alpns) const {
  const std::string wanted = AlpnForVersion(connection()->version());
  auto match = alpns.cbegin();
  const auto end = alpns.cend();
  match = std::find(match, end, wanted);
  return match;
}

// Notification that |alpn| was selected for this session. This implementation
// only logs the value, prefixed with ENDPOINT like the file's other log lines
// instead of a hand-expanded copy of that macro.
void QuicSession::OnAlpnSelected(quiche::QuicheStringPiece alpn) {
  QUIC_DLOG(INFO) << ENDPOINT << "ALPN selected: " << alpn;
}

// Forwards to the crypto stream's NeuterStreamDataOfEncryptionLevel() for
// |level|.
void QuicSession::NeuterCryptoDataOfEncryptionLevel(EncryptionLevel level) {
  GetMutableCryptoStream()->NeuterStreamDataOfEncryptionLevel(level);
}

#undef ENDPOINT  // undef for jumbo builds
}  // namespace quic
