|  | // Copyright (c) 2019 The Chromium Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style license that can be | 
|  | // found in the LICENSE file. | 
|  |  | 
|  | #include "net/third_party/quiche/src/quic/quartc/quartc_multiplexer.h" | 
|  |  | 
|  | #include <cstdint> | 
|  | #include <utility> | 
|  |  | 
|  | #include "net/third_party/quiche/src/quic/core/quic_data_writer.h" | 
|  | #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h" | 
|  | #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h" | 
|  |  | 
|  | namespace quic { | 
|  |  | 
|  | QuartcSendChannel::QuartcSendChannel(QuartcMultiplexer* multiplexer, | 
|  | uint64_t id, | 
|  | QuicBufferAllocator* allocator, | 
|  | Delegate* delegate) | 
|  | : multiplexer_(multiplexer), | 
|  | id_(id), | 
|  | encoded_length_(QuicDataWriter::GetVarInt62Len(id_)), | 
|  | allocator_(allocator), | 
|  | delegate_(delegate) {} | 
|  |  | 
|  | QuartcStream* QuartcSendChannel::CreateOutgoingBidirectionalStream() { | 
|  | if (!session_) { | 
|  | QUIC_LOG(DFATAL) << "Session is not ready to write yet; channel_id=" << id_; | 
|  | return nullptr; | 
|  | } | 
|  | QuicMemSlice id_slice = EncodeChannelId(); | 
|  |  | 
|  | QuartcStream* stream = session_->CreateOutgoingBidirectionalStream(); | 
|  | QuicConsumedData consumed = | 
|  | stream->WriteMemSlices(QuicMemSliceSpan(&id_slice), /*fin=*/false); | 
|  | DCHECK_EQ(consumed.bytes_consumed, encoded_length_); | 
|  | return stream; | 
|  | } | 
|  |  | 
|  | bool QuartcSendChannel::SendOrQueueMessage(QuicMemSliceSpan message, | 
|  | int64_t datagram_id) { | 
|  | if (!session_) { | 
|  | QUIC_LOG(DFATAL) << "Session is not ready to write yet; channel_id=" << id_ | 
|  | << "datagram size=" << message.total_length(); | 
|  | return false; | 
|  | } | 
|  | QuicMemSliceStorage storage(nullptr, 0, nullptr, 0);  // Empty storage. | 
|  | storage.Append(EncodeChannelId()); | 
|  |  | 
|  | message.ConsumeAll( | 
|  | [&storage](QuicMemSlice slice) { storage.Append(std::move(slice)); }); | 
|  |  | 
|  | // Allocate a unique datagram id so that notifications can be routed back to | 
|  | // the right send channel. | 
|  | int64_t unique_datagram_id = multiplexer_->AllocateDatagramId(this); | 
|  | multiplexer_to_user_datagram_ids_[unique_datagram_id] = datagram_id; | 
|  |  | 
|  | return session_->SendOrQueueMessage(storage.ToSpan(), unique_datagram_id); | 
|  | } | 
|  |  | 
|  | void QuartcSendChannel::OnMessageSent(int64_t datagram_id) { | 
|  | // Map back to the caller-chosen |datagram_id|. | 
|  | datagram_id = multiplexer_to_user_datagram_ids_[datagram_id]; | 
|  | delegate_->OnMessageSent(datagram_id); | 
|  | } | 
|  |  | 
|  | void QuartcSendChannel::OnMessageAcked(int64_t datagram_id, | 
|  | QuicTime receive_timestamp) { | 
|  | // Map back to the caller-chosen |datagram_id|. | 
|  | auto it = multiplexer_to_user_datagram_ids_.find(datagram_id); | 
|  | if (it == multiplexer_to_user_datagram_ids_.end()) { | 
|  | QUIC_LOG(DFATAL) << "Datagram acked/lost multiple times; datagram_id=" | 
|  | << datagram_id; | 
|  | return; | 
|  | } | 
|  | delegate_->OnMessageAcked(it->second, receive_timestamp); | 
|  | multiplexer_to_user_datagram_ids_.erase(it); | 
|  | } | 
|  |  | 
|  | void QuartcSendChannel::OnMessageLost(int64_t datagram_id) { | 
|  | // Map back to the caller-chosen |datagram_id|. | 
|  | auto it = multiplexer_to_user_datagram_ids_.find(datagram_id); | 
|  | if (it == multiplexer_to_user_datagram_ids_.end()) { | 
|  | QUIC_LOG(DFATAL) << "Datagram acked/lost multiple times; datagram_id=" | 
|  | << datagram_id; | 
|  | return; | 
|  | } | 
|  | delegate_->OnMessageLost(it->second); | 
|  | multiplexer_to_user_datagram_ids_.erase(it); | 
|  | } | 
|  |  | 
|  | void QuartcSendChannel::OnSessionCreated(QuartcSession* session) { | 
|  | session_ = session; | 
|  | } | 
|  |  | 
|  | QuicMemSlice QuartcSendChannel::EncodeChannelId() { | 
|  | QuicMemSlice id_slice(allocator_, encoded_length_); | 
|  | QuicDataWriter writer(encoded_length_, const_cast<char*>(id_slice.data())); | 
|  | writer.WriteVarInt62(id_); | 
|  | return id_slice; | 
|  | } | 
|  |  | 
|  | QuartcMultiplexer::QuartcMultiplexer( | 
|  | QuicBufferAllocator* allocator, | 
|  | QuartcSessionEventDelegate* session_delegate, | 
|  | QuartcReceiveChannel* default_receive_channel) | 
|  | : allocator_(allocator), | 
|  | session_delegate_(session_delegate), | 
|  | default_receive_channel_(default_receive_channel) { | 
|  | CHECK_NE(session_delegate_, nullptr); | 
|  | CHECK_NE(default_receive_channel_, nullptr); | 
|  | } | 
|  |  | 
|  | QuartcSendChannel* QuartcMultiplexer::CreateSendChannel( | 
|  | uint64_t channel_id, | 
|  | QuartcSendChannel::Delegate* delegate) { | 
|  | send_channels_.push_back(std::make_unique<QuartcSendChannel>( | 
|  | this, channel_id, allocator_, delegate)); | 
|  | if (session_) { | 
|  | send_channels_.back()->OnSessionCreated(session_); | 
|  | } | 
|  | return send_channels_.back().get(); | 
|  | } | 
|  |  | 
|  | void QuartcMultiplexer::RegisterReceiveChannel(uint64_t channel_id, | 
|  | QuartcReceiveChannel* channel) { | 
|  | if (channel == nullptr) { | 
|  | receive_channels_.erase(channel_id); | 
|  | return; | 
|  | } | 
|  | auto& registered_channel = receive_channels_[channel_id]; | 
|  | if (registered_channel) { | 
|  | QUIC_LOG(DFATAL) << "Attempted to overwrite existing channel_id=" | 
|  | << channel_id; | 
|  | return; | 
|  | } | 
|  | registered_channel = channel; | 
|  | } | 
|  |  | 
|  | int64_t QuartcMultiplexer::AllocateDatagramId(QuartcSendChannel* channel) { | 
|  | send_channels_by_datagram_id_[next_datagram_id_] = channel; | 
|  | return next_datagram_id_++; | 
|  | } | 
|  |  | 
|  | void QuartcMultiplexer::OnSessionCreated(QuartcSession* session) { | 
|  | for (auto& channel : send_channels_) { | 
|  | channel->OnSessionCreated(session); | 
|  | } | 
|  | session_ = session; | 
|  | session_delegate_->OnSessionCreated(session); | 
|  | } | 
|  |  | 
|  | void QuartcMultiplexer::OnCryptoHandshakeComplete() { | 
|  | session_delegate_->OnCryptoHandshakeComplete(); | 
|  | } | 
|  |  | 
|  | void QuartcMultiplexer::OnConnectionWritable() { | 
|  | session_delegate_->OnConnectionWritable(); | 
|  | } | 
|  |  | 
|  | void QuartcMultiplexer::OnIncomingStream(QuartcStream* stream) { | 
|  | stream->SetDelegate(this); | 
|  | } | 
|  |  | 
|  | void QuartcMultiplexer::OnCongestionControlChange( | 
|  | QuicBandwidth bandwidth_estimate, | 
|  | QuicBandwidth pacing_rate, | 
|  | QuicTime::Delta latest_rtt) { | 
|  | session_delegate_->OnCongestionControlChange(bandwidth_estimate, pacing_rate, | 
|  | latest_rtt); | 
|  | } | 
|  |  | 
|  | void QuartcMultiplexer::OnConnectionClosed( | 
|  | const QuicConnectionCloseFrame& frame, | 
|  | ConnectionCloseSource source) { | 
|  | session_delegate_->OnConnectionClosed(frame, source); | 
|  | } | 
|  |  | 
|  | void QuartcMultiplexer::OnMessageReceived(QuicStringPiece message) { | 
|  | QuicDataReader reader(message); | 
|  | QuicVariableLengthIntegerLength channel_id_length = | 
|  | reader.PeekVarInt62Length(); | 
|  |  | 
|  | uint64_t channel_id; | 
|  | if (!reader.ReadVarInt62(&channel_id)) { | 
|  | QUIC_LOG(DFATAL) << "Received message without properly encoded channel id"; | 
|  | return; | 
|  | } | 
|  |  | 
|  | QuartcReceiveChannel* channel = default_receive_channel_; | 
|  | auto it = receive_channels_.find(channel_id); | 
|  | if (it != receive_channels_.end()) { | 
|  | channel = it->second; | 
|  | } | 
|  |  | 
|  | channel->OnMessageReceived(channel_id, message.substr(channel_id_length)); | 
|  | } | 
|  |  | 
|  | void QuartcMultiplexer::OnMessageSent(int64_t datagram_id) { | 
|  | auto it = send_channels_by_datagram_id_.find(datagram_id); | 
|  | if (it == send_channels_by_datagram_id_.end()) { | 
|  | return; | 
|  | } | 
|  | it->second->OnMessageSent(datagram_id); | 
|  | } | 
|  |  | 
|  | void QuartcMultiplexer::OnMessageAcked(int64_t datagram_id, | 
|  | QuicTime receive_timestamp) { | 
|  | auto it = send_channels_by_datagram_id_.find(datagram_id); | 
|  | if (it == send_channels_by_datagram_id_.end()) { | 
|  | return; | 
|  | } | 
|  | it->second->OnMessageAcked(datagram_id, receive_timestamp); | 
|  | send_channels_by_datagram_id_.erase(it); | 
|  | } | 
|  |  | 
|  | void QuartcMultiplexer::OnMessageLost(int64_t datagram_id) { | 
|  | auto it = send_channels_by_datagram_id_.find(datagram_id); | 
|  | if (it == send_channels_by_datagram_id_.end()) { | 
|  | return; | 
|  | } | 
|  | it->second->OnMessageLost(datagram_id); | 
|  | send_channels_by_datagram_id_.erase(it); | 
|  | } | 
|  |  | 
|  | size_t QuartcMultiplexer::OnReceived(QuartcStream* stream, | 
|  | iovec* iov, | 
|  | size_t iov_length, | 
|  | bool /*fin*/) { | 
|  | if (iov == nullptr || iov_length <= 0) { | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | QuicDataReader reader(static_cast<char*>(iov[0].iov_base), iov[0].iov_len); | 
|  | QuicVariableLengthIntegerLength channel_id_length = | 
|  | reader.PeekVarInt62Length(); | 
|  |  | 
|  | uint64_t channel_id; | 
|  | if (reader.BytesRemaining() >= channel_id_length) { | 
|  | // Fast path, have enough data to read immediately. | 
|  | if (!reader.ReadVarInt62(&channel_id)) { | 
|  | return 0; | 
|  | } | 
|  | } else { | 
|  | // Slow path, need to coalesce multiple iovecs. | 
|  | std::string data; | 
|  | for (size_t i = 0; i < iov_length; ++i) { | 
|  | data += std::string(static_cast<char*>(iov[i].iov_base), iov[i].iov_len); | 
|  | } | 
|  | QuicDataReader combined_reader(data); | 
|  | if (!combined_reader.ReadVarInt62(&channel_id)) { | 
|  | return 0; | 
|  | } | 
|  | } | 
|  |  | 
|  | QuartcReceiveChannel* channel = default_receive_channel_; | 
|  | auto it = receive_channels_.find(channel_id); | 
|  | if (it != receive_channels_.end()) { | 
|  | channel = it->second; | 
|  | } | 
|  | channel->OnIncomingStream(channel_id, stream); | 
|  | return channel_id_length; | 
|  | } | 
|  |  | 
|  | void QuartcMultiplexer::OnClose(QuartcStream* /*stream*/) {} | 
|  |  | 
|  | void QuartcMultiplexer::OnBufferChanged(QuartcStream* /*stream*/) {} | 
|  |  | 
|  | }  // namespace quic |