blob: a8f53528e5ee238a8bcf359587cbb8bf58702e52 [file] [log] [blame]
// Copyright (c) 2019 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/third_party/quiche/src/quic/quartc/quartc_multiplexer.h"
#include <memory>
#include <utility>
#include "net/third_party/quiche/src/quic/core/frames/quic_connection_close_frame.h"
#include "net/third_party/quiche/src/quic/core/quic_bandwidth.h"
#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h"
#include "net/third_party/quiche/src/quic/core/quic_time.h"
#include "net/third_party/quiche/src/quic/core/quic_types.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_containers.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
#include "net/third_party/quiche/src/quic/platform/api/quic_test_mem_slice_vector.h"
#include "net/third_party/quiche/src/quic/quartc/counting_packet_filter.h"
#include "net/third_party/quiche/src/quic/quartc/quartc_endpoint.h"
#include "net/third_party/quiche/src/quic/quartc/quartc_fakes.h"
#include "net/third_party/quiche/src/quic/quartc/quartc_session.h"
#include "net/third_party/quiche/src/quic/quartc/quartc_stream.h"
#include "net/third_party/quiche/src/quic/quartc/simulated_packet_transport.h"
#include "net/third_party/quiche/src/quic/test_tools/simulator/link.h"
#include "net/third_party/quiche/src/quic/test_tools/simulator/simulator.h"
namespace quic {
namespace {
using ::testing::ElementsAreArray;
using ::testing::Gt;
using ::testing::IsEmpty;
using ::testing::Pair;
constexpr QuicTime::Delta kPropagationDelay =
QuicTime::Delta::FromMilliseconds(10);
class FakeSessionEventDelegate : public QuartcSessionEventDelegate {
public:
void OnSessionCreated(QuartcSession* session) override {
session->StartCryptoHandshake();
session_ = session;
}
void OnConnectionWritable() override { ++writable_count_; }
void OnCryptoHandshakeComplete() override { ++handshake_count_; }
void OnConnectionClosed(const QuicConnectionCloseFrame& frame,
ConnectionCloseSource source) override {
error_ = frame.quic_error_code;
close_source_ = source;
}
void OnCongestionControlChange(QuicBandwidth bandwidth_estimate,
QuicBandwidth pacing_rate,
QuicTime::Delta latest_rtt) override {
latest_bandwidth_estimate_ = bandwidth_estimate;
latest_pacing_rate_ = pacing_rate;
latest_rtt_ = latest_rtt;
}
QuartcSession* session() { return session_; }
int writable_count() const { return writable_count_; }
int handshake_count() const { return handshake_count_; }
QuicErrorCode error() const { return error_; }
ConnectionCloseSource close_source() const { return close_source_; }
QuicBandwidth latest_bandwidth_estimate() const {
return latest_bandwidth_estimate_;
}
QuicBandwidth latest_pacing_rate() const { return latest_pacing_rate_; }
QuicTime::Delta latest_rtt() const { return latest_rtt_; }
private:
QuartcSession* session_ = nullptr;
int writable_count_ = 0;
int handshake_count_ = 0;
QuicErrorCode error_ = QUIC_NO_ERROR;
ConnectionCloseSource close_source_;
QuicBandwidth latest_bandwidth_estimate_ = QuicBandwidth::Zero();
QuicBandwidth latest_pacing_rate_ = QuicBandwidth::Zero();
QuicTime::Delta latest_rtt_ = QuicTime::Delta::Zero();
};
class FakeSendDelegate : public QuartcSendChannel::Delegate {
public:
void OnMessageSent(int64_t datagram_id) override {
datagrams_sent_.push_back(datagram_id);
}
void OnMessageAcked(int64_t datagram_id,
QuicTime receive_timestamp) override {
datagrams_acked_.push_back({datagram_id, receive_timestamp});
}
void OnMessageLost(int64_t datagram_id) override {
datagrams_lost_.push_back(datagram_id);
}
const std::vector<int64_t>& datagrams_sent() const { return datagrams_sent_; }
const std::vector<std::pair<int64_t, QuicTime>>& datagrams_acked() const {
return datagrams_acked_;
}
const std::vector<int64_t>& datagrams_lost() const { return datagrams_lost_; }
private:
std::vector<int64_t> datagrams_sent_;
std::vector<std::pair<int64_t, QuicTime>> datagrams_acked_;
std::vector<int64_t> datagrams_lost_;
};
class FakeReceiveDelegate : public QuartcReceiveChannel,
public QuartcStream::Delegate {
public:
const std::vector<std::pair<uint64_t, std::string>> messages_received()
const {
return messages_received_;
}
void OnIncomingStream(uint64_t channel_id, QuartcStream* stream) override {
stream->SetDelegate(this);
stream_to_channel_id_[stream] = channel_id;
}
void OnMessageReceived(uint64_t channel_id,
QuicStringPiece message) override {
messages_received_.emplace_back(channel_id, message);
}
// Stream delegate overrides.
size_t OnReceived(QuartcStream* stream,
iovec* iov,
size_t iov_length,
bool fin) override {
if (!fin) {
return 0;
}
size_t bytes = 0;
std::string message;
for (size_t i = 0; i < iov_length; ++i) {
message +=
std::string(static_cast<char*>(iov[i].iov_base), iov[i].iov_len);
bytes += iov[i].iov_len;
}
QUIC_LOG(INFO) << "Received " << bytes << " byte message on channel "
<< stream_to_channel_id_[stream];
messages_received_.emplace_back(stream_to_channel_id_[stream], message);
return bytes;
}
void OnClose(QuartcStream* stream) override {
stream_to_channel_id_.erase(stream);
}
void OnBufferChanged(QuartcStream* /*stream*/) override {}
private:
std::vector<std::pair<uint64_t, std::string>> messages_received_;
QuicUnorderedMap<QuartcStream*, uint64_t> stream_to_channel_id_;
};
class QuartcMultiplexerTest : public QuicTest {
public:
QuartcMultiplexerTest()
: simulator_(),
client_transport_(&simulator_,
"client_transport",
"server_transport",
10 * kDefaultMaxPacketSize),
server_transport_(&simulator_,
"server_transport",
"client_transport",
10 * kDefaultMaxPacketSize),
client_filter_(&simulator_, "client_filter", &client_transport_),
client_server_link_(&client_filter_,
&server_transport_,
QuicBandwidth::FromKBitsPerSecond(10 * 1000),
kPropagationDelay),
client_multiplexer_(simulator_.GetStreamSendBufferAllocator(),
&client_session_delegate_,
&client_default_receiver_),
server_multiplexer_(simulator_.GetStreamSendBufferAllocator(),
&server_session_delegate_,
&server_default_receiver_),
client_endpoint_(std::make_unique<QuartcClientEndpoint>(
simulator_.GetAlarmFactory(),
simulator_.GetClock(),
simulator_.GetRandomGenerator(),
&client_multiplexer_,
quic::QuartcSessionConfig(),
/*serialized_server_config=*/"")),
server_endpoint_(std::make_unique<QuartcServerEndpoint>(
simulator_.GetAlarmFactory(),
simulator_.GetClock(),
simulator_.GetRandomGenerator(),
&server_multiplexer_,
quic::QuartcSessionConfig())) {
// TODO(b/134175506): Remove when IETF QUIC supports receive timestamps.
SetQuicReloadableFlag(quic_enable_version_99, false);
}
void Connect() {
client_endpoint_->Connect(&client_transport_);
server_endpoint_->Connect(&server_transport_);
ASSERT_TRUE(simulator_.RunUntil([this]() {
return client_session_delegate_.writable_count() > 0 &&
server_session_delegate_.writable_count() > 0;
}));
}
void Disconnect() {
client_session_delegate_.session()->CloseConnection("test");
server_session_delegate_.session()->CloseConnection("test");
}
protected:
QuartcMultiplexer* client_multiplexer() { return &client_multiplexer_; }
QuartcMultiplexer* server_multiplexer() { return &server_multiplexer_; }
simulator::Simulator simulator_;
simulator::SimulatedQuartcPacketTransport client_transport_;
simulator::SimulatedQuartcPacketTransport server_transport_;
simulator::CountingPacketFilter client_filter_;
simulator::SymmetricLink client_server_link_;
FakeSessionEventDelegate client_session_delegate_;
FakeSessionEventDelegate server_session_delegate_;
FakeReceiveDelegate client_default_receiver_;
FakeReceiveDelegate server_default_receiver_;
QuartcMultiplexer client_multiplexer_;
QuartcMultiplexer server_multiplexer_;
std::unique_ptr<QuartcClientEndpoint> client_endpoint_;
std::unique_ptr<QuartcServerEndpoint> server_endpoint_;
};
TEST_F(QuartcMultiplexerTest, MultiplexMessages) {
Connect();
FakeSendDelegate send_delegate_1;
QuartcSendChannel* send_channel_1 =
client_multiplexer()->CreateSendChannel(1, &send_delegate_1);
FakeSendDelegate send_delegate_2;
QuartcSendChannel* send_channel_2 =
client_multiplexer()->CreateSendChannel(2, &send_delegate_2);
FakeReceiveDelegate receive_delegate_1;
server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1);
int num_messages = 10;
std::vector<std::pair<uint64_t, std::string>> messages_1;
messages_1.reserve(num_messages);
std::vector<std::pair<uint64_t, std::string>> messages_2;
messages_2.reserve(num_messages);
std::vector<int64_t> messages_sent_1;
std::vector<int64_t> messages_sent_2;
std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers_1;
std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers_2;
for (int i = 0; i < num_messages; ++i) {
messages_1.emplace_back(1, QuicStrCat("message for 1: ", i));
test::QuicTestMemSliceVector slice_1(
{std::make_pair(const_cast<char*>(messages_1.back().second.data()),
messages_1.back().second.size())});
send_channel_1->SendOrQueueMessage(slice_1.span(), i);
messages_sent_1.push_back(i);
ack_matchers_1.push_back(Pair(i, Gt(QuicTime::Zero())));
messages_2.emplace_back(2, QuicStrCat("message for 2: ", i));
test::QuicTestMemSliceVector slice_2(
{std::make_pair(const_cast<char*>(messages_2.back().second.data()),
messages_2.back().second.size())});
// Use i + 5 as the datagram id for channel 2, so that some of the ids
// overlap and some are disjoint.
send_channel_2->SendOrQueueMessage(slice_2.span(), i + 5);
messages_sent_2.push_back(i + 5);
ack_matchers_2.push_back(Pair(i + 5, Gt(QuicTime::Zero())));
}
EXPECT_TRUE(simulator_.RunUntil([&send_delegate_1, &send_delegate_2]() {
return send_delegate_1.datagrams_acked().size() == 10 &&
send_delegate_2.datagrams_acked().size() == 10;
}));
EXPECT_EQ(send_delegate_1.datagrams_sent(), messages_sent_1);
EXPECT_EQ(send_delegate_2.datagrams_sent(), messages_sent_2);
EXPECT_EQ(receive_delegate_1.messages_received(), messages_1);
EXPECT_EQ(server_default_receiver_.messages_received(), messages_2);
EXPECT_THAT(send_delegate_1.datagrams_acked(),
ElementsAreArray(ack_matchers_1));
EXPECT_THAT(send_delegate_2.datagrams_acked(),
ElementsAreArray(ack_matchers_2));
}
TEST_F(QuartcMultiplexerTest, MultiplexStreams) {
FakeSendDelegate send_delegate_1;
QuartcSendChannel* send_channel_1 =
client_multiplexer()->CreateSendChannel(1, &send_delegate_1);
FakeSendDelegate send_delegate_2;
QuartcSendChannel* send_channel_2 =
client_multiplexer()->CreateSendChannel(2, &send_delegate_2);
FakeQuartcStreamDelegate fake_send_stream_delegate;
FakeReceiveDelegate receive_delegate_1;
server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1);
Connect();
int num_messages = 10;
std::vector<std::pair<uint64_t, std::string>> messages_1;
messages_1.reserve(num_messages);
std::vector<std::pair<uint64_t, std::string>> messages_2;
messages_2.reserve(num_messages);
for (int i = 0; i < num_messages; ++i) {
messages_1.emplace_back(1, QuicStrCat("message for 1: ", i));
test::QuicTestMemSliceVector slice_1(
{std::make_pair(const_cast<char*>(messages_1.back().second.data()),
messages_1.back().second.size())});
QuartcStream* stream_1 =
send_channel_1->CreateOutgoingBidirectionalStream();
stream_1->SetDelegate(&fake_send_stream_delegate);
stream_1->WriteMemSlices(slice_1.span(), /*fin=*/true);
messages_2.emplace_back(2, QuicStrCat("message for 2: ", i));
test::QuicTestMemSliceVector slice_2(
{std::make_pair(const_cast<char*>(messages_2.back().second.data()),
messages_2.back().second.size())});
QuartcStream* stream_2 =
send_channel_2->CreateOutgoingBidirectionalStream();
stream_2->SetDelegate(&fake_send_stream_delegate);
stream_2->WriteMemSlices(slice_2.span(), /*fin=*/true);
}
EXPECT_TRUE(simulator_.RunUntilOrTimeout(
[this, &receive_delegate_1]() {
return receive_delegate_1.messages_received().size() == 10 &&
server_default_receiver_.messages_received().size() == 10;
},
QuicTime::Delta::FromSeconds(5)));
EXPECT_EQ(receive_delegate_1.messages_received(), messages_1);
EXPECT_EQ(server_default_receiver_.messages_received(), messages_2);
}
// Tests that datagram-lost callbacks are invoked on the right send channel
// delegate, and that they work with overlapping datagram ids.
TEST_F(QuartcMultiplexerTest, MultiplexLostDatagrams) {
Connect();
ASSERT_TRUE(simulator_.RunUntil([this]() {
return client_session_delegate_.handshake_count() > 0 &&
server_session_delegate_.handshake_count() > 0;
}));
// Just drop everything we try to send.
client_filter_.set_packets_to_drop(30);
FakeSendDelegate send_delegate_1;
QuartcSendChannel* send_channel_1 =
client_multiplexer()->CreateSendChannel(1, &send_delegate_1);
FakeSendDelegate send_delegate_2;
QuartcSendChannel* send_channel_2 =
client_multiplexer()->CreateSendChannel(2, &send_delegate_2);
FakeQuartcStreamDelegate fake_send_stream_delegate;
FakeReceiveDelegate receive_delegate_1;
server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1);
int num_messages = 10;
std::vector<std::pair<uint64_t, std::string>> messages_1;
messages_1.reserve(num_messages);
std::vector<std::pair<uint64_t, std::string>> messages_2;
messages_2.reserve(num_messages);
std::vector<int64_t> messages_sent_1;
std::vector<int64_t> messages_sent_2;
for (int i = 0; i < num_messages; ++i) {
messages_1.emplace_back(1, QuicStrCat("message for 1: ", i));
test::QuicTestMemSliceVector slice_1(
{std::make_pair(const_cast<char*>(messages_1.back().second.data()),
messages_1.back().second.size())});
send_channel_1->SendOrQueueMessage(slice_1.span(), i);
messages_sent_1.push_back(i);
messages_2.emplace_back(2, QuicStrCat("message for 2: ", i));
test::QuicTestMemSliceVector slice_2(
{std::make_pair(const_cast<char*>(messages_2.back().second.data()),
messages_2.back().second.size())});
// Use i + 5 as the datagram id for channel 2, so that some of the ids
// overlap and some are disjoint.
send_channel_2->SendOrQueueMessage(slice_2.span(), i + 5);
messages_sent_2.push_back(i + 5);
}
// Now send something retransmittable to prompt loss detection.
// If we never send anything retransmittable, we will never get acks, and
// never detect losses.
messages_1.emplace_back(1, QuicStrCat("message for 1: ", num_messages));
test::QuicTestMemSliceVector slice(
{std::make_pair(const_cast<char*>(messages_1.back().second.data()),
messages_1.back().second.size())});
QuartcStream* stream_1 = send_channel_1->CreateOutgoingBidirectionalStream();
stream_1->SetDelegate(&fake_send_stream_delegate);
stream_1->WriteMemSlices(slice.span(), /*fin=*/true);
EXPECT_TRUE(simulator_.RunUntilOrTimeout(
[&send_delegate_1, &send_delegate_2]() {
return send_delegate_1.datagrams_lost().size() == 10 &&
send_delegate_2.datagrams_lost().size() == 10;
},
QuicTime::Delta::FromSeconds(60)));
EXPECT_EQ(send_delegate_1.datagrams_lost(), messages_sent_1);
EXPECT_EQ(send_delegate_2.datagrams_lost(), messages_sent_2);
EXPECT_THAT(send_delegate_1.datagrams_acked(), IsEmpty());
EXPECT_THAT(send_delegate_2.datagrams_acked(), IsEmpty());
EXPECT_THAT(receive_delegate_1.messages_received(), IsEmpty());
EXPECT_THAT(server_default_receiver_.messages_received(), IsEmpty());
}
TEST_F(QuartcMultiplexerTest, UnregisterReceiveChannel) {
Connect();
FakeSendDelegate send_delegate;
QuartcSendChannel* send_channel =
client_multiplexer()->CreateSendChannel(1, &send_delegate);
FakeQuartcStreamDelegate fake_send_stream_delegate;
FakeReceiveDelegate receive_delegate;
server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate);
server_multiplexer()->RegisterReceiveChannel(1, nullptr);
int num_messages = 10;
std::vector<std::pair<uint64_t, std::string>> messages;
messages.reserve(num_messages);
std::vector<int64_t> messages_sent;
std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers;
for (int i = 0; i < num_messages; ++i) {
messages.emplace_back(1, QuicStrCat("message for 1: ", i));
test::QuicTestMemSliceVector slice(
{std::make_pair(const_cast<char*>(messages.back().second.data()),
messages.back().second.size())});
send_channel->SendOrQueueMessage(slice.span(), i);
messages_sent.push_back(i);
ack_matchers.push_back(Pair(i, Gt(QuicTime::Zero())));
}
EXPECT_TRUE(simulator_.RunUntil([&send_delegate]() {
return send_delegate.datagrams_acked().size() == 10;
}));
EXPECT_EQ(send_delegate.datagrams_sent(), messages_sent);
EXPECT_EQ(server_default_receiver_.messages_received(), messages);
EXPECT_THAT(send_delegate.datagrams_acked(), ElementsAreArray(ack_matchers));
}
TEST_F(QuartcMultiplexerTest, CloseEvent) {
Connect();
Disconnect();
EXPECT_EQ(client_session_delegate_.error(), QUIC_CONNECTION_CANCELLED);
EXPECT_EQ(server_session_delegate_.error(), QUIC_CONNECTION_CANCELLED);
}
TEST_F(QuartcMultiplexerTest, CongestionEvent) {
Connect();
ASSERT_TRUE(simulator_.RunUntil([this]() {
return client_session_delegate_.handshake_count() > 0 &&
server_session_delegate_.handshake_count() > 0;
}));
EXPECT_GT(client_session_delegate_.latest_bandwidth_estimate(),
QuicBandwidth::Zero());
EXPECT_GT(client_session_delegate_.latest_pacing_rate(),
QuicBandwidth::Zero());
EXPECT_GT(client_session_delegate_.latest_rtt(), QuicTime::Delta::Zero());
}
} // namespace
} // namespace quic