| // Copyright 2019 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "quic/masque/masque_client_session.h" |
| #include "absl/algorithm/container.h" |
| #include "quic/core/http/spdy_utils.h" |
| #include "quic/core/quic_data_reader.h" |
| #include "quic/core/quic_utils.h" |
| #include "common/platform/api/quiche_text_utils.h" |
| |
| namespace quic { |
| |
| MasqueClientSession::MasqueClientSession( |
| MasqueMode masque_mode, |
| const QuicConfig& config, |
| const ParsedQuicVersionVector& supported_versions, |
| QuicConnection* connection, |
| const QuicServerId& server_id, |
| QuicCryptoClientConfig* crypto_config, |
| QuicClientPushPromiseIndex* push_promise_index, |
| Owner* owner) |
| : QuicSpdyClientSession(config, |
| supported_versions, |
| connection, |
| server_id, |
| crypto_config, |
| push_promise_index), |
| masque_mode_(masque_mode), |
| owner_(owner), |
| compression_engine_(this) {} |
| |
| void MasqueClientSession::OnMessageReceived(absl::string_view message) { |
| if (masque_mode_ == MasqueMode::kLegacy) { |
| QUIC_DVLOG(1) << "Received DATAGRAM frame of length " << message.length(); |
| QuicConnectionId client_connection_id, server_connection_id; |
| QuicSocketAddress target_server_address; |
| std::vector<char> packet; |
| bool version_present; |
| if (!compression_engine_.DecompressDatagram( |
| message, &client_connection_id, &server_connection_id, |
| &target_server_address, &packet, &version_present)) { |
| return; |
| } |
| |
| auto connection_id_registration = |
| client_connection_id_registrations_.find(client_connection_id); |
| if (connection_id_registration == |
| client_connection_id_registrations_.end()) { |
| QUIC_DLOG(ERROR) << "MasqueClientSession failed to dispatch " |
| << client_connection_id; |
| return; |
| } |
| EncapsulatedClientSession* encapsulated_client_session = |
| connection_id_registration->second; |
| encapsulated_client_session->ProcessPacket( |
| absl::string_view(packet.data(), packet.size()), target_server_address); |
| |
| QUIC_DVLOG(1) << "Sent " << packet.size() << " bytes to connection for " |
| << client_connection_id; |
| return; |
| } |
| QUICHE_DCHECK_EQ(masque_mode_, MasqueMode::kOpen); |
| QuicSpdySession::OnMessageReceived(message); |
| } |
| |
| void MasqueClientSession::OnMessageAcked(QuicMessageId message_id, |
| QuicTime /*receive_timestamp*/) { |
| QUIC_DVLOG(1) << "Received ack for DATAGRAM frame " << message_id; |
| } |
| |
| void MasqueClientSession::OnMessageLost(QuicMessageId message_id) { |
| QUIC_DVLOG(1) << "We believe DATAGRAM frame " << message_id << " was lost"; |
| } |
| |
| const MasqueClientSession::ConnectUdpClientState* |
| MasqueClientSession::GetOrCreateConnectUdpClientState( |
| const QuicSocketAddress& target_server_address, |
| EncapsulatedClientSession* encapsulated_client_session) { |
| for (const ConnectUdpClientState& client_state : connect_udp_client_states_) { |
| if (client_state.target_server_address() == target_server_address && |
| client_state.encapsulated_client_session() == |
| encapsulated_client_session) { |
| // Found existing CONNECT-UDP request. |
| return &client_state; |
| } |
| } |
| // No CONNECT-UDP request found, create a new one. |
| QuicSpdyClientStream* stream = CreateOutgoingBidirectionalStream(); |
| if (stream == nullptr) { |
| // Stream flow control limits prevented us from opening a new stream. |
| QUIC_DLOG(ERROR) << "Failed to open CONNECT-UDP stream"; |
| return nullptr; |
| } |
| |
| QuicDatagramFlowId flow_id = GetNextDatagramFlowId(); |
| |
| QUIC_DLOG(INFO) << "Sending CONNECT-UDP request for " << target_server_address |
| << " using flow ID " << flow_id << " on stream " |
| << stream->id(); |
| |
| // Send the request. |
| spdy::Http2HeaderBlock headers; |
| headers[":method"] = "CONNECT-UDP"; |
| headers[":scheme"] = "masque"; |
| headers[":path"] = "/"; |
| headers[":authority"] = target_server_address.ToString(); |
| SpdyUtils::AddDatagramFlowIdHeader(&headers, flow_id); |
| size_t bytes_sent = |
| stream->SendRequest(std::move(headers), /*body=*/"", /*fin=*/false); |
| if (bytes_sent == 0) { |
| QUIC_DLOG(ERROR) << "Failed to send CONNECT-UDP request"; |
| return nullptr; |
| } |
| |
| connect_udp_client_states_.push_back( |
| ConnectUdpClientState(stream, encapsulated_client_session, this, flow_id, |
| target_server_address)); |
| return &connect_udp_client_states_.back(); |
| } |
| |
| void MasqueClientSession::SendPacket( |
| QuicConnectionId client_connection_id, |
| QuicConnectionId server_connection_id, |
| absl::string_view packet, |
| const QuicSocketAddress& target_server_address, |
| EncapsulatedClientSession* encapsulated_client_session) { |
| if (masque_mode_ == MasqueMode::kLegacy) { |
| compression_engine_.CompressAndSendPacket(packet, client_connection_id, |
| server_connection_id, |
| target_server_address); |
| return; |
| } |
| const ConnectUdpClientState* connect_udp = GetOrCreateConnectUdpClientState( |
| target_server_address, encapsulated_client_session); |
| if (connect_udp == nullptr) { |
| QUIC_DLOG(ERROR) << "Failed to create CONNECT-UDP request"; |
| return; |
| } |
| |
| QuicDatagramFlowId flow_id = connect_udp->flow_id(); |
| MessageStatus message_status = |
| SendHttp3Datagram(connect_udp->flow_id(), packet); |
| |
| QUIC_DVLOG(1) << "Sent packet to " << target_server_address |
| << " compressed with flow ID " << flow_id |
| << " and got message status " |
| << MessageStatusToString(message_status); |
| } |
| |
| void MasqueClientSession::RegisterConnectionId( |
| QuicConnectionId client_connection_id, |
| EncapsulatedClientSession* encapsulated_client_session) { |
| QUIC_DLOG(INFO) << "Registering " << client_connection_id |
| << " to encapsulated client"; |
| QUICHE_DCHECK( |
| client_connection_id_registrations_.find(client_connection_id) == |
| client_connection_id_registrations_.end() || |
| client_connection_id_registrations_[client_connection_id] == |
| encapsulated_client_session); |
| client_connection_id_registrations_[client_connection_id] = |
| encapsulated_client_session; |
| } |
| |
| void MasqueClientSession::UnregisterConnectionId( |
| QuicConnectionId client_connection_id, |
| EncapsulatedClientSession* encapsulated_client_session) { |
| QUIC_DLOG(INFO) << "Unregistering " << client_connection_id; |
| if (masque_mode_ == MasqueMode::kLegacy) { |
| if (client_connection_id_registrations_.find(client_connection_id) != |
| client_connection_id_registrations_.end()) { |
| client_connection_id_registrations_.erase(client_connection_id); |
| owner_->UnregisterClientConnectionId(client_connection_id); |
| compression_engine_.UnregisterClientConnectionId(client_connection_id); |
| } |
| return; |
| } |
| |
| for (auto it = connect_udp_client_states_.begin(); |
| it != connect_udp_client_states_.end();) { |
| if (it->encapsulated_client_session() == encapsulated_client_session) { |
| QUIC_DLOG(INFO) << "Removing state for flow_id " << it->flow_id(); |
| auto* stream = it->stream(); |
| it = connect_udp_client_states_.erase(it); |
| if (!stream->write_side_closed()) { |
| stream->Reset(QUIC_STREAM_CANCELLED); |
| } |
| } else { |
| ++it; |
| } |
| } |
| } |
| |
| void MasqueClientSession::OnConnectionClosed( |
| const QuicConnectionCloseFrame& frame, |
| ConnectionCloseSource source) { |
| QuicSpdyClientSession::OnConnectionClosed(frame, source); |
| // Close all encapsulated sessions. |
| for (const auto& client_state : connect_udp_client_states_) { |
| client_state.encapsulated_client_session()->CloseConnection( |
| QUIC_CONNECTION_CANCELLED, "Underlying MASQUE connection was closed", |
| ConnectionCloseBehavior::SILENT_CLOSE); |
| } |
| } |
| |
| void MasqueClientSession::OnStreamClosed(QuicStreamId stream_id) { |
| if (QuicUtils::IsBidirectionalStreamId(stream_id, version()) && |
| QuicUtils::IsClientInitiatedStreamId(transport_version(), stream_id)) { |
| QuicSpdyClientStream* stream = |
| reinterpret_cast<QuicSpdyClientStream*>(GetActiveStream(stream_id)); |
| if (stream != nullptr) { |
| QUIC_DLOG(INFO) << "Stream " << stream_id |
| << " closed, got response headers:" |
| << stream->response_headers().DebugString(); |
| } |
| } |
| for (auto it = connect_udp_client_states_.begin(); |
| it != connect_udp_client_states_.end();) { |
| if (it->stream()->id() == stream_id) { |
| QUIC_DLOG(INFO) << "Stream " << stream_id |
| << " was closed, removing state for flow_id " |
| << it->flow_id(); |
| auto* encapsulated_client_session = it->encapsulated_client_session(); |
| it = connect_udp_client_states_.erase(it); |
| encapsulated_client_session->CloseConnection( |
| QUIC_CONNECTION_CANCELLED, |
| "Underlying MASQUE CONNECT-UDP stream was closed", |
| ConnectionCloseBehavior::SILENT_CLOSE); |
| } else { |
| ++it; |
| } |
| } |
| |
| QuicSpdyClientSession::OnStreamClosed(stream_id); |
| } |
| |
| MasqueClientSession::ConnectUdpClientState::ConnectUdpClientState( |
| QuicSpdyClientStream* stream, |
| EncapsulatedClientSession* encapsulated_client_session, |
| MasqueClientSession* masque_session, |
| QuicDatagramFlowId flow_id, |
| const QuicSocketAddress& target_server_address) |
| : stream_(stream), |
| encapsulated_client_session_(encapsulated_client_session), |
| masque_session_(masque_session), |
| flow_id_(flow_id), |
| target_server_address_(target_server_address) { |
| QUICHE_DCHECK_NE(masque_session_, nullptr); |
| masque_session_->RegisterHttp3FlowId(this->flow_id(), this); |
| } |
| |
| MasqueClientSession::ConnectUdpClientState::~ConnectUdpClientState() { |
| if (flow_id_.has_value()) { |
| masque_session_->UnregisterHttp3FlowId(flow_id()); |
| } |
| } |
| |
| MasqueClientSession::ConnectUdpClientState::ConnectUdpClientState( |
| MasqueClientSession::ConnectUdpClientState&& other) { |
| *this = std::move(other); |
| } |
| |
| MasqueClientSession::ConnectUdpClientState& |
| MasqueClientSession::ConnectUdpClientState::operator=( |
| MasqueClientSession::ConnectUdpClientState&& other) { |
| stream_ = other.stream_; |
| encapsulated_client_session_ = other.encapsulated_client_session_; |
| masque_session_ = other.masque_session_; |
| flow_id_ = other.flow_id_; |
| target_server_address_ = other.target_server_address_; |
| other.flow_id_.reset(); |
| if (flow_id_.has_value()) { |
| masque_session_->UnregisterHttp3FlowId(flow_id()); |
| masque_session_->RegisterHttp3FlowId(flow_id(), this); |
| } |
| return *this; |
| } |
| |
| void MasqueClientSession::ConnectUdpClientState::OnHttp3Datagram( |
| QuicDatagramFlowId flow_id, |
| absl::string_view payload) { |
| QUICHE_DCHECK_EQ(flow_id, this->flow_id()); |
| encapsulated_client_session_->ProcessPacket(payload, target_server_address_); |
| QUIC_DVLOG(1) << "Sent " << payload.size() |
| << " bytes to connection for flow_id " << flow_id; |
| } |
| |
| } // namespace quic |