| // Copyright (c) 2019 The Chromium Authors. All rights reserved. | 
 | // Use of this source code is governed by a BSD-style license that can be | 
 | // found in the LICENSE file. | 
 |  | 
 | #include "net/third_party/quiche/src/quic/quartc/quartc_multiplexer.h" | 
 |  | 
 | #include <cstdint> | 
 | #include <utility> | 
 |  | 
 | #include "net/third_party/quiche/src/quic/core/quic_data_writer.h" | 
 | #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h" | 
 | #include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h" | 
 |  | 
 | namespace quic { | 
 |  | 
 | QuartcSendChannel::QuartcSendChannel(QuartcMultiplexer* multiplexer, | 
 |                                      uint64_t id, | 
 |                                      QuicBufferAllocator* allocator, | 
 |                                      Delegate* delegate) | 
 |     : multiplexer_(multiplexer), | 
 |       id_(id), | 
 |       encoded_length_(QuicDataWriter::GetVarInt62Len(id_)), | 
 |       allocator_(allocator), | 
 |       delegate_(delegate) {} | 
 |  | 
 | QuartcStream* QuartcSendChannel::CreateOutgoingBidirectionalStream() { | 
 |   if (!session_) { | 
 |     QUIC_LOG(DFATAL) << "Session is not ready to write yet; channel_id=" << id_; | 
 |     return nullptr; | 
 |   } | 
 |   QuicMemSlice id_slice = EncodeChannelId(); | 
 |  | 
 |   QuartcStream* stream = session_->CreateOutgoingBidirectionalStream(); | 
 |   QuicConsumedData consumed = | 
 |       stream->WriteMemSlices(QuicMemSliceSpan(&id_slice), /*fin=*/false); | 
 |   DCHECK_EQ(consumed.bytes_consumed, encoded_length_); | 
 |   return stream; | 
 | } | 
 |  | 
 | bool QuartcSendChannel::SendOrQueueMessage(QuicMemSliceSpan message, | 
 |                                            int64_t datagram_id) { | 
 |   if (!session_) { | 
 |     QUIC_LOG(DFATAL) << "Session is not ready to write yet; channel_id=" << id_ | 
 |                      << "datagram size=" << message.total_length(); | 
 |     return false; | 
 |   } | 
 |   QuicMemSliceStorage storage(nullptr, 0, nullptr, 0);  // Empty storage. | 
 |   storage.Append(EncodeChannelId()); | 
 |  | 
 |   message.ConsumeAll( | 
 |       [&storage](QuicMemSlice slice) { storage.Append(std::move(slice)); }); | 
 |  | 
 |   // Allocate a unique datagram id so that notifications can be routed back to | 
 |   // the right send channel. | 
 |   int64_t unique_datagram_id = multiplexer_->AllocateDatagramId(this); | 
 |   multiplexer_to_user_datagram_ids_[unique_datagram_id] = datagram_id; | 
 |  | 
 |   return session_->SendOrQueueMessage(storage.ToSpan(), unique_datagram_id); | 
 | } | 
 |  | 
 | void QuartcSendChannel::OnMessageSent(int64_t datagram_id) { | 
 |   // Map back to the caller-chosen |datagram_id|. | 
 |   datagram_id = multiplexer_to_user_datagram_ids_[datagram_id]; | 
 |   delegate_->OnMessageSent(datagram_id); | 
 | } | 
 |  | 
 | void QuartcSendChannel::OnMessageAcked(int64_t datagram_id, | 
 |                                        QuicTime receive_timestamp) { | 
 |   // Map back to the caller-chosen |datagram_id|. | 
 |   auto it = multiplexer_to_user_datagram_ids_.find(datagram_id); | 
 |   if (it == multiplexer_to_user_datagram_ids_.end()) { | 
 |     QUIC_LOG(DFATAL) << "Datagram acked/lost multiple times; datagram_id=" | 
 |                      << datagram_id; | 
 |     return; | 
 |   } | 
 |   delegate_->OnMessageAcked(it->second, receive_timestamp); | 
 |   multiplexer_to_user_datagram_ids_.erase(it); | 
 | } | 
 |  | 
 | void QuartcSendChannel::OnMessageLost(int64_t datagram_id) { | 
 |   // Map back to the caller-chosen |datagram_id|. | 
 |   auto it = multiplexer_to_user_datagram_ids_.find(datagram_id); | 
 |   if (it == multiplexer_to_user_datagram_ids_.end()) { | 
 |     QUIC_LOG(DFATAL) << "Datagram acked/lost multiple times; datagram_id=" | 
 |                      << datagram_id; | 
 |     return; | 
 |   } | 
 |   delegate_->OnMessageLost(it->second); | 
 |   multiplexer_to_user_datagram_ids_.erase(it); | 
 | } | 
 |  | 
 | void QuartcSendChannel::OnSessionCreated(QuartcSession* session) { | 
 |   session_ = session; | 
 | } | 
 |  | 
 | QuicMemSlice QuartcSendChannel::EncodeChannelId() { | 
 |   QuicMemSlice id_slice(allocator_, encoded_length_); | 
 |   QuicDataWriter writer(encoded_length_, const_cast<char*>(id_slice.data())); | 
 |   writer.WriteVarInt62(id_); | 
 |   return id_slice; | 
 | } | 
 |  | 
 | QuartcMultiplexer::QuartcMultiplexer( | 
 |     QuicBufferAllocator* allocator, | 
 |     QuartcSessionEventDelegate* session_delegate, | 
 |     QuartcReceiveChannel* default_receive_channel) | 
 |     : allocator_(allocator), | 
 |       session_delegate_(session_delegate), | 
 |       default_receive_channel_(default_receive_channel) { | 
 |   CHECK_NE(session_delegate_, nullptr); | 
 |   CHECK_NE(default_receive_channel_, nullptr); | 
 | } | 
 |  | 
 | QuartcSendChannel* QuartcMultiplexer::CreateSendChannel( | 
 |     uint64_t channel_id, | 
 |     QuartcSendChannel::Delegate* delegate) { | 
 |   send_channels_.push_back(std::make_unique<QuartcSendChannel>( | 
 |       this, channel_id, allocator_, delegate)); | 
 |   if (session_) { | 
 |     send_channels_.back()->OnSessionCreated(session_); | 
 |   } | 
 |   return send_channels_.back().get(); | 
 | } | 
 |  | 
 | void QuartcMultiplexer::RegisterReceiveChannel(uint64_t channel_id, | 
 |                                                QuartcReceiveChannel* channel) { | 
 |   if (channel == nullptr) { | 
 |     receive_channels_.erase(channel_id); | 
 |     return; | 
 |   } | 
 |   auto& registered_channel = receive_channels_[channel_id]; | 
 |   if (registered_channel) { | 
 |     QUIC_LOG(DFATAL) << "Attempted to overwrite existing channel_id=" | 
 |                      << channel_id; | 
 |     return; | 
 |   } | 
 |   registered_channel = channel; | 
 | } | 
 |  | 
 | int64_t QuartcMultiplexer::AllocateDatagramId(QuartcSendChannel* channel) { | 
 |   send_channels_by_datagram_id_[next_datagram_id_] = channel; | 
 |   return next_datagram_id_++; | 
 | } | 
 |  | 
 | void QuartcMultiplexer::OnSessionCreated(QuartcSession* session) { | 
 |   for (auto& channel : send_channels_) { | 
 |     channel->OnSessionCreated(session); | 
 |   } | 
 |   session_ = session; | 
 |   session_delegate_->OnSessionCreated(session); | 
 | } | 
 |  | 
 | void QuartcMultiplexer::OnCryptoHandshakeComplete() { | 
 |   session_delegate_->OnCryptoHandshakeComplete(); | 
 | } | 
 |  | 
 | void QuartcMultiplexer::OnConnectionWritable() { | 
 |   session_delegate_->OnConnectionWritable(); | 
 | } | 
 |  | 
 | void QuartcMultiplexer::OnIncomingStream(QuartcStream* stream) { | 
 |   stream->SetDelegate(this); | 
 | } | 
 |  | 
 | void QuartcMultiplexer::OnCongestionControlChange( | 
 |     QuicBandwidth bandwidth_estimate, | 
 |     QuicBandwidth pacing_rate, | 
 |     QuicTime::Delta latest_rtt) { | 
 |   session_delegate_->OnCongestionControlChange(bandwidth_estimate, pacing_rate, | 
 |                                                latest_rtt); | 
 | } | 
 |  | 
 | void QuartcMultiplexer::OnConnectionClosed( | 
 |     const QuicConnectionCloseFrame& frame, | 
 |     ConnectionCloseSource source) { | 
 |   session_delegate_->OnConnectionClosed(frame, source); | 
 | } | 
 |  | 
 | void QuartcMultiplexer::OnMessageReceived(QuicStringPiece message) { | 
 |   QuicDataReader reader(message); | 
 |   QuicVariableLengthIntegerLength channel_id_length = | 
 |       reader.PeekVarInt62Length(); | 
 |  | 
 |   uint64_t channel_id; | 
 |   if (!reader.ReadVarInt62(&channel_id)) { | 
 |     QUIC_LOG(DFATAL) << "Received message without properly encoded channel id"; | 
 |     return; | 
 |   } | 
 |  | 
 |   QuartcReceiveChannel* channel = default_receive_channel_; | 
 |   auto it = receive_channels_.find(channel_id); | 
 |   if (it != receive_channels_.end()) { | 
 |     channel = it->second; | 
 |   } | 
 |  | 
 |   channel->OnMessageReceived(channel_id, message.substr(channel_id_length)); | 
 | } | 
 |  | 
 | void QuartcMultiplexer::OnMessageSent(int64_t datagram_id) { | 
 |   auto it = send_channels_by_datagram_id_.find(datagram_id); | 
 |   if (it == send_channels_by_datagram_id_.end()) { | 
 |     return; | 
 |   } | 
 |   it->second->OnMessageSent(datagram_id); | 
 | } | 
 |  | 
 | void QuartcMultiplexer::OnMessageAcked(int64_t datagram_id, | 
 |                                        QuicTime receive_timestamp) { | 
 |   auto it = send_channels_by_datagram_id_.find(datagram_id); | 
 |   if (it == send_channels_by_datagram_id_.end()) { | 
 |     return; | 
 |   } | 
 |   it->second->OnMessageAcked(datagram_id, receive_timestamp); | 
 |   send_channels_by_datagram_id_.erase(it); | 
 | } | 
 |  | 
 | void QuartcMultiplexer::OnMessageLost(int64_t datagram_id) { | 
 |   auto it = send_channels_by_datagram_id_.find(datagram_id); | 
 |   if (it == send_channels_by_datagram_id_.end()) { | 
 |     return; | 
 |   } | 
 |   it->second->OnMessageLost(datagram_id); | 
 |   send_channels_by_datagram_id_.erase(it); | 
 | } | 
 |  | 
 | size_t QuartcMultiplexer::OnReceived(QuartcStream* stream, | 
 |                                      iovec* iov, | 
 |                                      size_t iov_length, | 
 |                                      bool /*fin*/) { | 
 |   if (iov == nullptr || iov_length <= 0) { | 
 |     return 0; | 
 |   } | 
 |  | 
 |   QuicDataReader reader(static_cast<char*>(iov[0].iov_base), iov[0].iov_len); | 
 |   QuicVariableLengthIntegerLength channel_id_length = | 
 |       reader.PeekVarInt62Length(); | 
 |  | 
 |   uint64_t channel_id; | 
 |   if (reader.BytesRemaining() >= channel_id_length) { | 
 |     // Fast path, have enough data to read immediately. | 
 |     if (!reader.ReadVarInt62(&channel_id)) { | 
 |       return 0; | 
 |     } | 
 |   } else { | 
 |     // Slow path, need to coalesce multiple iovecs. | 
 |     std::string data; | 
 |     for (size_t i = 0; i < iov_length; ++i) { | 
 |       data += std::string(static_cast<char*>(iov[i].iov_base), iov[i].iov_len); | 
 |     } | 
 |     QuicDataReader combined_reader(data); | 
 |     if (!combined_reader.ReadVarInt62(&channel_id)) { | 
 |       return 0; | 
 |     } | 
 |   } | 
 |  | 
 |   QuartcReceiveChannel* channel = default_receive_channel_; | 
 |   auto it = receive_channels_.find(channel_id); | 
 |   if (it != receive_channels_.end()) { | 
 |     channel = it->second; | 
 |   } | 
 |   channel->OnIncomingStream(channel_id, stream); | 
 |   return channel_id_length; | 
 | } | 
 |  | 
 | void QuartcMultiplexer::OnClose(QuartcStream* /*stream*/) {} | 
 |  | 
 | void QuartcMultiplexer::OnBufferChanged(QuartcStream* /*stream*/) {} | 
 |  | 
 | }  // namespace quic |