| // Copyright (c) 2019 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "net/third_party/quiche/src/quic/quartc/quartc_multiplexer.h" |
| |
| #include <memory> |
| |
| #include "net/third_party/quiche/src/quic/core/frames/quic_connection_close_frame.h" |
| #include "net/third_party/quiche/src/quic/core/quic_bandwidth.h" |
| #include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h" |
| #include "net/third_party/quiche/src/quic/core/quic_time.h" |
| #include "net/third_party/quiche/src/quic/core/quic_types.h" |
| #include "net/third_party/quiche/src/quic/platform/api/quic_containers.h" |
| #include "net/third_party/quiche/src/quic/platform/api/quic_logging.h" |
| #include "net/third_party/quiche/src/quic/platform/api/quic_ptr_util.h" |
| #include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h" |
| #include "net/third_party/quiche/src/quic/platform/api/quic_test.h" |
| #include "net/third_party/quiche/src/quic/platform/api/quic_test_mem_slice_vector.h" |
| #include "net/third_party/quiche/src/quic/quartc/counting_packet_filter.h" |
| #include "net/third_party/quiche/src/quic/quartc/quartc_endpoint.h" |
| #include "net/third_party/quiche/src/quic/quartc/quartc_fakes.h" |
| #include "net/third_party/quiche/src/quic/quartc/quartc_session.h" |
| #include "net/third_party/quiche/src/quic/quartc/quartc_stream.h" |
| #include "net/third_party/quiche/src/quic/quartc/simulated_packet_transport.h" |
| #include "net/third_party/quiche/src/quic/test_tools/simulator/link.h" |
| #include "net/third_party/quiche/src/quic/test_tools/simulator/simulator.h" |
| |
| namespace quic { |
| namespace { |
| |
| using ::testing::ElementsAreArray; |
| using ::testing::Gt; |
| using ::testing::IsEmpty; |
| using ::testing::Pair; |
| |
| constexpr QuicTime::Delta kPropagationDelay = |
| QuicTime::Delta::FromMilliseconds(10); |
| |
| class FakeSessionEventDelegate : public QuartcSessionEventDelegate { |
| public: |
| void OnSessionCreated(QuartcSession* session) override { |
| session->StartCryptoHandshake(); |
| session_ = session; |
| } |
| |
| void OnConnectionWritable() override { ++writable_count_; } |
| |
| void OnCryptoHandshakeComplete() override { ++handshake_count_; } |
| |
| void OnConnectionClosed(const QuicConnectionCloseFrame& frame, |
| ConnectionCloseSource source) override { |
| error_ = frame.quic_error_code; |
| close_source_ = source; |
| } |
| |
| void OnCongestionControlChange(QuicBandwidth bandwidth_estimate, |
| QuicBandwidth pacing_rate, |
| QuicTime::Delta latest_rtt) override { |
| latest_bandwidth_estimate_ = bandwidth_estimate; |
| latest_pacing_rate_ = pacing_rate; |
| latest_rtt_ = latest_rtt; |
| } |
| |
| QuartcSession* session() { return session_; } |
| int writable_count() const { return writable_count_; } |
| int handshake_count() const { return handshake_count_; } |
| QuicErrorCode error() const { return error_; } |
| ConnectionCloseSource close_source() const { return close_source_; } |
| QuicBandwidth latest_bandwidth_estimate() const { |
| return latest_bandwidth_estimate_; |
| } |
| QuicBandwidth latest_pacing_rate() const { return latest_pacing_rate_; } |
| QuicTime::Delta latest_rtt() const { return latest_rtt_; } |
| |
| private: |
| QuartcSession* session_ = nullptr; |
| int writable_count_ = 0; |
| int handshake_count_ = 0; |
| QuicErrorCode error_ = QUIC_NO_ERROR; |
| ConnectionCloseSource close_source_; |
| QuicBandwidth latest_bandwidth_estimate_ = QuicBandwidth::Zero(); |
| QuicBandwidth latest_pacing_rate_ = QuicBandwidth::Zero(); |
| QuicTime::Delta latest_rtt_ = QuicTime::Delta::Zero(); |
| }; |
| |
| class FakeSendDelegate : public QuartcSendChannel::Delegate { |
| public: |
| void OnMessageSent(int64_t datagram_id) override { |
| datagrams_sent_.push_back(datagram_id); |
| } |
| |
| void OnMessageAcked(int64_t datagram_id, |
| QuicTime receive_timestamp) override { |
| datagrams_acked_.push_back({datagram_id, receive_timestamp}); |
| } |
| |
| void OnMessageLost(int64_t datagram_id) override { |
| datagrams_lost_.push_back(datagram_id); |
| } |
| |
| const std::vector<int64_t>& datagrams_sent() const { return datagrams_sent_; } |
| const std::vector<std::pair<int64_t, QuicTime>>& datagrams_acked() const { |
| return datagrams_acked_; |
| } |
| const std::vector<int64_t>& datagrams_lost() const { return datagrams_lost_; } |
| |
| private: |
| std::vector<int64_t> datagrams_sent_; |
| std::vector<std::pair<int64_t, QuicTime>> datagrams_acked_; |
| std::vector<int64_t> datagrams_lost_; |
| }; |
| |
| class FakeReceiveDelegate : public QuartcReceiveChannel, |
| public QuartcStream::Delegate { |
| public: |
| const std::vector<std::pair<uint64_t, std::string>> messages_received() |
| const { |
| return messages_received_; |
| } |
| |
| void OnIncomingStream(uint64_t channel_id, QuartcStream* stream) override { |
| stream->SetDelegate(this); |
| stream_to_channel_id_[stream] = channel_id; |
| } |
| |
| void OnMessageReceived(uint64_t channel_id, |
| QuicStringPiece message) override { |
| messages_received_.emplace_back(channel_id, message); |
| } |
| |
| // Stream delegate overrides. |
| size_t OnReceived(QuartcStream* stream, |
| iovec* iov, |
| size_t iov_length, |
| bool fin) override { |
| if (!fin) { |
| return 0; |
| } |
| |
| size_t bytes = 0; |
| std::string message; |
| for (size_t i = 0; i < iov_length; ++i) { |
| message += |
| std::string(static_cast<char*>(iov[i].iov_base), iov[i].iov_len); |
| bytes += iov[i].iov_len; |
| } |
| QUIC_LOG(INFO) << "Received " << bytes << " byte message on channel " |
| << stream_to_channel_id_[stream]; |
| messages_received_.emplace_back(stream_to_channel_id_[stream], message); |
| return bytes; |
| } |
| |
| void OnClose(QuartcStream* stream) override { |
| stream_to_channel_id_.erase(stream); |
| } |
| |
| void OnBufferChanged(QuartcStream* /*stream*/) override {} |
| |
| private: |
| std::vector<std::pair<uint64_t, std::string>> messages_received_; |
| QuicUnorderedMap<QuartcStream*, uint64_t> stream_to_channel_id_; |
| }; |
| |
| class QuartcMultiplexerTest : public QuicTest { |
| public: |
| QuartcMultiplexerTest() |
| : simulator_(), |
| client_transport_(&simulator_, |
| "client_transport", |
| "server_transport", |
| 10 * kDefaultMaxPacketSize), |
| server_transport_(&simulator_, |
| "server_transport", |
| "client_transport", |
| 10 * kDefaultMaxPacketSize), |
| client_filter_(&simulator_, "client_filter", &client_transport_), |
| client_server_link_(&client_filter_, |
| &server_transport_, |
| QuicBandwidth::FromKBitsPerSecond(10 * 1000), |
| kPropagationDelay), |
| client_multiplexer_(simulator_.GetStreamSendBufferAllocator(), |
| &client_session_delegate_, |
| &client_default_receiver_), |
| server_multiplexer_(simulator_.GetStreamSendBufferAllocator(), |
| &server_session_delegate_, |
| &server_default_receiver_), |
| client_endpoint_(QuicMakeUnique<QuartcClientEndpoint>( |
| simulator_.GetAlarmFactory(), |
| simulator_.GetClock(), |
| simulator_.GetRandomGenerator(), |
| &client_multiplexer_, |
| quic::QuartcSessionConfig(), |
| /*serialized_server_config=*/"")), |
| server_endpoint_(QuicMakeUnique<QuartcServerEndpoint>( |
| simulator_.GetAlarmFactory(), |
| simulator_.GetClock(), |
| simulator_.GetRandomGenerator(), |
| &server_multiplexer_, |
| quic::QuartcSessionConfig())) { |
| // TODO(b/134175506): Remove when IETF QUIC supports receive timestamps. |
| SetQuicReloadableFlag(quic_enable_version_99, false); |
| } |
| |
| void Connect() { |
| client_endpoint_->Connect(&client_transport_); |
| server_endpoint_->Connect(&server_transport_); |
| ASSERT_TRUE(simulator_.RunUntil([this]() { |
| return client_session_delegate_.writable_count() > 0 && |
| server_session_delegate_.writable_count() > 0; |
| })); |
| } |
| |
| void Disconnect() { |
| client_session_delegate_.session()->CloseConnection("test"); |
| server_session_delegate_.session()->CloseConnection("test"); |
| } |
| |
| protected: |
| QuartcMultiplexer* client_multiplexer() { return &client_multiplexer_; } |
| |
| QuartcMultiplexer* server_multiplexer() { return &server_multiplexer_; } |
| |
| simulator::Simulator simulator_; |
| |
| simulator::SimulatedQuartcPacketTransport client_transport_; |
| simulator::SimulatedQuartcPacketTransport server_transport_; |
| simulator::CountingPacketFilter client_filter_; |
| simulator::SymmetricLink client_server_link_; |
| |
| FakeSessionEventDelegate client_session_delegate_; |
| FakeSessionEventDelegate server_session_delegate_; |
| |
| FakeReceiveDelegate client_default_receiver_; |
| FakeReceiveDelegate server_default_receiver_; |
| |
| QuartcMultiplexer client_multiplexer_; |
| QuartcMultiplexer server_multiplexer_; |
| |
| std::unique_ptr<QuartcClientEndpoint> client_endpoint_; |
| std::unique_ptr<QuartcServerEndpoint> server_endpoint_; |
| }; |
| |
| TEST_F(QuartcMultiplexerTest, MultiplexMessages) { |
| Connect(); |
| |
| FakeSendDelegate send_delegate_1; |
| QuartcSendChannel* send_channel_1 = |
| client_multiplexer()->CreateSendChannel(1, &send_delegate_1); |
| FakeSendDelegate send_delegate_2; |
| QuartcSendChannel* send_channel_2 = |
| client_multiplexer()->CreateSendChannel(2, &send_delegate_2); |
| |
| FakeReceiveDelegate receive_delegate_1; |
| server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1); |
| |
| int num_messages = 10; |
| std::vector<std::pair<uint64_t, std::string>> messages_1; |
| messages_1.reserve(num_messages); |
| std::vector<std::pair<uint64_t, std::string>> messages_2; |
| messages_2.reserve(num_messages); |
| std::vector<int64_t> messages_sent_1; |
| std::vector<int64_t> messages_sent_2; |
| std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers_1; |
| std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers_2; |
| for (int i = 0; i < num_messages; ++i) { |
| messages_1.emplace_back(1, QuicStrCat("message for 1: ", i)); |
| test::QuicTestMemSliceVector slice_1( |
| {std::make_pair(const_cast<char*>(messages_1.back().second.data()), |
| messages_1.back().second.size())}); |
| send_channel_1->SendOrQueueMessage(slice_1.span(), i); |
| messages_sent_1.push_back(i); |
| ack_matchers_1.push_back(Pair(i, Gt(QuicTime::Zero()))); |
| |
| messages_2.emplace_back(2, QuicStrCat("message for 2: ", i)); |
| test::QuicTestMemSliceVector slice_2( |
| {std::make_pair(const_cast<char*>(messages_2.back().second.data()), |
| messages_2.back().second.size())}); |
| // Use i + 5 as the datagram id for channel 2, so that some of the ids |
| // overlap and some are disjoint. |
| send_channel_2->SendOrQueueMessage(slice_2.span(), i + 5); |
| messages_sent_2.push_back(i + 5); |
| ack_matchers_2.push_back(Pair(i + 5, Gt(QuicTime::Zero()))); |
| } |
| |
| EXPECT_TRUE(simulator_.RunUntil([&send_delegate_1, &send_delegate_2]() { |
| return send_delegate_1.datagrams_acked().size() == 10 && |
| send_delegate_2.datagrams_acked().size() == 10; |
| })); |
| |
| EXPECT_EQ(send_delegate_1.datagrams_sent(), messages_sent_1); |
| EXPECT_EQ(send_delegate_2.datagrams_sent(), messages_sent_2); |
| |
| EXPECT_EQ(receive_delegate_1.messages_received(), messages_1); |
| EXPECT_EQ(server_default_receiver_.messages_received(), messages_2); |
| |
| EXPECT_THAT(send_delegate_1.datagrams_acked(), |
| ElementsAreArray(ack_matchers_1)); |
| EXPECT_THAT(send_delegate_2.datagrams_acked(), |
| ElementsAreArray(ack_matchers_2)); |
| } |
| |
| TEST_F(QuartcMultiplexerTest, MultiplexStreams) { |
| FakeSendDelegate send_delegate_1; |
| QuartcSendChannel* send_channel_1 = |
| client_multiplexer()->CreateSendChannel(1, &send_delegate_1); |
| FakeSendDelegate send_delegate_2; |
| QuartcSendChannel* send_channel_2 = |
| client_multiplexer()->CreateSendChannel(2, &send_delegate_2); |
| |
| FakeQuartcStreamDelegate fake_send_stream_delegate; |
| |
| FakeReceiveDelegate receive_delegate_1; |
| server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1); |
| |
| Connect(); |
| |
| int num_messages = 10; |
| std::vector<std::pair<uint64_t, std::string>> messages_1; |
| messages_1.reserve(num_messages); |
| std::vector<std::pair<uint64_t, std::string>> messages_2; |
| messages_2.reserve(num_messages); |
| for (int i = 0; i < num_messages; ++i) { |
| messages_1.emplace_back(1, QuicStrCat("message for 1: ", i)); |
| test::QuicTestMemSliceVector slice_1( |
| {std::make_pair(const_cast<char*>(messages_1.back().second.data()), |
| messages_1.back().second.size())}); |
| QuartcStream* stream_1 = |
| send_channel_1->CreateOutgoingBidirectionalStream(); |
| stream_1->SetDelegate(&fake_send_stream_delegate); |
| stream_1->WriteMemSlices(slice_1.span(), /*fin=*/true); |
| |
| messages_2.emplace_back(2, QuicStrCat("message for 2: ", i)); |
| test::QuicTestMemSliceVector slice_2( |
| {std::make_pair(const_cast<char*>(messages_2.back().second.data()), |
| messages_2.back().second.size())}); |
| QuartcStream* stream_2 = |
| send_channel_2->CreateOutgoingBidirectionalStream(); |
| stream_2->SetDelegate(&fake_send_stream_delegate); |
| stream_2->WriteMemSlices(slice_2.span(), /*fin=*/true); |
| } |
| |
| EXPECT_TRUE(simulator_.RunUntilOrTimeout( |
| [this, &receive_delegate_1]() { |
| return receive_delegate_1.messages_received().size() == 10 && |
| server_default_receiver_.messages_received().size() == 10; |
| }, |
| QuicTime::Delta::FromSeconds(5))); |
| |
| EXPECT_EQ(receive_delegate_1.messages_received(), messages_1); |
| EXPECT_EQ(server_default_receiver_.messages_received(), messages_2); |
| } |
| |
| // Tests that datagram-lost callbacks are invoked on the right send channel |
| // delegate, and that they work with overlapping datagram ids. |
| TEST_F(QuartcMultiplexerTest, MultiplexLostDatagrams) { |
| Connect(); |
| ASSERT_TRUE(simulator_.RunUntil([this]() { |
| return client_session_delegate_.handshake_count() > 0 && |
| server_session_delegate_.handshake_count() > 0; |
| })); |
| |
| // Just drop everything we try to send. |
| client_filter_.set_packets_to_drop(30); |
| |
| FakeSendDelegate send_delegate_1; |
| QuartcSendChannel* send_channel_1 = |
| client_multiplexer()->CreateSendChannel(1, &send_delegate_1); |
| FakeSendDelegate send_delegate_2; |
| QuartcSendChannel* send_channel_2 = |
| client_multiplexer()->CreateSendChannel(2, &send_delegate_2); |
| |
| FakeQuartcStreamDelegate fake_send_stream_delegate; |
| |
| FakeReceiveDelegate receive_delegate_1; |
| server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1); |
| |
| int num_messages = 10; |
| std::vector<std::pair<uint64_t, std::string>> messages_1; |
| messages_1.reserve(num_messages); |
| std::vector<std::pair<uint64_t, std::string>> messages_2; |
| messages_2.reserve(num_messages); |
| std::vector<int64_t> messages_sent_1; |
| std::vector<int64_t> messages_sent_2; |
| for (int i = 0; i < num_messages; ++i) { |
| messages_1.emplace_back(1, QuicStrCat("message for 1: ", i)); |
| test::QuicTestMemSliceVector slice_1( |
| {std::make_pair(const_cast<char*>(messages_1.back().second.data()), |
| messages_1.back().second.size())}); |
| send_channel_1->SendOrQueueMessage(slice_1.span(), i); |
| messages_sent_1.push_back(i); |
| |
| messages_2.emplace_back(2, QuicStrCat("message for 2: ", i)); |
| test::QuicTestMemSliceVector slice_2( |
| {std::make_pair(const_cast<char*>(messages_2.back().second.data()), |
| messages_2.back().second.size())}); |
| // Use i + 5 as the datagram id for channel 2, so that some of the ids |
| // overlap and some are disjoint. |
| send_channel_2->SendOrQueueMessage(slice_2.span(), i + 5); |
| messages_sent_2.push_back(i + 5); |
| } |
| |
| // Now send something retransmittable to prompt loss detection. |
| // If we never send anything retransmittable, we will never get acks, and |
| // never detect losses. |
| messages_1.emplace_back(1, QuicStrCat("message for 1: ", num_messages)); |
| test::QuicTestMemSliceVector slice( |
| {std::make_pair(const_cast<char*>(messages_1.back().second.data()), |
| messages_1.back().second.size())}); |
| QuartcStream* stream_1 = send_channel_1->CreateOutgoingBidirectionalStream(); |
| stream_1->SetDelegate(&fake_send_stream_delegate); |
| stream_1->WriteMemSlices(slice.span(), /*fin=*/true); |
| |
| EXPECT_TRUE(simulator_.RunUntilOrTimeout( |
| [&send_delegate_1, &send_delegate_2]() { |
| return send_delegate_1.datagrams_lost().size() == 10 && |
| send_delegate_2.datagrams_lost().size() == 10; |
| }, |
| QuicTime::Delta::FromSeconds(60))); |
| |
| EXPECT_EQ(send_delegate_1.datagrams_lost(), messages_sent_1); |
| EXPECT_EQ(send_delegate_2.datagrams_lost(), messages_sent_2); |
| |
| EXPECT_THAT(send_delegate_1.datagrams_acked(), IsEmpty()); |
| EXPECT_THAT(send_delegate_2.datagrams_acked(), IsEmpty()); |
| |
| EXPECT_THAT(receive_delegate_1.messages_received(), IsEmpty()); |
| EXPECT_THAT(server_default_receiver_.messages_received(), IsEmpty()); |
| } |
| |
| TEST_F(QuartcMultiplexerTest, UnregisterReceiveChannel) { |
| Connect(); |
| |
| FakeSendDelegate send_delegate; |
| QuartcSendChannel* send_channel = |
| client_multiplexer()->CreateSendChannel(1, &send_delegate); |
| FakeQuartcStreamDelegate fake_send_stream_delegate; |
| |
| FakeReceiveDelegate receive_delegate; |
| server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate); |
| server_multiplexer()->RegisterReceiveChannel(1, nullptr); |
| |
| int num_messages = 10; |
| std::vector<std::pair<uint64_t, std::string>> messages; |
| messages.reserve(num_messages); |
| std::vector<int64_t> messages_sent; |
| std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers; |
| for (int i = 0; i < num_messages; ++i) { |
| messages.emplace_back(1, QuicStrCat("message for 1: ", i)); |
| test::QuicTestMemSliceVector slice( |
| {std::make_pair(const_cast<char*>(messages.back().second.data()), |
| messages.back().second.size())}); |
| send_channel->SendOrQueueMessage(slice.span(), i); |
| messages_sent.push_back(i); |
| ack_matchers.push_back(Pair(i, Gt(QuicTime::Zero()))); |
| } |
| |
| EXPECT_TRUE(simulator_.RunUntil([&send_delegate]() { |
| return send_delegate.datagrams_acked().size() == 10; |
| })); |
| |
| EXPECT_EQ(send_delegate.datagrams_sent(), messages_sent); |
| EXPECT_EQ(server_default_receiver_.messages_received(), messages); |
| EXPECT_THAT(send_delegate.datagrams_acked(), ElementsAreArray(ack_matchers)); |
| } |
| |
| TEST_F(QuartcMultiplexerTest, CloseEvent) { |
| Connect(); |
| Disconnect(); |
| |
| EXPECT_EQ(client_session_delegate_.error(), QUIC_CONNECTION_CANCELLED); |
| EXPECT_EQ(server_session_delegate_.error(), QUIC_CONNECTION_CANCELLED); |
| } |
| |
| TEST_F(QuartcMultiplexerTest, CongestionEvent) { |
| Connect(); |
| ASSERT_TRUE(simulator_.RunUntil([this]() { |
| return client_session_delegate_.handshake_count() > 0 && |
| server_session_delegate_.handshake_count() > 0; |
| })); |
| |
| EXPECT_GT(client_session_delegate_.latest_bandwidth_estimate(), |
| QuicBandwidth::Zero()); |
| EXPECT_GT(client_session_delegate_.latest_pacing_rate(), |
| QuicBandwidth::Zero()); |
| EXPECT_GT(client_session_delegate_.latest_rtt(), QuicTime::Delta::Zero()); |
| } |
| |
| } // namespace |
| } // namespace quic |