Create a QuartcMultiplexer which separates streams and datagrams into channels.
QuartcMultiplexer effectively reorganizes the calls and callbacks for
QuartcSession into three categories:
- Per-channel send events
- Per-channel receive events
- Session-wide events that are not multiplexed
QuartcMultiplexer works at a low level to hide streams and messages from other
senders and receivers. It consists of a core multiplexer object which interacts
with the QuartcEndpoint/Session, a channel sender which handles outgoing data,
and a channel receiver which handles incoming data.
The sender has a specific channel id, specified on creation. A channel writes
this channel id in a varint form at the start of each new stream or datagram it
sends.
The multiplexer intercepts all the callbacks for incoming streams and datagrams.
It reads a varint from the start of each stream or datagram to identify the
channel id. It then looks up a receiver for that channel and delegates the
stream or datagram to that receiver.
A default receiver may be registered to handle all streams or datagrams not
assigned to a specific receiver. This allows endpoints to dispatch unhandled
data to a catch-all, or to await incoming data before registering a specific
receiver. The latter may be useful to in conjunction with some forms of
negotiation; eg. when certain ranges of channel ids are allocated to different
protocols, but it is unknown which protocol will be used at startup.
gfe-relnote: n/a (quartc only)
PiperOrigin-RevId: 260588723
Change-Id: I4a3b815b48c4f825c47bc60b0b3fd76d4e3614a5
diff --git a/quic/quartc/quartc_multiplexer.cc b/quic/quartc/quartc_multiplexer.cc
new file mode 100644
index 0000000..001e219
--- /dev/null
+++ b/quic/quartc/quartc_multiplexer.cc
@@ -0,0 +1,269 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/quartc/quartc_multiplexer.h"
+
+#include <cstdint>
+
+#include "net/third_party/quiche/src/quic/core/quic_data_writer.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_ptr_util.h"
+
+namespace quic {
+
+QuartcSendChannel::QuartcSendChannel(QuartcMultiplexer* multiplexer,
+ uint64_t id,
+ QuicBufferAllocator* allocator,
+ Delegate* delegate)
+ : multiplexer_(multiplexer),
+ id_(id),
+ encoded_length_(QuicDataWriter::GetVarInt62Len(id_)),
+ allocator_(allocator),
+ delegate_(delegate) {}
+
+QuartcStream* QuartcSendChannel::CreateOutgoingBidirectionalStream() {
+ if (!session_) {
+ QUIC_LOG(DFATAL) << "Session is not ready to write yet; channel_id=" << id_;
+ return nullptr;
+ }
+ QuicMemSlice id_slice = EncodeChannelId();
+
+ QuartcStream* stream = session_->CreateOutgoingBidirectionalStream();
+ QuicConsumedData consumed =
+ stream->WriteMemSlices(QuicMemSliceSpan(&id_slice), /*fin=*/false);
+ DCHECK_EQ(consumed.bytes_consumed, encoded_length_);
+ return stream;
+}
+
+bool QuartcSendChannel::SendOrQueueMessage(QuicMemSliceSpan message,
+ int64_t datagram_id) {
+ if (!session_) {
+ QUIC_LOG(DFATAL) << "Session is not ready to write yet; channel_id=" << id_
+ << "datagram size=" << message.total_length();
+ return false;
+ }
+ QuicMemSliceStorage storage(nullptr, 0, nullptr, 0); // Empty storage.
+ storage.Append(EncodeChannelId());
+
+ message.ConsumeAll(
+ [&storage](QuicMemSlice slice) { storage.Append(std::move(slice)); });
+
+ // Allocate a unique datagram id so that notifications can be routed back to
+ // the right send channel.
+ int64_t unique_datagram_id = multiplexer_->AllocateDatagramId(this);
+ multiplexer_to_user_datagram_ids_[unique_datagram_id] = datagram_id;
+
+ return session_->SendOrQueueMessage(storage.ToSpan(), unique_datagram_id);
+}
+
+void QuartcSendChannel::OnMessageSent(int64_t datagram_id) {
+ // Map back to the caller-chosen |datagram_id|.
+ datagram_id = multiplexer_to_user_datagram_ids_[datagram_id];
+ delegate_->OnMessageSent(datagram_id);
+}
+
+void QuartcSendChannel::OnMessageAcked(int64_t datagram_id,
+ QuicTime receive_timestamp) {
+ // Map back to the caller-chosen |datagram_id|.
+ auto it = multiplexer_to_user_datagram_ids_.find(datagram_id);
+ if (it == multiplexer_to_user_datagram_ids_.end()) {
+ QUIC_LOG(DFATAL) << "Datagram acked/lost multiple times; datagram_id="
+ << datagram_id;
+ return;
+ }
+ delegate_->OnMessageAcked(it->second, receive_timestamp);
+ multiplexer_to_user_datagram_ids_.erase(it);
+}
+
+void QuartcSendChannel::OnMessageLost(int64_t datagram_id) {
+ // Map back to the caller-chosen |datagram_id|.
+ auto it = multiplexer_to_user_datagram_ids_.find(datagram_id);
+ if (it == multiplexer_to_user_datagram_ids_.end()) {
+ QUIC_LOG(DFATAL) << "Datagram acked/lost multiple times; datagram_id="
+ << datagram_id;
+ return;
+ }
+ delegate_->OnMessageLost(it->second);
+ multiplexer_to_user_datagram_ids_.erase(it);
+}
+
+void QuartcSendChannel::OnSessionCreated(QuartcSession* session) {
+ session_ = session;
+}
+
+QuicMemSlice QuartcSendChannel::EncodeChannelId() {
+ QuicMemSlice id_slice(allocator_, encoded_length_);
+ QuicDataWriter writer(encoded_length_, const_cast<char*>(id_slice.data()));
+ writer.WriteVarInt62(id_);
+ return id_slice;
+}
+
+QuartcMultiplexer::QuartcMultiplexer(
+ QuicBufferAllocator* allocator,
+ QuartcSessionEventDelegate* session_delegate,
+ QuartcReceiveChannel* default_receive_channel)
+ : allocator_(allocator),
+ session_delegate_(session_delegate),
+ default_receive_channel_(default_receive_channel) {
+ CHECK_NE(session_delegate_, nullptr);
+ CHECK_NE(default_receive_channel_, nullptr);
+}
+
+QuartcSendChannel* QuartcMultiplexer::CreateSendChannel(
+ uint64_t channel_id,
+ QuartcSendChannel::Delegate* delegate) {
+ send_channels_.push_back(QuicMakeUnique<QuartcSendChannel>(
+ this, channel_id, allocator_, delegate));
+ if (session_) {
+ send_channels_.back()->OnSessionCreated(session_);
+ }
+ return send_channels_.back().get();
+}
+
+void QuartcMultiplexer::RegisterReceiveChannel(uint64_t channel_id,
+ QuartcReceiveChannel* channel) {
+ if (channel == nullptr) {
+ receive_channels_.erase(channel_id);
+ return;
+ }
+ auto& registered_channel = receive_channels_[channel_id];
+ if (registered_channel) {
+ QUIC_LOG(DFATAL) << "Attempted to overwrite existing channel_id="
+ << channel_id;
+ return;
+ }
+ registered_channel = channel;
+}
+
+int64_t QuartcMultiplexer::AllocateDatagramId(QuartcSendChannel* channel) {
+ send_channels_by_datagram_id_[next_datagram_id_] = channel;
+ return next_datagram_id_++;
+}
+
+void QuartcMultiplexer::OnSessionCreated(QuartcSession* session) {
+ for (auto& channel : send_channels_) {
+ channel->OnSessionCreated(session);
+ }
+ session_ = session;
+ session_delegate_->OnSessionCreated(session);
+}
+
+void QuartcMultiplexer::OnCryptoHandshakeComplete() {
+ session_delegate_->OnCryptoHandshakeComplete();
+}
+
+void QuartcMultiplexer::OnConnectionWritable() {
+ session_delegate_->OnConnectionWritable();
+}
+
+void QuartcMultiplexer::OnIncomingStream(QuartcStream* stream) {
+ stream->SetDelegate(this);
+}
+
+void QuartcMultiplexer::OnCongestionControlChange(
+ QuicBandwidth bandwidth_estimate,
+ QuicBandwidth pacing_rate,
+ QuicTime::Delta latest_rtt) {
+ session_delegate_->OnCongestionControlChange(bandwidth_estimate, pacing_rate,
+ latest_rtt);
+}
+
+void QuartcMultiplexer::OnConnectionClosed(
+ const QuicConnectionCloseFrame& frame,
+ ConnectionCloseSource source) {
+ session_delegate_->OnConnectionClosed(frame, source);
+}
+
+void QuartcMultiplexer::OnMessageReceived(QuicStringPiece message) {
+ QuicDataReader reader(message);
+ QuicVariableLengthIntegerLength channel_id_length =
+ reader.PeekVarInt62Length();
+
+ uint64_t channel_id;
+ if (!reader.ReadVarInt62(&channel_id)) {
+ QUIC_LOG(DFATAL) << "Received message without properly encoded channel id";
+ return;
+ }
+
+ QuartcReceiveChannel* channel = default_receive_channel_;
+ auto it = receive_channels_.find(channel_id);
+ if (it != receive_channels_.end()) {
+ channel = it->second;
+ }
+
+ channel->OnMessageReceived(channel_id, message.substr(channel_id_length));
+}
+
+void QuartcMultiplexer::OnMessageSent(int64_t datagram_id) {
+ auto it = send_channels_by_datagram_id_.find(datagram_id);
+ if (it == send_channels_by_datagram_id_.end()) {
+ return;
+ }
+ it->second->OnMessageSent(datagram_id);
+}
+
+void QuartcMultiplexer::OnMessageAcked(int64_t datagram_id,
+ QuicTime receive_timestamp) {
+ auto it = send_channels_by_datagram_id_.find(datagram_id);
+ if (it == send_channels_by_datagram_id_.end()) {
+ return;
+ }
+ it->second->OnMessageAcked(datagram_id, receive_timestamp);
+ send_channels_by_datagram_id_.erase(it);
+}
+
+void QuartcMultiplexer::OnMessageLost(int64_t datagram_id) {
+ auto it = send_channels_by_datagram_id_.find(datagram_id);
+ if (it == send_channels_by_datagram_id_.end()) {
+ return;
+ }
+ it->second->OnMessageLost(datagram_id);
+ send_channels_by_datagram_id_.erase(it);
+}
+
+size_t QuartcMultiplexer::OnReceived(QuartcStream* stream,
+ iovec* iov,
+ size_t iov_length,
+ bool /*fin*/) {
+ if (iov == nullptr || iov_length <= 0) {
+ return 0;
+ }
+
+ QuicDataReader reader(static_cast<char*>(iov[0].iov_base), iov[0].iov_len);
+ QuicVariableLengthIntegerLength channel_id_length =
+ reader.PeekVarInt62Length();
+
+ uint64_t channel_id;
+ if (reader.BytesRemaining() >= channel_id_length) {
+ // Fast path, have enough data to read immediately.
+ if (!reader.ReadVarInt62(&channel_id)) {
+ return 0;
+ }
+ } else {
+ // Slow path, need to coalesce multiple iovecs.
+ std::string data;
+ for (size_t i = 0; i < iov_length; ++i) {
+ data += std::string(static_cast<char*>(iov[i].iov_base), iov[i].iov_len);
+ }
+ QuicDataReader combined_reader(data);
+ if (!combined_reader.ReadVarInt62(&channel_id)) {
+ return 0;
+ }
+ }
+
+ QuartcReceiveChannel* channel = default_receive_channel_;
+ auto it = receive_channels_.find(channel_id);
+ if (it != receive_channels_.end()) {
+ channel = it->second;
+ }
+ channel->OnIncomingStream(channel_id, stream);
+ return channel_id_length;
+}
+
+void QuartcMultiplexer::OnClose(QuartcStream* /*stream*/) {}
+
+void QuartcMultiplexer::OnBufferChanged(QuartcStream* /*stream*/) {}
+
+} // namespace quic