blob: 6c7859e51045033cce08d2820d6c86807e303f83 [file] [log] [blame]
// Copyright 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "quic/masque/masque_client_session.h"
#include "absl/algorithm/container.h"
#include "quic/core/quic_data_reader.h"
#include "common/platform/api/quiche_text_utils.h"
namespace quic {
// Constructs a MASQUE client session.
//
// |config|, |supported_versions|, |connection|, |server_id|, |crypto_config|
// and |push_promise_index| are forwarded unchanged to QuicSpdyClientSession.
// |masque_mode| and |owner| are stored on this session, and the compression
// engine is constructed with a pointer to this session.
MasqueClientSession::MasqueClientSession(
    MasqueMode masque_mode, const QuicConfig& config,
    const ParsedQuicVersionVector& supported_versions,
    QuicConnection* connection, const QuicServerId& server_id,
    QuicCryptoClientConfig* crypto_config,
    QuicClientPushPromiseIndex* push_promise_index, Owner* owner)
    : QuicSpdyClientSession(config, supported_versions, connection, server_id,
                            crypto_config, push_promise_index),
      masque_mode_(masque_mode),
      owner_(owner),
      compression_engine_(this) {}
// Handles the payload of a received DATAGRAM frame and hands the packet it
// carries to the matching encapsulated client session.
//
// Outside of legacy mode the payload is a varint flow ID followed by the
// packet; the flow ID selects a CONNECT-UDP state. In legacy mode the payload
// is decompressed by the compression engine and dispatched by client
// connection ID. Payloads that cannot be parsed or matched are dropped.
void MasqueClientSession::OnMessageReceived(absl::string_view message) {
  QUIC_DVLOG(1) << "Received DATAGRAM frame of length " << message.length();

  if (masque_mode_ != MasqueMode::kLegacy) {
    // CONNECT-UDP framing: <flow_id varint><encapsulated packet>.
    QuicDataReader reader(message);
    QuicDatagramFlowId flow_id;
    if (!reader.ReadVarInt62(&flow_id)) {
      QUIC_DLOG(ERROR) << "Failed to parse flow_id";
      return;
    }

    // Look up the CONNECT-UDP state that owns this flow ID.
    const ConnectUdpClientState* matching_state = nullptr;
    for (const ConnectUdpClientState& candidate : connect_udp_client_states_) {
      if (candidate.flow_id() == flow_id) {
        matching_state = &candidate;
        break;
      }
    }
    if (matching_state == nullptr) {
      QUIC_DLOG(ERROR) << "Received unknown flow_id " << flow_id;
      return;
    }

    // Copy out what we need from the state before handing over the packet.
    EncapsulatedClientSession* encapsulated_client_session =
        matching_state->encapsulated_client_session();
    QuicSocketAddress target_server_address =
        matching_state->target_server_address();
    QUICHE_DCHECK_NE(encapsulated_client_session, nullptr);
    QUICHE_DCHECK(target_server_address.IsInitialized());

    // Everything after the flow ID is the encapsulated packet.
    absl::string_view packet = reader.ReadRemainingPayload();
    encapsulated_client_session->ProcessPacket(packet, target_server_address);
    QUIC_DVLOG(1) << "Sent " << packet.size()
                  << " bytes to connection for flow_id " << flow_id;
    return;
  }

  // Legacy mode: the compression engine recovers the connection IDs, the
  // target address and the packet from the datagram.
  QuicConnectionId client_connection_id;
  QuicConnectionId server_connection_id;
  QuicSocketAddress target_server_address;
  std::vector<char> packet;
  bool version_present;
  const bool decompressed = compression_engine_.DecompressDatagram(
      message, &client_connection_id, &server_connection_id,
      &target_server_address, &packet, &version_present);
  if (!decompressed) {
    return;
  }

  // Dispatch on the client connection ID recovered above.
  auto registration =
      client_connection_id_registrations_.find(client_connection_id);
  if (registration == client_connection_id_registrations_.end()) {
    QUIC_DLOG(ERROR) << "MasqueClientSession failed to dispatch "
                     << client_connection_id;
    return;
  }

  EncapsulatedClientSession* encapsulated_client_session =
      registration->second;
  encapsulated_client_session->ProcessPacket(
      absl::string_view(packet.data(), packet.size()), target_server_address);
  QUIC_DVLOG(1) << "Sent " << packet.size() << " bytes to connection for "
                << client_connection_id;
}
// Called with the ID of a DATAGRAM frame that was acknowledged. The event is
// only logged; the receive timestamp is unused.
void MasqueClientSession::OnMessageAcked(QuicMessageId message_id,
                                         QuicTime /*receive_timestamp*/) {
  QUIC_DVLOG(1) << "Received ack for DATAGRAM frame " << message_id;
}
// Called with the ID of a DATAGRAM frame that is believed lost. The event is
// only logged; nothing is retransmitted here.
void MasqueClientSession::OnMessageLost(QuicMessageId message_id) {
  QUIC_DVLOG(1) << "We believe DATAGRAM frame " << message_id << " was lost";
}
// Returns the CONNECT-UDP state for the given (target address, encapsulated
// session) pair, creating it if none exists yet.
//
// Creating a state opens a new outgoing bidirectional stream, allocates a
// datagram flow ID and sends a CONNECT-UDP request on that stream. Returns
// nullptr if the stream cannot be opened or the request cannot be sent.
const MasqueClientSession::ConnectUdpClientState*
MasqueClientSession::GetOrCreateConnectUdpClientState(
    const QuicSocketAddress& target_server_address,
    EncapsulatedClientSession* encapsulated_client_session) {
  // Reuse an existing CONNECT-UDP request when both the target address and
  // the encapsulated session match.
  const auto existing_state = absl::c_find_if(
      connect_udp_client_states_,
      [&target_server_address,
       encapsulated_client_session](const ConnectUdpClientState& state) {
        return state.target_server_address() == target_server_address &&
               state.encapsulated_client_session() ==
                   encapsulated_client_session;
      });
  if (existing_state != connect_udp_client_states_.end()) {
    return &*existing_state;
  }

  // Nothing to reuse: open a stream for a new CONNECT-UDP request.
  QuicSpdyClientStream* request_stream = CreateOutgoingBidirectionalStream();
  if (request_stream == nullptr) {
    // Stream flow control limits prevented us from opening a new stream.
    QUIC_DLOG(ERROR) << "Failed to open CONNECT-UDP stream";
    return nullptr;
  }

  QuicDatagramFlowId flow_id = GetNextDatagramFlowId();

  // Build and send the request headers; the stream is left open (no FIN).
  spdy::Http2HeaderBlock headers;
  headers[":method"] = "CONNECT-UDP";
  headers[":scheme"] = "masque";
  headers[":path"] = "/";
  headers[":authority"] = target_server_address.ToString();
  headers["datagram-flow-id"] = absl::StrCat(flow_id);
  const size_t bytes_sent = request_stream->SendRequest(
      std::move(headers), /*body=*/"", /*fin=*/false);
  if (bytes_sent == 0) {
    QUIC_DLOG(ERROR) << "Failed to send CONNECT-UDP request";
    return nullptr;
  }

  // Remember the request so later packets for this pair reuse it.
  connect_udp_client_states_.push_back(
      ConnectUdpClientState(request_stream, encapsulated_client_session,
                            flow_id, target_server_address));
  return &connect_udp_client_states_.back();
}
// Sends |packet| towards |target_server_address| over the MASQUE connection.
//
// In legacy mode the packet and both connection IDs are handed to the
// compression engine. Otherwise the packet is prefixed with the varint flow
// ID of the matching CONNECT-UDP state (created on demand) and sent as a
// single message. Failures are logged and the packet is dropped.
void MasqueClientSession::SendPacket(
    QuicConnectionId client_connection_id,
    QuicConnectionId server_connection_id, absl::string_view packet,
    const QuicSocketAddress& target_server_address,
    EncapsulatedClientSession* encapsulated_client_session) {
  if (masque_mode_ == MasqueMode::kLegacy) {
    compression_engine_.CompressAndSendPacket(
        packet, client_connection_id, server_connection_id,
        target_server_address);
    return;
  }

  const ConnectUdpClientState* client_state =
      GetOrCreateConnectUdpClientState(target_server_address,
                                       encapsulated_client_session);
  if (client_state == nullptr) {
    QUIC_DLOG(ERROR) << "Failed to create CONNECT-UDP request";
    return;
  }

  // The datagram payload is <flow_id varint><packet bytes>.
  const QuicDatagramFlowId flow_id = client_state->flow_id();
  const size_t datagram_length =
      QuicDataWriter::GetVarInt62Len(flow_id) + packet.length();
  QuicUniqueBufferPtr datagram_buffer = MakeUniqueBuffer(
      connection()->helper()->GetStreamSendBufferAllocator(), datagram_length);

  QuicDataWriter writer(datagram_length, datagram_buffer.get());
  if (!writer.WriteVarInt62(flow_id)) {
    QUIC_BUG << "Failed to write flow_id";
    return;
  }
  if (!writer.WriteBytes(packet.data(), packet.length())) {
    QUIC_BUG << "Failed to write packet";
    return;
  }

  // Move the filled buffer into a mem slice and send it as one message.
  QuicMemSlice slice(std::move(datagram_buffer), datagram_length);
  MessageResult message_result = SendMessage(QuicMemSliceSpan(&slice));
  QUIC_DVLOG(1) << "Sent packet to " << target_server_address
                << " compressed with flow ID " << flow_id
                << " and got message result " << message_result;
}
// Associates |client_connection_id| with |encapsulated_client_session| so
// that datagrams dispatched by connection ID reach that session.
//
// Debug builds check that the ID is either not yet registered or already
// registered to the same session.
void MasqueClientSession::RegisterConnectionId(
    QuicConnectionId client_connection_id,
    EncapsulatedClientSession* encapsulated_client_session) {
  QUIC_DLOG(INFO) << "Registering " << client_connection_id
                  << " to encapsulated client";

  // operator[] is only evaluated when find() succeeded, so this check never
  // inserts a new entry.
  QUICHE_DCHECK(
      client_connection_id_registrations_.find(client_connection_id) ==
          client_connection_id_registrations_.end() ||
      client_connection_id_registrations_[client_connection_id] ==
          encapsulated_client_session);

  client_connection_id_registrations_[client_connection_id] =
      encapsulated_client_session;
}
// Removes the state this session holds for |client_connection_id| and
// |encapsulated_client_session|.
//
// In legacy mode, a registered connection ID is erased from the registration
// map and both the owner and the compression engine are told to unregister
// it. Otherwise, every CONNECT-UDP state belonging to
// |encapsulated_client_session| is removed and its stream is reset unless its
// write side is already closed.
void MasqueClientSession::UnregisterConnectionId(
    QuicConnectionId client_connection_id,
    EncapsulatedClientSession* encapsulated_client_session) {
  QUIC_DLOG(INFO) << "Unregistering " << client_connection_id;

  // Single lookup, reused by both modes below.
  auto registration =
      client_connection_id_registrations_.find(client_connection_id);
  const bool is_registered =
      registration != client_connection_id_registrations_.end();

  if (masque_mode_ == MasqueMode::kLegacy) {
    if (is_registered) {
      client_connection_id_registrations_.erase(registration);
      owner_->UnregisterClientConnectionId(client_connection_id);
      compression_engine_.UnregisterClientConnectionId(client_connection_id);
    }
    return;
  }

  // RegisterConnectionId() stores the mapping regardless of mode, so drop it
  // here too. Otherwise the map keeps a pointer to a session that has
  // unregistered, and re-registering the same connection ID for another
  // session would trip the DCHECK in RegisterConnectionId(). Only erase the
  // entry if it still belongs to the session being unregistered.
  if (is_registered && registration->second == encapsulated_client_session) {
    client_connection_id_registrations_.erase(registration);
  }

  for (auto it = connect_udp_client_states_.begin();
       it != connect_udp_client_states_.end();) {
    if (it->encapsulated_client_session() == encapsulated_client_session) {
      QUIC_DLOG(INFO) << "Removing state for flow_id " << it->flow_id();
      // Grab the stream and erase the state before resetting the stream, so
      // the state is already gone by the time Reset() runs.
      auto* stream = it->stream();
      it = connect_udp_client_states_.erase(it);
      if (!stream->write_side_closed()) {
        stream->Reset(QUIC_STREAM_CANCELLED);
      }
    } else {
      ++it;
    }
  }
}
void MasqueClientSession::OnConnectionClosed(
const QuicConnectionCloseFrame& frame,
ConnectionCloseSource source) {
QuicSpdyClientSession::OnConnectionClosed(frame, source);
// Close all encapsulated sessions.
for (auto client_state : connect_udp_client_states_) {
client_state.encapsulated_client_session()->CloseConnection(
QUIC_CONNECTION_CANCELLED, "Underlying MASQUE connection was closed",
ConnectionCloseBehavior::SILENT_CLOSE);
}
}
// Called when stream |stream_id| is closed. Drops every CONNECT-UDP state
// whose stream has that ID and silently closes the corresponding encapsulated
// session, then forwards the event to QuicSpdyClientSession.
void MasqueClientSession::OnStreamClosed(QuicStreamId stream_id) {
  auto state = connect_udp_client_states_.begin();
  while (state != connect_udp_client_states_.end()) {
    if (state->stream()->id() != stream_id) {
      ++state;
      continue;
    }

    QUIC_DLOG(INFO) << "Stream " << stream_id
                    << " was closed, removing state for flow_id "
                    << state->flow_id();

    // Save the session pointer and erase the state first; the close call
    // below then runs without referring to the erased entry.
    EncapsulatedClientSession* session_to_close =
        state->encapsulated_client_session();
    state = connect_udp_client_states_.erase(state);
    session_to_close->CloseConnection(
        QUIC_CONNECTION_CANCELLED,
        "Underlying MASQUE CONNECT-UDP stream was closed",
        ConnectionCloseBehavior::SILENT_CLOSE);
  }

  QuicSpdyClientSession::OnStreamClosed(stream_id);
}
} // namespace quic