blob: 64609cbc56e45b32152f10fabad538f59d1163a0 [file] [log] [blame]
QUICHE team335e56f2019-07-29 15:06:31 -07001// Copyright (c) 2019 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "net/third_party/quiche/src/quic/quartc/quartc_multiplexer.h"
6
7#include <memory>
8
9#include "net/third_party/quiche/src/quic/core/frames/quic_connection_close_frame.h"
10#include "net/third_party/quiche/src/quic/core/quic_bandwidth.h"
11#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h"
12#include "net/third_party/quiche/src/quic/core/quic_time.h"
13#include "net/third_party/quiche/src/quic/core/quic_types.h"
14#include "net/third_party/quiche/src/quic/platform/api/quic_containers.h"
15#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
16#include "net/third_party/quiche/src/quic/platform/api/quic_ptr_util.h"
17#include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h"
18#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
19#include "net/third_party/quiche/src/quic/platform/api/quic_test_mem_slice_vector.h"
20#include "net/third_party/quiche/src/quic/quartc/counting_packet_filter.h"
21#include "net/third_party/quiche/src/quic/quartc/quartc_endpoint.h"
22#include "net/third_party/quiche/src/quic/quartc/quartc_fakes.h"
23#include "net/third_party/quiche/src/quic/quartc/quartc_session.h"
24#include "net/third_party/quiche/src/quic/quartc/quartc_stream.h"
25#include "net/third_party/quiche/src/quic/quartc/simulated_packet_transport.h"
26#include "net/third_party/quiche/src/quic/test_tools/simulator/link.h"
27#include "net/third_party/quiche/src/quic/test_tools/simulator/simulator.h"
28
29namespace quic {
30namespace {
31
32using ::testing::ElementsAreArray;
33using ::testing::Gt;
34using ::testing::IsEmpty;
35using ::testing::Pair;
36
37constexpr QuicTime::Delta kPropagationDelay =
38 QuicTime::Delta::FromMilliseconds(10);
39
40class FakeSessionEventDelegate : public QuartcSessionEventDelegate {
41 public:
42 void OnSessionCreated(QuartcSession* session) override {
43 session->StartCryptoHandshake();
44 session_ = session;
45 }
46
47 void OnConnectionWritable() override { ++writable_count_; }
48
49 void OnCryptoHandshakeComplete() override { ++handshake_count_; }
50
51 void OnConnectionClosed(const QuicConnectionCloseFrame& frame,
52 ConnectionCloseSource source) override {
53 error_ = frame.quic_error_code;
54 close_source_ = source;
55 }
56
57 void OnCongestionControlChange(QuicBandwidth bandwidth_estimate,
58 QuicBandwidth pacing_rate,
59 QuicTime::Delta latest_rtt) override {
60 latest_bandwidth_estimate_ = bandwidth_estimate;
61 latest_pacing_rate_ = pacing_rate;
62 latest_rtt_ = latest_rtt;
63 }
64
65 QuartcSession* session() { return session_; }
66 int writable_count() const { return writable_count_; }
67 int handshake_count() const { return handshake_count_; }
68 QuicErrorCode error() const { return error_; }
69 ConnectionCloseSource close_source() const { return close_source_; }
70 QuicBandwidth latest_bandwidth_estimate() const {
71 return latest_bandwidth_estimate_;
72 }
73 QuicBandwidth latest_pacing_rate() const { return latest_pacing_rate_; }
74 QuicTime::Delta latest_rtt() const { return latest_rtt_; }
75
76 private:
77 QuartcSession* session_ = nullptr;
78 int writable_count_ = 0;
79 int handshake_count_ = 0;
80 QuicErrorCode error_ = QUIC_NO_ERROR;
81 ConnectionCloseSource close_source_;
82 QuicBandwidth latest_bandwidth_estimate_ = QuicBandwidth::Zero();
83 QuicBandwidth latest_pacing_rate_ = QuicBandwidth::Zero();
84 QuicTime::Delta latest_rtt_ = QuicTime::Delta::Zero();
85};
86
87class FakeSendDelegate : public QuartcSendChannel::Delegate {
88 public:
89 void OnMessageSent(int64_t datagram_id) override {
90 datagrams_sent_.push_back(datagram_id);
91 }
92
93 void OnMessageAcked(int64_t datagram_id,
94 QuicTime receive_timestamp) override {
95 datagrams_acked_.push_back({datagram_id, receive_timestamp});
96 }
97
98 void OnMessageLost(int64_t datagram_id) override {
99 datagrams_lost_.push_back(datagram_id);
100 }
101
102 const std::vector<int64_t>& datagrams_sent() const { return datagrams_sent_; }
103 const std::vector<std::pair<int64_t, QuicTime>>& datagrams_acked() const {
104 return datagrams_acked_;
105 }
106 const std::vector<int64_t>& datagrams_lost() const { return datagrams_lost_; }
107
108 private:
109 std::vector<int64_t> datagrams_sent_;
110 std::vector<std::pair<int64_t, QuicTime>> datagrams_acked_;
111 std::vector<int64_t> datagrams_lost_;
112};
113
114class FakeReceiveDelegate : public QuartcReceiveChannel,
115 public QuartcStream::Delegate {
116 public:
117 const std::vector<std::pair<uint64_t, std::string>> messages_received()
118 const {
119 return messages_received_;
120 }
121
122 void OnIncomingStream(uint64_t channel_id, QuartcStream* stream) override {
123 stream->SetDelegate(this);
124 stream_to_channel_id_[stream] = channel_id;
125 }
126
127 void OnMessageReceived(uint64_t channel_id,
128 QuicStringPiece message) override {
129 messages_received_.emplace_back(channel_id, message);
130 }
131
132 // Stream delegate overrides.
133 size_t OnReceived(QuartcStream* stream,
134 iovec* iov,
135 size_t iov_length,
136 bool fin) override {
137 if (!fin) {
138 return 0;
139 }
140
141 size_t bytes = 0;
142 std::string message;
143 for (size_t i = 0; i < iov_length; ++i) {
144 message +=
145 std::string(static_cast<char*>(iov[i].iov_base), iov[i].iov_len);
146 bytes += iov[i].iov_len;
147 }
148 QUIC_LOG(INFO) << "Received " << bytes << " byte message on channel "
149 << stream_to_channel_id_[stream];
150 messages_received_.emplace_back(stream_to_channel_id_[stream], message);
151 return bytes;
152 }
153
154 void OnClose(QuartcStream* stream) override {
155 stream_to_channel_id_.erase(stream);
156 }
157
158 void OnBufferChanged(QuartcStream* /*stream*/) override {}
159
160 private:
161 std::vector<std::pair<uint64_t, std::string>> messages_received_;
162 QuicUnorderedMap<QuartcStream*, uint64_t> stream_to_channel_id_;
163};
164
165class QuartcMultiplexerTest : public QuicTest {
166 public:
167 QuartcMultiplexerTest()
168 : simulator_(),
169 client_transport_(&simulator_,
170 "client_transport",
171 "server_transport",
172 10 * kDefaultMaxPacketSize),
173 server_transport_(&simulator_,
174 "server_transport",
175 "client_transport",
176 10 * kDefaultMaxPacketSize),
177 client_filter_(&simulator_, "client_filter", &client_transport_),
178 client_server_link_(&client_filter_,
179 &server_transport_,
180 QuicBandwidth::FromKBitsPerSecond(10 * 1000),
181 kPropagationDelay),
182 client_multiplexer_(simulator_.GetStreamSendBufferAllocator(),
183 &client_session_delegate_,
184 &client_default_receiver_),
185 server_multiplexer_(simulator_.GetStreamSendBufferAllocator(),
186 &server_session_delegate_,
187 &server_default_receiver_),
188 client_endpoint_(QuicMakeUnique<QuartcClientEndpoint>(
189 simulator_.GetAlarmFactory(),
190 simulator_.GetClock(),
191 simulator_.GetRandomGenerator(),
192 &client_multiplexer_,
193 quic::QuartcSessionConfig(),
194 /*serialized_server_config=*/"")),
195 server_endpoint_(QuicMakeUnique<QuartcServerEndpoint>(
196 simulator_.GetAlarmFactory(),
197 simulator_.GetClock(),
198 simulator_.GetRandomGenerator(),
199 &server_multiplexer_,
200 quic::QuartcSessionConfig())) {
201 // TODO(b/134175506): Remove when IETF QUIC supports receive timestamps.
202 SetQuicReloadableFlag(quic_enable_version_99, false);
203 }
204
205 void Connect() {
206 client_endpoint_->Connect(&client_transport_);
207 server_endpoint_->Connect(&server_transport_);
208 ASSERT_TRUE(simulator_.RunUntil([this]() {
209 return client_session_delegate_.writable_count() > 0 &&
210 server_session_delegate_.writable_count() > 0;
211 }));
212 }
213
214 void Disconnect() {
215 client_session_delegate_.session()->CloseConnection("test");
216 server_session_delegate_.session()->CloseConnection("test");
217 }
218
219 protected:
220 QuartcMultiplexer* client_multiplexer() { return &client_multiplexer_; }
221
222 QuartcMultiplexer* server_multiplexer() { return &server_multiplexer_; }
223
224 simulator::Simulator simulator_;
225
226 simulator::SimulatedQuartcPacketTransport client_transport_;
227 simulator::SimulatedQuartcPacketTransport server_transport_;
228 simulator::CountingPacketFilter client_filter_;
229 simulator::SymmetricLink client_server_link_;
230
231 FakeSessionEventDelegate client_session_delegate_;
232 FakeSessionEventDelegate server_session_delegate_;
233
234 FakeReceiveDelegate client_default_receiver_;
235 FakeReceiveDelegate server_default_receiver_;
236
237 QuartcMultiplexer client_multiplexer_;
238 QuartcMultiplexer server_multiplexer_;
239
240 std::unique_ptr<QuartcClientEndpoint> client_endpoint_;
241 std::unique_ptr<QuartcServerEndpoint> server_endpoint_;
242};
243
244TEST_F(QuartcMultiplexerTest, MultiplexMessages) {
245 Connect();
246
247 FakeSendDelegate send_delegate_1;
248 QuartcSendChannel* send_channel_1 =
249 client_multiplexer()->CreateSendChannel(1, &send_delegate_1);
250 FakeSendDelegate send_delegate_2;
251 QuartcSendChannel* send_channel_2 =
252 client_multiplexer()->CreateSendChannel(2, &send_delegate_2);
253
254 FakeReceiveDelegate receive_delegate_1;
255 server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1);
256
257 int num_messages = 10;
258 std::vector<std::pair<uint64_t, std::string>> messages_1;
259 messages_1.reserve(num_messages);
260 std::vector<std::pair<uint64_t, std::string>> messages_2;
261 messages_2.reserve(num_messages);
262 std::vector<int64_t> messages_sent_1;
263 std::vector<int64_t> messages_sent_2;
264 std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers_1;
265 std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers_2;
266 for (int i = 0; i < num_messages; ++i) {
267 messages_1.emplace_back(1, QuicStrCat("message for 1: ", i));
268 test::QuicTestMemSliceVector slice_1(
269 {std::make_pair(const_cast<char*>(messages_1.back().second.data()),
270 messages_1.back().second.size())});
271 send_channel_1->SendOrQueueMessage(slice_1.span(), i);
272 messages_sent_1.push_back(i);
273 ack_matchers_1.push_back(Pair(i, Gt(QuicTime::Zero())));
274
275 messages_2.emplace_back(2, QuicStrCat("message for 2: ", i));
276 test::QuicTestMemSliceVector slice_2(
277 {std::make_pair(const_cast<char*>(messages_2.back().second.data()),
278 messages_2.back().second.size())});
279 // Use i + 5 as the datagram id for channel 2, so that some of the ids
280 // overlap and some are disjoint.
281 send_channel_2->SendOrQueueMessage(slice_2.span(), i + 5);
282 messages_sent_2.push_back(i + 5);
283 ack_matchers_2.push_back(Pair(i + 5, Gt(QuicTime::Zero())));
284 }
285
286 EXPECT_TRUE(simulator_.RunUntil([&send_delegate_1, &send_delegate_2]() {
287 return send_delegate_1.datagrams_acked().size() == 10 &&
288 send_delegate_2.datagrams_acked().size() == 10;
289 }));
290
291 EXPECT_EQ(send_delegate_1.datagrams_sent(), messages_sent_1);
292 EXPECT_EQ(send_delegate_2.datagrams_sent(), messages_sent_2);
293
294 EXPECT_EQ(receive_delegate_1.messages_received(), messages_1);
295 EXPECT_EQ(server_default_receiver_.messages_received(), messages_2);
296
297 EXPECT_THAT(send_delegate_1.datagrams_acked(),
298 ElementsAreArray(ack_matchers_1));
299 EXPECT_THAT(send_delegate_2.datagrams_acked(),
300 ElementsAreArray(ack_matchers_2));
301}
302
303TEST_F(QuartcMultiplexerTest, MultiplexStreams) {
304 FakeSendDelegate send_delegate_1;
305 QuartcSendChannel* send_channel_1 =
306 client_multiplexer()->CreateSendChannel(1, &send_delegate_1);
307 FakeSendDelegate send_delegate_2;
308 QuartcSendChannel* send_channel_2 =
309 client_multiplexer()->CreateSendChannel(2, &send_delegate_2);
310
311 FakeQuartcStreamDelegate fake_send_stream_delegate;
312
313 FakeReceiveDelegate receive_delegate_1;
314 server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1);
315
316 Connect();
317
318 int num_messages = 10;
319 std::vector<std::pair<uint64_t, std::string>> messages_1;
320 messages_1.reserve(num_messages);
321 std::vector<std::pair<uint64_t, std::string>> messages_2;
322 messages_2.reserve(num_messages);
323 for (int i = 0; i < num_messages; ++i) {
324 messages_1.emplace_back(1, QuicStrCat("message for 1: ", i));
325 test::QuicTestMemSliceVector slice_1(
326 {std::make_pair(const_cast<char*>(messages_1.back().second.data()),
327 messages_1.back().second.size())});
328 QuartcStream* stream_1 =
329 send_channel_1->CreateOutgoingBidirectionalStream();
330 stream_1->SetDelegate(&fake_send_stream_delegate);
331 stream_1->WriteMemSlices(slice_1.span(), /*fin=*/true);
332
333 messages_2.emplace_back(2, QuicStrCat("message for 2: ", i));
334 test::QuicTestMemSliceVector slice_2(
335 {std::make_pair(const_cast<char*>(messages_2.back().second.data()),
336 messages_2.back().second.size())});
337 QuartcStream* stream_2 =
338 send_channel_2->CreateOutgoingBidirectionalStream();
339 stream_2->SetDelegate(&fake_send_stream_delegate);
340 stream_2->WriteMemSlices(slice_2.span(), /*fin=*/true);
341 }
342
343 EXPECT_TRUE(simulator_.RunUntilOrTimeout(
344 [this, &receive_delegate_1]() {
345 return receive_delegate_1.messages_received().size() == 10 &&
346 server_default_receiver_.messages_received().size() == 10;
347 },
348 QuicTime::Delta::FromSeconds(5)));
349
350 EXPECT_EQ(receive_delegate_1.messages_received(), messages_1);
351 EXPECT_EQ(server_default_receiver_.messages_received(), messages_2);
352}
353
354// Tests that datagram-lost callbacks are invoked on the right send channel
355// delegate, and that they work with overlapping datagram ids.
356TEST_F(QuartcMultiplexerTest, MultiplexLostDatagrams) {
357 Connect();
358 ASSERT_TRUE(simulator_.RunUntil([this]() {
359 return client_session_delegate_.handshake_count() > 0 &&
360 server_session_delegate_.handshake_count() > 0;
361 }));
362
363 // Just drop everything we try to send.
364 client_filter_.set_packets_to_drop(30);
365
366 FakeSendDelegate send_delegate_1;
367 QuartcSendChannel* send_channel_1 =
368 client_multiplexer()->CreateSendChannel(1, &send_delegate_1);
369 FakeSendDelegate send_delegate_2;
370 QuartcSendChannel* send_channel_2 =
371 client_multiplexer()->CreateSendChannel(2, &send_delegate_2);
372
373 FakeQuartcStreamDelegate fake_send_stream_delegate;
374
375 FakeReceiveDelegate receive_delegate_1;
376 server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1);
377
378 int num_messages = 10;
379 std::vector<std::pair<uint64_t, std::string>> messages_1;
380 messages_1.reserve(num_messages);
381 std::vector<std::pair<uint64_t, std::string>> messages_2;
382 messages_2.reserve(num_messages);
383 std::vector<int64_t> messages_sent_1;
384 std::vector<int64_t> messages_sent_2;
385 for (int i = 0; i < num_messages; ++i) {
386 messages_1.emplace_back(1, QuicStrCat("message for 1: ", i));
387 test::QuicTestMemSliceVector slice_1(
388 {std::make_pair(const_cast<char*>(messages_1.back().second.data()),
389 messages_1.back().second.size())});
390 send_channel_1->SendOrQueueMessage(slice_1.span(), i);
391 messages_sent_1.push_back(i);
392
393 messages_2.emplace_back(2, QuicStrCat("message for 2: ", i));
394 test::QuicTestMemSliceVector slice_2(
395 {std::make_pair(const_cast<char*>(messages_2.back().second.data()),
396 messages_2.back().second.size())});
397 // Use i + 5 as the datagram id for channel 2, so that some of the ids
398 // overlap and some are disjoint.
399 send_channel_2->SendOrQueueMessage(slice_2.span(), i + 5);
400 messages_sent_2.push_back(i + 5);
401 }
402
403 // Now send something retransmittable to prompt loss detection.
404 // If we never send anything retransmittable, we will never get acks, and
405 // never detect losses.
406 messages_1.emplace_back(1, QuicStrCat("message for 1: ", num_messages));
407 test::QuicTestMemSliceVector slice(
408 {std::make_pair(const_cast<char*>(messages_1.back().second.data()),
409 messages_1.back().second.size())});
410 QuartcStream* stream_1 = send_channel_1->CreateOutgoingBidirectionalStream();
411 stream_1->SetDelegate(&fake_send_stream_delegate);
412 stream_1->WriteMemSlices(slice.span(), /*fin=*/true);
413
414 EXPECT_TRUE(simulator_.RunUntilOrTimeout(
415 [&send_delegate_1, &send_delegate_2]() {
416 return send_delegate_1.datagrams_lost().size() == 10 &&
417 send_delegate_2.datagrams_lost().size() == 10;
418 },
419 QuicTime::Delta::FromSeconds(60)));
420
421 EXPECT_EQ(send_delegate_1.datagrams_lost(), messages_sent_1);
422 EXPECT_EQ(send_delegate_2.datagrams_lost(), messages_sent_2);
423
424 EXPECT_THAT(send_delegate_1.datagrams_acked(), IsEmpty());
425 EXPECT_THAT(send_delegate_2.datagrams_acked(), IsEmpty());
426
427 EXPECT_THAT(receive_delegate_1.messages_received(), IsEmpty());
428 EXPECT_THAT(server_default_receiver_.messages_received(), IsEmpty());
429}
430
431TEST_F(QuartcMultiplexerTest, UnregisterReceiveChannel) {
432 Connect();
433
434 FakeSendDelegate send_delegate;
435 QuartcSendChannel* send_channel =
436 client_multiplexer()->CreateSendChannel(1, &send_delegate);
437 FakeQuartcStreamDelegate fake_send_stream_delegate;
438
439 FakeReceiveDelegate receive_delegate;
440 server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate);
441 server_multiplexer()->RegisterReceiveChannel(1, nullptr);
442
443 int num_messages = 10;
444 std::vector<std::pair<uint64_t, std::string>> messages;
445 messages.reserve(num_messages);
446 std::vector<int64_t> messages_sent;
447 std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers;
448 for (int i = 0; i < num_messages; ++i) {
449 messages.emplace_back(1, QuicStrCat("message for 1: ", i));
450 test::QuicTestMemSliceVector slice(
451 {std::make_pair(const_cast<char*>(messages.back().second.data()),
452 messages.back().second.size())});
453 send_channel->SendOrQueueMessage(slice.span(), i);
454 messages_sent.push_back(i);
455 ack_matchers.push_back(Pair(i, Gt(QuicTime::Zero())));
456 }
457
458 EXPECT_TRUE(simulator_.RunUntil([&send_delegate]() {
459 return send_delegate.datagrams_acked().size() == 10;
460 }));
461
462 EXPECT_EQ(send_delegate.datagrams_sent(), messages_sent);
463 EXPECT_EQ(server_default_receiver_.messages_received(), messages);
464 EXPECT_THAT(send_delegate.datagrams_acked(), ElementsAreArray(ack_matchers));
465}
466
467TEST_F(QuartcMultiplexerTest, CloseEvent) {
468 Connect();
469 Disconnect();
470
471 EXPECT_EQ(client_session_delegate_.error(), QUIC_CONNECTION_CANCELLED);
472 EXPECT_EQ(server_session_delegate_.error(), QUIC_CONNECTION_CANCELLED);
473}
474
475TEST_F(QuartcMultiplexerTest, CongestionEvent) {
476 Connect();
477 ASSERT_TRUE(simulator_.RunUntil([this]() {
478 return client_session_delegate_.handshake_count() > 0 &&
479 server_session_delegate_.handshake_count() > 0;
480 }));
481
482 EXPECT_GT(client_session_delegate_.latest_bandwidth_estimate(),
483 QuicBandwidth::Zero());
484 EXPECT_GT(client_session_delegate_.latest_pacing_rate(),
485 QuicBandwidth::Zero());
486 EXPECT_GT(client_session_delegate_.latest_rtt(), QuicTime::Delta::Zero());
487}
488
489} // namespace
490} // namespace quic