Create a QuartcMultiplexer which separates streams and datagrams into channels.
QuartcMultiplexer effectively reorganizes the calls and callbacks for
QuartcSession into three categories:
- Per-channel send events
- Per-channel receive events
- Session-wide events that are not multiplexed
QuartcMultiplexer works at a low level to hide streams and messages from other
senders and receivers. It consists of a core multiplexer object which interacts
with the QuartcEndpoint/Session, a channel sender which handles outgoing data,
and a channel receiver which handles incoming data.
The sender has a specific channel id, specified on creation. A channel writes
this channel id in varint form at the start of each new stream or datagram it
sends.
The multiplexer intercepts all the callbacks for incoming streams and datagrams.
It reads a varint from the start of each stream or datagram to identify the
channel id. It then looks up a receiver for that channel and delegates the
stream or datagram to that receiver.
A default receiver may be registered to handle all streams or datagrams not
assigned to a specific receiver. This allows endpoints to dispatch unhandled
data to a catch-all, or to await incoming data before registering a specific
receiver. The latter may be useful in conjunction with some forms of
negotiation; e.g., when certain ranges of channel ids are allocated to different
protocols, but it is unknown which protocol will be used at startup.
gfe-relnote: n/a (quartc only)
PiperOrigin-RevId: 260588723
Change-Id: I4a3b815b48c4f825c47bc60b0b3fd76d4e3614a5
diff --git a/quic/quartc/quartc_multiplexer_test.cc b/quic/quartc/quartc_multiplexer_test.cc
new file mode 100644
index 0000000..64609cb
--- /dev/null
+++ b/quic/quartc/quartc_multiplexer_test.cc
@@ -0,0 +1,490 @@
+// Copyright (c) 2019 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "net/third_party/quiche/src/quic/quartc/quartc_multiplexer.h"
+
+#include <memory>
+
+#include "net/third_party/quiche/src/quic/core/frames/quic_connection_close_frame.h"
+#include "net/third_party/quiche/src/quic/core/quic_bandwidth.h"
+#include "net/third_party/quiche/src/quic/core/quic_buffer_allocator.h"
+#include "net/third_party/quiche/src/quic/core/quic_time.h"
+#include "net/third_party/quiche/src/quic/core/quic_types.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_containers.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_logging.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_ptr_util.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_string_piece.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_test.h"
+#include "net/third_party/quiche/src/quic/platform/api/quic_test_mem_slice_vector.h"
+#include "net/third_party/quiche/src/quic/quartc/counting_packet_filter.h"
+#include "net/third_party/quiche/src/quic/quartc/quartc_endpoint.h"
+#include "net/third_party/quiche/src/quic/quartc/quartc_fakes.h"
+#include "net/third_party/quiche/src/quic/quartc/quartc_session.h"
+#include "net/third_party/quiche/src/quic/quartc/quartc_stream.h"
+#include "net/third_party/quiche/src/quic/quartc/simulated_packet_transport.h"
+#include "net/third_party/quiche/src/quic/test_tools/simulator/link.h"
+#include "net/third_party/quiche/src/quic/test_tools/simulator/simulator.h"
+
+namespace quic {
+namespace {
+
+using ::testing::ElementsAreArray;
+using ::testing::Gt;
+using ::testing::IsEmpty;
+using ::testing::Pair;
+
+constexpr QuicTime::Delta kPropagationDelay =
+ QuicTime::Delta::FromMilliseconds(10);  // Delay given to client_server_link_.
+// Session delegate that starts the handshake on creation and records events.
+class FakeSessionEventDelegate : public QuartcSessionEventDelegate {
+ public:
+  void OnSessionCreated(QuartcSession* session) override {
+    session->StartCryptoHandshake();
+    session_ = session;
+  }
+
+  void OnConnectionWritable() override { ++writable_count_; }
+
+  void OnCryptoHandshakeComplete() override { ++handshake_count_; }
+  // Remembers the error code and close source of the most recent close.
+  void OnConnectionClosed(const QuicConnectionCloseFrame& frame,
+                          ConnectionCloseSource source) override {
+    error_ = frame.quic_error_code;
+    close_source_ = source;
+  }
+  // Keeps only the latest congestion-control report.
+  void OnCongestionControlChange(QuicBandwidth bandwidth_estimate,
+                                 QuicBandwidth pacing_rate,
+                                 QuicTime::Delta latest_rtt) override {
+    latest_bandwidth_estimate_ = bandwidth_estimate;
+    latest_pacing_rate_ = pacing_rate;
+    latest_rtt_ = latest_rtt;
+  }
+  // Accessors. close_source() is only meaningful after OnConnectionClosed().
+  QuartcSession* session() { return session_; }
+  int writable_count() const { return writable_count_; }
+  int handshake_count() const { return handshake_count_; }
+  QuicErrorCode error() const { return error_; }
+  ConnectionCloseSource close_source() const { return close_source_; }
+  QuicBandwidth latest_bandwidth_estimate() const {
+    return latest_bandwidth_estimate_;
+  }
+  QuicBandwidth latest_pacing_rate() const { return latest_pacing_rate_; }
+  QuicTime::Delta latest_rtt() const { return latest_rtt_; }
+
+ private:
+  QuartcSession* session_ = nullptr;
+  int writable_count_ = 0;
+  int handshake_count_ = 0;
+  QuicErrorCode error_ = QUIC_NO_ERROR;
+  ConnectionCloseSource close_source_ = ConnectionCloseSource::FROM_SELF;
+  QuicBandwidth latest_bandwidth_estimate_ = QuicBandwidth::Zero();
+  QuicBandwidth latest_pacing_rate_ = QuicBandwidth::Zero();
+  QuicTime::Delta latest_rtt_ = QuicTime::Delta::Zero();
+};
+// Send-channel delegate that records the ids passed to each callback.
+class FakeSendDelegate : public QuartcSendChannel::Delegate {
+ public:
+  void OnMessageSent(int64_t datagram_id) override {
+    datagrams_sent_.emplace_back(datagram_id);
+  }
+  // Stores the acked id together with its receive timestamp.
+  void OnMessageAcked(int64_t datagram_id,
+                      QuicTime receive_timestamp) override {
+    datagrams_acked_.emplace_back(datagram_id, receive_timestamp);
+  }
+
+  void OnMessageLost(int64_t datagram_id) override {
+    datagrams_lost_.emplace_back(datagram_id);
+  }
+  // Read-only views of what has been recorded so far.
+  const std::vector<int64_t>& datagrams_sent() const { return datagrams_sent_; }
+  const std::vector<std::pair<int64_t, QuicTime>>& datagrams_acked() const {
+    return datagrams_acked_;
+  }
+  const std::vector<int64_t>& datagrams_lost() const { return datagrams_lost_; }
+
+ private:
+  std::vector<int64_t> datagrams_sent_;
+  std::vector<std::pair<int64_t, QuicTime>> datagrams_acked_;
+  std::vector<int64_t> datagrams_lost_;
+};
+// Receiver that records datagrams and finished streams as (channel id, data).
+class FakeReceiveDelegate : public QuartcReceiveChannel,
+                            public QuartcStream::Delegate {
+ public:
+  // Returned by reference so polling predicates do not copy the whole log.
+  const std::vector<std::pair<uint64_t, std::string>>& messages_received()
+      const {
+    return messages_received_;
+  }
+  void OnIncomingStream(uint64_t channel_id, QuartcStream* stream) override {
+    stream->SetDelegate(this);
+    stream_to_channel_id_[stream] = channel_id;
+  }
+
+  void OnMessageReceived(uint64_t channel_id,
+                         QuicStringPiece message) override {
+    messages_received_.emplace_back(channel_id, message);
+  }
+
+  // Stream delegate overrides.
+  size_t OnReceived(QuartcStream* stream,
+                    iovec* iov,
+                    size_t iov_length,
+                    bool fin) override {
+    if (!fin) {  // Report nothing consumed until fin is seen.
+      return 0;
+    }
+    // Join all iovec fragments into a single message.
+    size_t bytes = 0;
+    std::string message;
+    for (size_t i = 0; i < iov_length; ++i) {
+      message.append(static_cast<char*>(iov[i].iov_base), iov[i].iov_len);
+      bytes += iov[i].iov_len;
+    }
+    const uint64_t channel_id = stream_to_channel_id_[stream];
+    QUIC_LOG(INFO) << "Received " << bytes << " byte message on channel "
+                   << channel_id;
+    messages_received_.emplace_back(channel_id, message);
+    return bytes;
+  }
+
+  void OnClose(QuartcStream* stream) override {
+    stream_to_channel_id_.erase(stream);
+  }
+
+  void OnBufferChanged(QuartcStream* /*stream*/) override {}
+
+ private:
+  std::vector<std::pair<uint64_t, std::string>> messages_received_;
+  QuicUnorderedMap<QuartcStream*, uint64_t> stream_to_channel_id_;
+};
+// Fixture: a client and a server QuartcMultiplexer joined by a simulated link.
+class QuartcMultiplexerTest : public QuicTest {
+ public:
+ QuartcMultiplexerTest()
+ : simulator_(),
+ client_transport_(&simulator_,
+ "client_transport",
+ "server_transport",
+ 10 * kDefaultMaxPacketSize),
+ server_transport_(&simulator_,
+ "server_transport",
+ "client_transport",
+ 10 * kDefaultMaxPacketSize),
+ client_filter_(&simulator_, "client_filter", &client_transport_),
+ client_server_link_(&client_filter_,
+ &server_transport_,
+ QuicBandwidth::FromKBitsPerSecond(10 * 1000),
+ kPropagationDelay),
+ client_multiplexer_(simulator_.GetStreamSendBufferAllocator(),
+ &client_session_delegate_,
+ &client_default_receiver_),
+ server_multiplexer_(simulator_.GetStreamSendBufferAllocator(),
+ &server_session_delegate_,
+ &server_default_receiver_),
+ client_endpoint_(QuicMakeUnique<QuartcClientEndpoint>(
+ simulator_.GetAlarmFactory(),
+ simulator_.GetClock(),
+ simulator_.GetRandomGenerator(),
+ &client_multiplexer_,
+ quic::QuartcSessionConfig(),
+ /*serialized_server_config=*/"")),
+ server_endpoint_(QuicMakeUnique<QuartcServerEndpoint>(
+ simulator_.GetAlarmFactory(),
+ simulator_.GetClock(),
+ simulator_.GetRandomGenerator(),
+ &server_multiplexer_,
+ quic::QuartcSessionConfig())) {
+ // TODO(b/134175506): Remove when IETF QUIC supports receive timestamps.
+ SetQuicReloadableFlag(quic_enable_version_99, false);
+ }
+ // Connects both endpoints, then runs until each side has become writable.
+ void Connect() {
+ client_endpoint_->Connect(&client_transport_);
+ server_endpoint_->Connect(&server_transport_);
+ ASSERT_TRUE(simulator_.RunUntil([this]() {
+ return client_session_delegate_.writable_count() > 0 &&
+ server_session_delegate_.writable_count() > 0;
+ }));
+ }
+ // Closes both sessions. session() is null until OnSessionCreated() has run.
+ void Disconnect() {
+ client_session_delegate_.session()->CloseConnection("test");
+ server_session_delegate_.session()->CloseConnection("test");
+ }
+
+ protected:
+ QuartcMultiplexer* client_multiplexer() { return &client_multiplexer_; }
+
+ QuartcMultiplexer* server_multiplexer() { return &server_multiplexer_; }
+
+ simulator::Simulator simulator_;
+ // The link joins client_filter_ (built on client_transport_) and the server.
+ simulator::SimulatedQuartcPacketTransport client_transport_;
+ simulator::SimulatedQuartcPacketTransport server_transport_;
+ simulator::CountingPacketFilter client_filter_;
+ simulator::SymmetricLink client_server_link_;
+
+ FakeSessionEventDelegate client_session_delegate_;
+ FakeSessionEventDelegate server_session_delegate_;
+
+ FakeReceiveDelegate client_default_receiver_;
+ FakeReceiveDelegate server_default_receiver_;
+ // Each multiplexer holds pointers to a session delegate and receiver above.
+ QuartcMultiplexer client_multiplexer_;
+ QuartcMultiplexer server_multiplexer_;
+
+ std::unique_ptr<QuartcClientEndpoint> client_endpoint_;
+ std::unique_ptr<QuartcServerEndpoint> server_endpoint_;
+};
+// Datagrams go to the receiver registered for their channel, else the default.
+TEST_F(QuartcMultiplexerTest, MultiplexMessages) {
+ Connect();
+
+ FakeSendDelegate send_delegate_1;
+ QuartcSendChannel* send_channel_1 =
+ client_multiplexer()->CreateSendChannel(1, &send_delegate_1);
+ FakeSendDelegate send_delegate_2;
+ QuartcSendChannel* send_channel_2 =
+ client_multiplexer()->CreateSendChannel(2, &send_delegate_2);
+ // Only channel 1 gets a dedicated receiver on the server.
+ FakeReceiveDelegate receive_delegate_1;
+ server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1);
+
+ int num_messages = 10;
+ std::vector<std::pair<uint64_t, std::string>> messages_1;
+ messages_1.reserve(num_messages);
+ std::vector<std::pair<uint64_t, std::string>> messages_2;
+ messages_2.reserve(num_messages);
+ std::vector<int64_t> messages_sent_1;
+ std::vector<int64_t> messages_sent_2;
+ std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers_1;
+ std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers_2;
+ for (int i = 0; i < num_messages; ++i) {
+ messages_1.emplace_back(1, QuicStrCat("message for 1: ", i));
+ test::QuicTestMemSliceVector slice_1(
+ {std::make_pair(const_cast<char*>(messages_1.back().second.data()),
+ messages_1.back().second.size())});
+ send_channel_1->SendOrQueueMessage(slice_1.span(), i);
+ messages_sent_1.push_back(i);
+ ack_matchers_1.push_back(Pair(i, Gt(QuicTime::Zero())));
+
+ messages_2.emplace_back(2, QuicStrCat("message for 2: ", i));
+ test::QuicTestMemSliceVector slice_2(
+ {std::make_pair(const_cast<char*>(messages_2.back().second.data()),
+ messages_2.back().second.size())});
+ // Use i + 5 as the datagram id for channel 2, so that some of the ids
+ // overlap and some are disjoint.
+ send_channel_2->SendOrQueueMessage(slice_2.span(), i + 5);
+ messages_sent_2.push_back(i + 5);
+ ack_matchers_2.push_back(Pair(i + 5, Gt(QuicTime::Zero())));
+ }
+ // Run until all 10 (num_messages) datagrams per channel have been acked.
+ EXPECT_TRUE(simulator_.RunUntil([&send_delegate_1, &send_delegate_2]() {
+ return send_delegate_1.datagrams_acked().size() == 10 &&
+ send_delegate_2.datagrams_acked().size() == 10;
+ }));
+
+ EXPECT_EQ(send_delegate_1.datagrams_sent(), messages_sent_1);
+ EXPECT_EQ(send_delegate_2.datagrams_sent(), messages_sent_2);
+
+ EXPECT_EQ(receive_delegate_1.messages_received(), messages_1);
+ EXPECT_EQ(server_default_receiver_.messages_received(), messages_2);
+
+ EXPECT_THAT(send_delegate_1.datagrams_acked(),
+ ElementsAreArray(ack_matchers_1));
+ EXPECT_THAT(send_delegate_2.datagrams_acked(),
+ ElementsAreArray(ack_matchers_2));
+}
+// Streams go to the receiver registered for their channel, else the default.
+TEST_F(QuartcMultiplexerTest, MultiplexStreams) {
+ FakeSendDelegate send_delegate_1;
+ QuartcSendChannel* send_channel_1 =
+ client_multiplexer()->CreateSendChannel(1, &send_delegate_1);
+ FakeSendDelegate send_delegate_2;
+ QuartcSendChannel* send_channel_2 =
+ client_multiplexer()->CreateSendChannel(2, &send_delegate_2);
+
+ FakeQuartcStreamDelegate fake_send_stream_delegate;
+
+ FakeReceiveDelegate receive_delegate_1;
+ server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1);
+ // Here the channels are created and registered before connecting.
+ Connect();
+
+ int num_messages = 10;
+ std::vector<std::pair<uint64_t, std::string>> messages_1;
+ messages_1.reserve(num_messages);
+ std::vector<std::pair<uint64_t, std::string>> messages_2;
+ messages_2.reserve(num_messages);
+ for (int i = 0; i < num_messages; ++i) {
+ messages_1.emplace_back(1, QuicStrCat("message for 1: ", i));
+ test::QuicTestMemSliceVector slice_1(
+ {std::make_pair(const_cast<char*>(messages_1.back().second.data()),
+ messages_1.back().second.size())});
+ QuartcStream* stream_1 =
+ send_channel_1->CreateOutgoingBidirectionalStream();
+ stream_1->SetDelegate(&fake_send_stream_delegate);
+ stream_1->WriteMemSlices(slice_1.span(), /*fin=*/true);
+
+ messages_2.emplace_back(2, QuicStrCat("message for 2: ", i));
+ test::QuicTestMemSliceVector slice_2(
+ {std::make_pair(const_cast<char*>(messages_2.back().second.data()),
+ messages_2.back().second.size())});
+ QuartcStream* stream_2 =
+ send_channel_2->CreateOutgoingBidirectionalStream();
+ stream_2->SetDelegate(&fake_send_stream_delegate);
+ stream_2->WriteMemSlices(slice_2.span(), /*fin=*/true);
+ }
+ // Run until both receivers hold all 10 messages, with a 5 second timeout.
+ EXPECT_TRUE(simulator_.RunUntilOrTimeout(
+ [this, &receive_delegate_1]() {
+ return receive_delegate_1.messages_received().size() == 10 &&
+ server_default_receiver_.messages_received().size() == 10;
+ },
+ QuicTime::Delta::FromSeconds(5)));
+
+ EXPECT_EQ(receive_delegate_1.messages_received(), messages_1);
+ EXPECT_EQ(server_default_receiver_.messages_received(), messages_2);
+}
+
+// Tests that datagram-lost callbacks are invoked on the right send channel
+// delegate, and that they work with overlapping datagram ids.
+TEST_F(QuartcMultiplexerTest, MultiplexLostDatagrams) {
+ Connect();
+ ASSERT_TRUE(simulator_.RunUntil([this]() {
+ return client_session_delegate_.handshake_count() > 0 &&
+ server_session_delegate_.handshake_count() > 0;
+ }));
+
+ // Just drop everything we try to send.
+ client_filter_.set_packets_to_drop(30);
+ // NOTE(review): 30 presumably covers the 20 datagrams plus the stream below.
+ FakeSendDelegate send_delegate_1;
+ QuartcSendChannel* send_channel_1 =
+ client_multiplexer()->CreateSendChannel(1, &send_delegate_1);
+ FakeSendDelegate send_delegate_2;
+ QuartcSendChannel* send_channel_2 =
+ client_multiplexer()->CreateSendChannel(2, &send_delegate_2);
+
+ FakeQuartcStreamDelegate fake_send_stream_delegate;
+
+ FakeReceiveDelegate receive_delegate_1;
+ server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate_1);
+
+ int num_messages = 10;
+ std::vector<std::pair<uint64_t, std::string>> messages_1;
+ messages_1.reserve(num_messages);
+ std::vector<std::pair<uint64_t, std::string>> messages_2;
+ messages_2.reserve(num_messages);
+ std::vector<int64_t> messages_sent_1;
+ std::vector<int64_t> messages_sent_2;
+ for (int i = 0; i < num_messages; ++i) {
+ messages_1.emplace_back(1, QuicStrCat("message for 1: ", i));
+ test::QuicTestMemSliceVector slice_1(
+ {std::make_pair(const_cast<char*>(messages_1.back().second.data()),
+ messages_1.back().second.size())});
+ send_channel_1->SendOrQueueMessage(slice_1.span(), i);
+ messages_sent_1.push_back(i);
+
+ messages_2.emplace_back(2, QuicStrCat("message for 2: ", i));
+ test::QuicTestMemSliceVector slice_2(
+ {std::make_pair(const_cast<char*>(messages_2.back().second.data()),
+ messages_2.back().second.size())});
+ // Use i + 5 as the datagram id for channel 2, so that some of the ids
+ // overlap and some are disjoint.
+ send_channel_2->SendOrQueueMessage(slice_2.span(), i + 5);
+ messages_sent_2.push_back(i + 5);
+ }
+
+ // Now send something retransmittable to prompt loss detection.
+ // If we never send anything retransmittable, we will never get acks, and
+ // never detect losses.
+ messages_1.emplace_back(1, QuicStrCat("message for 1: ", num_messages));
+ test::QuicTestMemSliceVector slice(
+ {std::make_pair(const_cast<char*>(messages_1.back().second.data()),
+ messages_1.back().second.size())});
+ QuartcStream* stream_1 = send_channel_1->CreateOutgoingBidirectionalStream();
+ stream_1->SetDelegate(&fake_send_stream_delegate);
+ stream_1->WriteMemSlices(slice.span(), /*fin=*/true);
+ // Run until all 10 datagrams per channel are lost, with a 60 second timeout.
+ EXPECT_TRUE(simulator_.RunUntilOrTimeout(
+ [&send_delegate_1, &send_delegate_2]() {
+ return send_delegate_1.datagrams_lost().size() == 10 &&
+ send_delegate_2.datagrams_lost().size() == 10;
+ },
+ QuicTime::Delta::FromSeconds(60)));
+
+ EXPECT_EQ(send_delegate_1.datagrams_lost(), messages_sent_1);
+ EXPECT_EQ(send_delegate_2.datagrams_lost(), messages_sent_2);
+
+ EXPECT_THAT(send_delegate_1.datagrams_acked(), IsEmpty());
+ EXPECT_THAT(send_delegate_2.datagrams_acked(), IsEmpty());
+ // Neither server-side receiver should have recorded any message.
+ EXPECT_THAT(receive_delegate_1.messages_received(), IsEmpty());
+ EXPECT_THAT(server_default_receiver_.messages_received(), IsEmpty());
+}
+// Registering nullptr for a channel sends its datagrams back to the default.
+TEST_F(QuartcMultiplexerTest, UnregisterReceiveChannel) {
+ Connect();
+
+ FakeSendDelegate send_delegate;
+ QuartcSendChannel* send_channel =
+ client_multiplexer()->CreateSendChannel(1, &send_delegate);
+ FakeQuartcStreamDelegate fake_send_stream_delegate;
+
+ FakeReceiveDelegate receive_delegate;
+ server_multiplexer()->RegisterReceiveChannel(1, &receive_delegate);
+ server_multiplexer()->RegisterReceiveChannel(1, nullptr);
+
+ int num_messages = 10;
+ std::vector<std::pair<uint64_t, std::string>> messages;
+ messages.reserve(num_messages);
+ std::vector<int64_t> messages_sent;
+ std::vector<testing::Matcher<std::pair<int64_t, QuicTime>>> ack_matchers;
+ for (int i = 0; i < num_messages; ++i) {
+ messages.emplace_back(1, QuicStrCat("message for 1: ", i));
+ test::QuicTestMemSliceVector slice(
+ {std::make_pair(const_cast<char*>(messages.back().second.data()),
+ messages.back().second.size())});
+ send_channel->SendOrQueueMessage(slice.span(), i);
+ messages_sent.push_back(i);
+ ack_matchers.push_back(Pair(i, Gt(QuicTime::Zero())));
+ }
+ // Run until all 10 (num_messages) datagrams have been acked.
+ EXPECT_TRUE(simulator_.RunUntil([&send_delegate]() {
+ return send_delegate.datagrams_acked().size() == 10;
+ }));
+
+ EXPECT_EQ(send_delegate.datagrams_sent(), messages_sent);
+ EXPECT_EQ(server_default_receiver_.messages_received(), messages);
+ EXPECT_THAT(send_delegate.datagrams_acked(), ElementsAreArray(ack_matchers));
+}
+// Closing the sessions reports QUIC_CONNECTION_CANCELLED to both delegates.
+TEST_F(QuartcMultiplexerTest, CloseEvent) {
+ Connect();
+ Disconnect();
+
+ EXPECT_EQ(client_session_delegate_.error(), QUIC_CONNECTION_CANCELLED);
+ EXPECT_EQ(server_session_delegate_.error(), QUIC_CONNECTION_CANCELLED);
+}
+// After the handshake, the client has seen non-zero congestion-control stats.
+TEST_F(QuartcMultiplexerTest, CongestionEvent) {
+ Connect();
+ ASSERT_TRUE(simulator_.RunUntil([this]() {
+ return client_session_delegate_.handshake_count() > 0 &&
+ server_session_delegate_.handshake_count() > 0;
+ }));
+
+ EXPECT_GT(client_session_delegate_.latest_bandwidth_estimate(),
+ QuicBandwidth::Zero());
+ EXPECT_GT(client_session_delegate_.latest_pacing_rate(),
+ QuicBandwidth::Zero());
+ EXPECT_GT(client_session_delegate_.latest_rtt(), QuicTime::Delta::Zero());
+}
+
+} // namespace
+} // namespace quic