blob: 376fac02c8cd861f65679751e813e22faf576bd4 [file] [log] [blame]
QUICHE team335e56f2019-07-29 15:06:31 -07001// Copyright (c) 2019 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/third_party/quiche/src/quic/quartc/quartc_multiplexer.h"
6
7#include <cstdint>
bnc463f2352019-10-10 04:49:34 -07008#include <utility>
QUICHE team335e56f2019-07-29 15:06:31 -07009
vasilvvff082a02019-12-24 22:07:51 -080010#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h"
QUICHE team335e56f2019-07-29 15:06:31 -070011#include "net/third_party/quiche/src/quic/core/quic_data_writer.h"
12#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice.h"
13#include "net/third_party/quiche/src/quic/platform/api/quic_mem_slice_span.h"
dmcardlec60e87a2019-12-12 09:43:19 -080014#include "net/third_party/quiche/src/common/platform/api/quiche_string_piece.h"
QUICHE team335e56f2019-07-29 15:06:31 -070015
16namespace quic {
17
18QuartcSendChannel::QuartcSendChannel(QuartcMultiplexer* multiplexer,
19 uint64_t id,
20 QuicBufferAllocator* allocator,
21 Delegate* delegate)
22 : multiplexer_(multiplexer),
23 id_(id),
24 encoded_length_(QuicDataWriter::GetVarInt62Len(id_)),
25 allocator_(allocator),
26 delegate_(delegate) {}
27
28QuartcStream* QuartcSendChannel::CreateOutgoingBidirectionalStream() {
29 if (!session_) {
30 QUIC_LOG(DFATAL) << "Session is not ready to write yet; channel_id=" << id_;
31 return nullptr;
32 }
33 QuicMemSlice id_slice = EncodeChannelId();
34
35 QuartcStream* stream = session_->CreateOutgoingBidirectionalStream();
36 QuicConsumedData consumed =
37 stream->WriteMemSlices(QuicMemSliceSpan(&id_slice), /*fin=*/false);
38 DCHECK_EQ(consumed.bytes_consumed, encoded_length_);
39 return stream;
40}
41
42bool QuartcSendChannel::SendOrQueueMessage(QuicMemSliceSpan message,
43 int64_t datagram_id) {
44 if (!session_) {
45 QUIC_LOG(DFATAL) << "Session is not ready to write yet; channel_id=" << id_
46 << "datagram size=" << message.total_length();
47 return false;
48 }
49 QuicMemSliceStorage storage(nullptr, 0, nullptr, 0); // Empty storage.
50 storage.Append(EncodeChannelId());
51
52 message.ConsumeAll(
53 [&storage](QuicMemSlice slice) { storage.Append(std::move(slice)); });
54
55 // Allocate a unique datagram id so that notifications can be routed back to
56 // the right send channel.
57 int64_t unique_datagram_id = multiplexer_->AllocateDatagramId(this);
58 multiplexer_to_user_datagram_ids_[unique_datagram_id] = datagram_id;
59
60 return session_->SendOrQueueMessage(storage.ToSpan(), unique_datagram_id);
61}
62
63void QuartcSendChannel::OnMessageSent(int64_t datagram_id) {
64 // Map back to the caller-chosen |datagram_id|.
65 datagram_id = multiplexer_to_user_datagram_ids_[datagram_id];
66 delegate_->OnMessageSent(datagram_id);
67}
68
69void QuartcSendChannel::OnMessageAcked(int64_t datagram_id,
70 QuicTime receive_timestamp) {
71 // Map back to the caller-chosen |datagram_id|.
72 auto it = multiplexer_to_user_datagram_ids_.find(datagram_id);
73 if (it == multiplexer_to_user_datagram_ids_.end()) {
74 QUIC_LOG(DFATAL) << "Datagram acked/lost multiple times; datagram_id="
75 << datagram_id;
76 return;
77 }
78 delegate_->OnMessageAcked(it->second, receive_timestamp);
79 multiplexer_to_user_datagram_ids_.erase(it);
80}
81
82void QuartcSendChannel::OnMessageLost(int64_t datagram_id) {
83 // Map back to the caller-chosen |datagram_id|.
84 auto it = multiplexer_to_user_datagram_ids_.find(datagram_id);
85 if (it == multiplexer_to_user_datagram_ids_.end()) {
86 QUIC_LOG(DFATAL) << "Datagram acked/lost multiple times; datagram_id="
87 << datagram_id;
88 return;
89 }
90 delegate_->OnMessageLost(it->second);
91 multiplexer_to_user_datagram_ids_.erase(it);
92}
93
94void QuartcSendChannel::OnSessionCreated(QuartcSession* session) {
95 session_ = session;
96}
97
98QuicMemSlice QuartcSendChannel::EncodeChannelId() {
vasilvvff082a02019-12-24 22:07:51 -080099 QuicUniqueBufferPtr buffer = MakeUniqueBuffer(allocator_, encoded_length_);
100 QuicDataWriter writer(encoded_length_, buffer.get());
QUICHE team335e56f2019-07-29 15:06:31 -0700101 writer.WriteVarInt62(id_);
vasilvvff082a02019-12-24 22:07:51 -0800102 return QuicMemSlice(std::move(buffer), encoded_length_);
QUICHE team335e56f2019-07-29 15:06:31 -0700103}
104
105QuartcMultiplexer::QuartcMultiplexer(
106 QuicBufferAllocator* allocator,
107 QuartcSessionEventDelegate* session_delegate,
108 QuartcReceiveChannel* default_receive_channel)
109 : allocator_(allocator),
110 session_delegate_(session_delegate),
111 default_receive_channel_(default_receive_channel) {
112 CHECK_NE(session_delegate_, nullptr);
113 CHECK_NE(default_receive_channel_, nullptr);
114}
115
116QuartcSendChannel* QuartcMultiplexer::CreateSendChannel(
117 uint64_t channel_id,
118 QuartcSendChannel::Delegate* delegate) {
vasilvv0fc587f2019-09-06 13:33:08 -0700119 send_channels_.push_back(std::make_unique<QuartcSendChannel>(
QUICHE team335e56f2019-07-29 15:06:31 -0700120 this, channel_id, allocator_, delegate));
121 if (session_) {
122 send_channels_.back()->OnSessionCreated(session_);
123 }
124 return send_channels_.back().get();
125}
126
127void QuartcMultiplexer::RegisterReceiveChannel(uint64_t channel_id,
128 QuartcReceiveChannel* channel) {
129 if (channel == nullptr) {
130 receive_channels_.erase(channel_id);
131 return;
132 }
133 auto& registered_channel = receive_channels_[channel_id];
134 if (registered_channel) {
135 QUIC_LOG(DFATAL) << "Attempted to overwrite existing channel_id="
136 << channel_id;
137 return;
138 }
139 registered_channel = channel;
140}
141
142int64_t QuartcMultiplexer::AllocateDatagramId(QuartcSendChannel* channel) {
143 send_channels_by_datagram_id_[next_datagram_id_] = channel;
144 return next_datagram_id_++;
145}
146
147void QuartcMultiplexer::OnSessionCreated(QuartcSession* session) {
148 for (auto& channel : send_channels_) {
149 channel->OnSessionCreated(session);
150 }
151 session_ = session;
152 session_delegate_->OnSessionCreated(session);
153}
154
155void QuartcMultiplexer::OnCryptoHandshakeComplete() {
156 session_delegate_->OnCryptoHandshakeComplete();
157}
158
159void QuartcMultiplexer::OnConnectionWritable() {
160 session_delegate_->OnConnectionWritable();
161}
162
163void QuartcMultiplexer::OnIncomingStream(QuartcStream* stream) {
164 stream->SetDelegate(this);
165}
166
167void QuartcMultiplexer::OnCongestionControlChange(
168 QuicBandwidth bandwidth_estimate,
169 QuicBandwidth pacing_rate,
170 QuicTime::Delta latest_rtt) {
171 session_delegate_->OnCongestionControlChange(bandwidth_estimate, pacing_rate,
172 latest_rtt);
173}
174
175void QuartcMultiplexer::OnConnectionClosed(
176 const QuicConnectionCloseFrame& frame,
177 ConnectionCloseSource source) {
178 session_delegate_->OnConnectionClosed(frame, source);
179}
180
dmcardlec60e87a2019-12-12 09:43:19 -0800181void QuartcMultiplexer::OnMessageReceived(quiche::QuicheStringPiece message) {
QUICHE team335e56f2019-07-29 15:06:31 -0700182 QuicDataReader reader(message);
183 QuicVariableLengthIntegerLength channel_id_length =
184 reader.PeekVarInt62Length();
185
186 uint64_t channel_id;
187 if (!reader.ReadVarInt62(&channel_id)) {
188 QUIC_LOG(DFATAL) << "Received message without properly encoded channel id";
189 return;
190 }
191
192 QuartcReceiveChannel* channel = default_receive_channel_;
193 auto it = receive_channels_.find(channel_id);
194 if (it != receive_channels_.end()) {
195 channel = it->second;
196 }
197
198 channel->OnMessageReceived(channel_id, message.substr(channel_id_length));
199}
200
201void QuartcMultiplexer::OnMessageSent(int64_t datagram_id) {
202 auto it = send_channels_by_datagram_id_.find(datagram_id);
203 if (it == send_channels_by_datagram_id_.end()) {
204 return;
205 }
206 it->second->OnMessageSent(datagram_id);
207}
208
209void QuartcMultiplexer::OnMessageAcked(int64_t datagram_id,
210 QuicTime receive_timestamp) {
211 auto it = send_channels_by_datagram_id_.find(datagram_id);
212 if (it == send_channels_by_datagram_id_.end()) {
213 return;
214 }
215 it->second->OnMessageAcked(datagram_id, receive_timestamp);
216 send_channels_by_datagram_id_.erase(it);
217}
218
219void QuartcMultiplexer::OnMessageLost(int64_t datagram_id) {
220 auto it = send_channels_by_datagram_id_.find(datagram_id);
221 if (it == send_channels_by_datagram_id_.end()) {
222 return;
223 }
224 it->second->OnMessageLost(datagram_id);
225 send_channels_by_datagram_id_.erase(it);
226}
227
228size_t QuartcMultiplexer::OnReceived(QuartcStream* stream,
229 iovec* iov,
230 size_t iov_length,
231 bool /*fin*/) {
232 if (iov == nullptr || iov_length <= 0) {
233 return 0;
234 }
235
236 QuicDataReader reader(static_cast<char*>(iov[0].iov_base), iov[0].iov_len);
237 QuicVariableLengthIntegerLength channel_id_length =
238 reader.PeekVarInt62Length();
239
240 uint64_t channel_id;
241 if (reader.BytesRemaining() >= channel_id_length) {
242 // Fast path, have enough data to read immediately.
243 if (!reader.ReadVarInt62(&channel_id)) {
244 return 0;
245 }
246 } else {
247 // Slow path, need to coalesce multiple iovecs.
248 std::string data;
249 for (size_t i = 0; i < iov_length; ++i) {
250 data += std::string(static_cast<char*>(iov[i].iov_base), iov[i].iov_len);
251 }
252 QuicDataReader combined_reader(data);
253 if (!combined_reader.ReadVarInt62(&channel_id)) {
254 return 0;
255 }
256 }
257
258 QuartcReceiveChannel* channel = default_receive_channel_;
259 auto it = receive_channels_.find(channel_id);
260 if (it != receive_channels_.end()) {
261 channel = it->second;
262 }
263 channel->OnIncomingStream(channel_id, stream);
264 return channel_id_length;
265}
266
267void QuartcMultiplexer::OnClose(QuartcStream* /*stream*/) {}
268
269void QuartcMultiplexer::OnBufferChanged(QuartcStream* /*stream*/) {}
270
271} // namespace quic