// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "quic/masque/masque_client_session.h"
#include "absl/algorithm/container.h"
#include "quic/core/http/spdy_utils.h"
#include "quic/core/quic_data_reader.h"
#include "quic/core/quic_utils.h"
#include "common/platform/api/quiche_text_utils.h"

namespace quic {

MasqueClientSession::MasqueClientSession(
    MasqueMode masque_mode,
    const QuicConfig& config,
    const ParsedQuicVersionVector& supported_versions,
    QuicConnection* connection,
    const QuicServerId& server_id,
    QuicCryptoClientConfig* crypto_config,
    QuicClientPushPromiseIndex* push_promise_index,
    Owner* owner)
    : QuicSpdyClientSession(config,
                            supported_versions,
                            connection,
                            server_id,
                            crypto_config,
                            push_promise_index),
      masque_mode_(masque_mode),
      owner_(owner),
      compression_engine_(this) {}

void MasqueClientSession::OnMessageReceived(absl::string_view message) {
  if (masque_mode_ == MasqueMode::kLegacy) {
    QUIC_DVLOG(1) << "Received DATAGRAM frame of length " << message.length();
    QuicConnectionId client_connection_id, server_connection_id;
    QuicSocketAddress target_server_address;
    std::vector<char> packet;
    bool version_present;
    if (!compression_engine_.DecompressDatagram(
            message, &client_connection_id, &server_connection_id,
            &target_server_address, &packet, &version_present)) {
      return;
    }

    auto connection_id_registration =
        client_connection_id_registrations_.find(client_connection_id);
    if (connection_id_registration ==
        client_connection_id_registrations_.end()) {
      QUIC_DLOG(ERROR) << "MasqueClientSession failed to dispatch "
                       << client_connection_id;
      return;
    }
    EncapsulatedClientSession* encapsulated_client_session =
        connection_id_registration->second;
    encapsulated_client_session->ProcessPacket(
        absl::string_view(packet.data(), packet.size()), target_server_address);

    QUIC_DVLOG(1) << "Sent " << packet.size() << " bytes to connection for "
                  << client_connection_id;
    return;
  }
  QUICHE_DCHECK_EQ(masque_mode_, MasqueMode::kOpen);
  QuicSpdySession::OnMessageReceived(message);
}

void MasqueClientSession::OnMessageAcked(QuicMessageId message_id,
                                         QuicTime /*receive_timestamp*/) {
  QUIC_DVLOG(1) << "Received ack for DATAGRAM frame " << message_id;
}

void MasqueClientSession::OnMessageLost(QuicMessageId message_id) {
  QUIC_DVLOG(1) << "We believe DATAGRAM frame " << message_id << " was lost";
}

const MasqueClientSession::ConnectUdpClientState*
MasqueClientSession::GetOrCreateConnectUdpClientState(
    const QuicSocketAddress& target_server_address,
    EncapsulatedClientSession* encapsulated_client_session) {
  for (const ConnectUdpClientState& client_state : connect_udp_client_states_) {
    if (client_state.target_server_address() == target_server_address &&
        client_state.encapsulated_client_session() ==
            encapsulated_client_session) {
      // Found existing CONNECT-UDP request.
      return &client_state;
    }
  }
  // No CONNECT-UDP request found, create a new one.
  QuicSpdyClientStream* stream = CreateOutgoingBidirectionalStream();
  if (stream == nullptr) {
    // Stream flow control limits prevented us from opening a new stream.
    QUIC_DLOG(ERROR) << "Failed to open CONNECT-UDP stream";
    return nullptr;
  }

  QuicDatagramFlowId flow_id = GetNextDatagramFlowId();

  QUIC_DLOG(INFO) << "Sending CONNECT-UDP request for " << target_server_address
                  << " using flow ID " << flow_id << " on stream "
                  << stream->id();

  // Send the request.
  spdy::Http2HeaderBlock headers;
  headers[":method"] = "CONNECT-UDP";
  headers[":scheme"] = "masque";
  headers[":path"] = "/";
  headers[":authority"] = target_server_address.ToString();
  SpdyUtils::AddDatagramFlowIdHeader(&headers, flow_id);
  size_t bytes_sent =
      stream->SendRequest(std::move(headers), /*body=*/"", /*fin=*/false);
  if (bytes_sent == 0) {
    QUIC_DLOG(ERROR) << "Failed to send CONNECT-UDP request";
    return nullptr;
  }

  connect_udp_client_states_.push_back(
      ConnectUdpClientState(stream, encapsulated_client_session, this, flow_id,
                            target_server_address));
  return &connect_udp_client_states_.back();
}

void MasqueClientSession::SendPacket(
    QuicConnectionId client_connection_id,
    QuicConnectionId server_connection_id,
    absl::string_view packet,
    const QuicSocketAddress& target_server_address,
    EncapsulatedClientSession* encapsulated_client_session) {
  if (masque_mode_ == MasqueMode::kLegacy) {
    compression_engine_.CompressAndSendPacket(packet, client_connection_id,
                                              server_connection_id,
                                              target_server_address);
    return;
  }
  const ConnectUdpClientState* connect_udp = GetOrCreateConnectUdpClientState(
      target_server_address, encapsulated_client_session);
  if (connect_udp == nullptr) {
    QUIC_DLOG(ERROR) << "Failed to create CONNECT-UDP request";
    return;
  }

  QuicDatagramFlowId flow_id = connect_udp->flow_id();
  MessageStatus message_status =
      SendHttp3Datagram(connect_udp->flow_id(), packet);

  QUIC_DVLOG(1) << "Sent packet to " << target_server_address
                << " compressed with flow ID " << flow_id
                << " and got message status "
                << MessageStatusToString(message_status);
}

void MasqueClientSession::RegisterConnectionId(
    QuicConnectionId client_connection_id,
    EncapsulatedClientSession* encapsulated_client_session) {
  QUIC_DLOG(INFO) << "Registering " << client_connection_id
                  << " to encapsulated client";
  QUICHE_DCHECK(
      client_connection_id_registrations_.find(client_connection_id) ==
          client_connection_id_registrations_.end() ||
      client_connection_id_registrations_[client_connection_id] ==
          encapsulated_client_session);
  client_connection_id_registrations_[client_connection_id] =
      encapsulated_client_session;
}

void MasqueClientSession::UnregisterConnectionId(
    QuicConnectionId client_connection_id,
    EncapsulatedClientSession* encapsulated_client_session) {
  QUIC_DLOG(INFO) << "Unregistering " << client_connection_id;
  if (masque_mode_ == MasqueMode::kLegacy) {
    if (client_connection_id_registrations_.find(client_connection_id) !=
        client_connection_id_registrations_.end()) {
      client_connection_id_registrations_.erase(client_connection_id);
      owner_->UnregisterClientConnectionId(client_connection_id);
      compression_engine_.UnregisterClientConnectionId(client_connection_id);
    }
    return;
  }

  for (auto it = connect_udp_client_states_.begin();
       it != connect_udp_client_states_.end();) {
    if (it->encapsulated_client_session() == encapsulated_client_session) {
      QUIC_DLOG(INFO) << "Removing state for flow_id " << it->flow_id();
      auto* stream = it->stream();
      it = connect_udp_client_states_.erase(it);
      if (!stream->write_side_closed()) {
        stream->Reset(QUIC_STREAM_CANCELLED);
      }
    } else {
      ++it;
    }
  }
}

void MasqueClientSession::OnConnectionClosed(
    const QuicConnectionCloseFrame& frame,
    ConnectionCloseSource source) {
  QuicSpdyClientSession::OnConnectionClosed(frame, source);
  // Close all encapsulated sessions.
  for (const auto& client_state : connect_udp_client_states_) {
    client_state.encapsulated_client_session()->CloseConnection(
        QUIC_CONNECTION_CANCELLED, "Underlying MASQUE connection was closed",
        ConnectionCloseBehavior::SILENT_CLOSE);
  }
}

void MasqueClientSession::OnStreamClosed(QuicStreamId stream_id) {
  if (QuicUtils::IsBidirectionalStreamId(stream_id, version()) &&
      QuicUtils::IsClientInitiatedStreamId(transport_version(), stream_id)) {
    QuicSpdyClientStream* stream =
        reinterpret_cast<QuicSpdyClientStream*>(GetActiveStream(stream_id));
    if (stream != nullptr) {
      QUIC_DLOG(INFO) << "Stream " << stream_id
                      << " closed, got response headers:"
                      << stream->response_headers().DebugString();
    }
  }
  for (auto it = connect_udp_client_states_.begin();
       it != connect_udp_client_states_.end();) {
    if (it->stream()->id() == stream_id) {
      QUIC_DLOG(INFO) << "Stream " << stream_id
                      << " was closed, removing state for flow_id "
                      << it->flow_id();
      auto* encapsulated_client_session = it->encapsulated_client_session();
      it = connect_udp_client_states_.erase(it);
      encapsulated_client_session->CloseConnection(
          QUIC_CONNECTION_CANCELLED,
          "Underlying MASQUE CONNECT-UDP stream was closed",
          ConnectionCloseBehavior::SILENT_CLOSE);
    } else {
      ++it;
    }
  }

  QuicSpdyClientSession::OnStreamClosed(stream_id);
}

MasqueClientSession::ConnectUdpClientState::ConnectUdpClientState(
    QuicSpdyClientStream* stream,
    EncapsulatedClientSession* encapsulated_client_session,
    MasqueClientSession* masque_session,
    QuicDatagramFlowId flow_id,
    const QuicSocketAddress& target_server_address)
    : stream_(stream),
      encapsulated_client_session_(encapsulated_client_session),
      masque_session_(masque_session),
      flow_id_(flow_id),
      target_server_address_(target_server_address) {
  QUICHE_DCHECK_NE(masque_session_, nullptr);
  masque_session_->RegisterHttp3FlowId(this->flow_id(), this);
}

MasqueClientSession::ConnectUdpClientState::~ConnectUdpClientState() {
  if (flow_id_.has_value()) {
    masque_session_->UnregisterHttp3FlowId(flow_id());
  }
}

MasqueClientSession::ConnectUdpClientState::ConnectUdpClientState(
    MasqueClientSession::ConnectUdpClientState&& other) {
  *this = std::move(other);
}

MasqueClientSession::ConnectUdpClientState&
MasqueClientSession::ConnectUdpClientState::operator=(
    MasqueClientSession::ConnectUdpClientState&& other) {
  stream_ = other.stream_;
  encapsulated_client_session_ = other.encapsulated_client_session_;
  masque_session_ = other.masque_session_;
  flow_id_ = other.flow_id_;
  target_server_address_ = other.target_server_address_;
  other.flow_id_.reset();
  if (flow_id_.has_value()) {
    masque_session_->UnregisterHttp3FlowId(flow_id());
    masque_session_->RegisterHttp3FlowId(flow_id(), this);
  }
  return *this;
}

void MasqueClientSession::ConnectUdpClientState::OnHttp3Datagram(
    QuicDatagramFlowId flow_id,
    absl::string_view payload) {
  QUICHE_DCHECK_EQ(flow_id, this->flow_id());
  encapsulated_client_session_->ProcessPacket(payload, target_server_address_);
  QUIC_DVLOG(1) << "Sent " << payload.size()
                << " bytes to connection for flow_id " << flow_id;
}

}  // namespace quic
